expliot.plugins.mqtt.mqttauth

Test the authentication of a MQTT broker.

Attributes

DEFAULT_MQTT_PORT

MQTT_REFERENCE

Classes

SimpleMqttClient

A wrapper around publish and subscribe methods.

TCategory

Representation of Test Category.

Test

Representation of Test.

TLog

Representation of a Test Log.

TTarget

Representation of Test Target class.

MqttAuth

Test the authentication of a MQTT broker.

Functions

readlines(file)

Helper method for reading one line at a time from a file.

Module Contents

expliot.plugins.mqtt.mqttauth.readlines(file)

Helper method for reading one line at a time from a file.

It will yielding it for loops. The file is closed automatically even if the caller exits the loop early (break, exception, etc).

Parameters:

file – The file to read data from.

Returns:

yield a line in a loop

expliot.plugins.mqtt.mqttauth.DEFAULT_MQTT_PORT = 1883
class expliot.plugins.mqtt.mqttauth.SimpleMqttClient

A wrapper around publish and subscribe methods.

It also few implements helper methods.

static sub(topics, **kwargs)

Wrapper around paho-mqtt subscribe.simple() method.

For details on arguments, please refer paho/mqtt/subscribe.py in paho-mqtt project (https://pypi.org/project/paho-mqtt/).

Parameters:
  • topics – Either a string containing a single topic or a list containing multiple topics

  • kwargs – subscribe.simple() keyword arguments

Returns:

List of msg_count messages (from the topics subscribed to) received from the broker. msg_count subscribe.simple() argument is the count of messages to retrieve.

static pub(topic, **kwargs)

Wrapper around paho-mqtt publish.single() method.

For details on arguments, please refer paho/mqtt/publish.py in paho-mqtt project (https://pypi.org/project/paho-mqtt/).

Parameters:
  • topic – Topic to which the messahed will be published.

  • kwargs – publish.single() keyword arguments

Returns:

static pubmultiple(msgs, **kwargs)

Wrapper around paho-mqtt publish.multiple() method.

For details on arguments, please refer paho/mqtt/publish.py in paho-mqtt project (https://pypi.org/project/paho-mqtt/).

Parameters:
  • msgs

    List of messages to publish. Based on paho-mqtt doc,

    each message can either be:

    1. dict: msg = {‘topic’:”<topic>”, ‘payload’:”<payload>”, ‘qos’:<qos>

    2. tuple: (“<topic>”, “<payload>”, qos, retain)

  • kwargs – publish.multiple() keyword arguments

Returns:

static connauth(host, client_id=None, user=None, passwd=None, **kw)

Help to check if a client can connect to a broker.

Test for specific client ID and/or credentials.

Parameters:
  • host – Host to connect to

  • client_id – Client ID to use. If not specified paho-mqtt generates a random ID

  • user – User name of the client. If None or empty, connection is attempted without username and password

  • passwd – Password of the client. If None, only user name is sent

  • kw – Client.connect() keyword arguments (excluding host)

Returns:

Two comma separated values. The result code and its string representation

static _on_connauth(client, userdata, flags, return_code)

Execute method for paho-mqtt client.

The arguments are passed by client object. Details of the arguments are documented in paho/mqtt/client.py (https://pypi.org/project/paho-mqtt/.

This method is internally used for connauth().

Parameters:
  • client – The client instance for this callback

  • userdata – The private user data as set in Client() or userdata_set()

  • flags – Response flags sent by the broker

  • return_code – The connection result

Returns:

None

class expliot.plugins.mqtt.mqttauth.TCategory(tech, iface, action)

Bases: namedtuple('TCategory', 'tech, iface, action')

Representation of Test Category.

The class that defines the category of the test case. It is part of the Test class member _category. It can be used to identify the type of test or search for a specific category. It is a namedtuple that defines three attributes (for categorizing test cases).

  1. tech: What technology does the test use

  2. iface: Interface of the test i.e. whether it is for software or hardware

  3. action: What action does the test perform i.e. is it an exploit or a

    recon test for example.

COAP = 'coap'
DICOM = 'dicom'
HTTP = 'http'
MDNS = 'mdns'
MODBUS = 'modbus'
MQTT = 'mqtt'
TCP = 'tcp'
TLS = 'tls'
UDP = 'udp'
UPNP = 'upnp'
BLE = 'ble'
IEEE802154 = '802154'
ZIGBEE = 'zigbee'
CAN = 'can'
I2C = 'i2c'
JTAG = 'jtag'
SPI = 'spi'
UART = 'uart'
CRYPTO = 'crypto'
FW = 'firmware'
DISCLOSURE = 'disclosure'
ZB_AUDITOR = 'zbauditor'
BUS_AUDITOR = 'busauditor'
FW_AUDITOR = 'fwauditor'
NMAP = 'nmap'
_tech
HW = 'hardware'
RD = 'radio'
SW = 'software'
_interfaces
ANALYSIS = 'analysis'
COMPLIANCE = 'compliance'
DISCOVERY = 'discovery'
EXPLOIT = 'exploit'
FUZZ = 'fuzz'
RECON = 'recon'
_actions
class expliot.plugins.mqtt.mqttauth.Test(**kwargs)

Representation of Test.

The Base class for test cases (plugins). It defines the basic interface and basic implementation for the test cases. All test case plugins need to inherit from a test class derived from this class or this class itself depending on the purpose of the test case.

pre()

Action to take before the test.

post()

Action to take after the test.

execute()

Execute the test.

intro()

Show the intro for test.

output_dict_iter(cblog, robj, rlevel, key=None, value=None)

Callback method for recurse_list_dict().

It iterates over the dict

output passed from a plugin to output_handler(). It performs two operations on the dict

  1. If the output data is to be TLog(ged) (LOGPRETTY) on the console, then log the data recursively from the dict.

  2. Convert any bytes or bytearray objects in the dict to binary string and update the original dict object itself.

Args:
cblog (dict): Contains logging information i.e. to log the data or not?

and the Log prefix type.

robj (list or dict): The list or dict object at the specified recursion

level. This is updated by this callback i.e. bytes and bytearray objects found are converted to binary strings.

rlevel (int): The current recursion level at which this callback

instance is called.

key (str): The key if the robj is a dict. value (can be any type): 1. The value of the key if robj is a dict or

  1. A value from the robj if it is a list

Returns:

Nothing

output_handler(tlogtype=TLog.SUCCESS, msg=None, logkwargs=LOGPRETTY, **kwargs)

Handle the Test execution output data.

  • Add(append) data (dict) as an item in the TResult output (list).

  • And/or Log (print) the output

Args:
tlogtype (int): TLog prefix type to use i.e. Success, fail etc.

Check TLog class for prefix details.

msg (str): Specify a message to be logged, if any, apart from

output data.

logkwargs=LOGPRETTY(int): There are three options for kwargs logging

LOGPRETTY(0) - formatted logging for dict or list. LOGNORMAL(1) - Direct print of dict or list as is. LOGNO(2) - Do not log kwargs.

**kwargs: plugin output keyword arguments (or a **dictObject)

Returns:

Nothing.

run(arglist)

Run the test.

Args:

arglist (list): The argument list of the plugin.

Returns:
dict: The plugin result (status and output) on success,

or an empty dict in case of any error.

_assertpriv()

Raise an exception if the plugin needs root privileges.

Only if program is not executing as root.

Args:

None

Returns:

Nothing

_setid()

Set the Unique Test ID.

The ID is the plugin class name in lowercase.

Args:

None

Returns:

Nothing

_logstatus()

Handle the log status.

Args:

None

Returns:

Nothing

class expliot.plugins.mqtt.mqttauth.TLog

Representation of a Test Log.

Logger class for logging test case output. By default log to sys.stdout Must not instantiate. Use class methods. The logger needs to be initialized with the output file using init() class method

SUCCESS = 0
FAIL = 1
TRYDO = 2
GENERIC = 3
_prefix = ['[+]', '[-]', '[?]', '[*]']
_errprefix = '[.]'
_file
classmethod init(file=None)

Initialize the file object.

This method should be called in the beginning of the application to open the log output file.

Parameters:

file – The file where to log the test output

Returns:

classmethod close()

Close the file object if it is not sys.stdout.

Returns:

classmethod print(prefixtype, message)

The actual print methods.

Write the formatted message to the _file file.

Args:

prefixtype(int): the prefix type to be used for the message (defined above) message(str): The actual message from the Test object

Returns:

Nothing

classmethod success(message)

Write a message with success prefix to the file.

Parameters:

message – The message to be written

Returns:

classmethod fail(message)

Write a message with fail prefix to the file.

Parameters:

message – The message to be written

Returns:

classmethod trydo(message)

Write a message with try prefix to the file.

Parameters:

message – The message to be written

Returns:

void

classmethod generic(message)

Write a message with success prefix to the file.

Parameters:

message – The message to be written

Returns:

void

class expliot.plugins.mqtt.mqttauth.TTarget(name, version, vendor)

Bases: namedtuple('TTarget', 'name, version, vendor')

Representation of Test Target class.

Class that hold details about the target of the test. It is a namedtuple and holds the below details:

  1. name - Target/product name

  2. version - Version of the product

  3. vendor - Vendor that owns the product

Please note, in case it is a generic test case that can be used for multiple products use Target.GENERIC for all attributes.

GENERIC = 'generic'
LINUX = 'linux'
AWS = 'aws'
_name
AMAZON = 'amazon'
_vendor
expliot.plugins.mqtt.mqttauth.MQTT_REFERENCE = 'http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html'
class expliot.plugins.mqtt.mqttauth.MqttAuth

Bases: expliot.core.tests.test.Test

Test the authentication of a MQTT broker.

Output Format: If the auth is successful i.e. correct password found, then it’s details are present in the output. If the auth fails for all passwords from the –pfile (or single password from –passwd), then the Test fails and output is empty as for any other Test failure case. [

{

“user”: “foouser”, “password”: “foopass”, “reason_code”: 0, reason_code_str”: “Connection Accepted.”

}

]

execute()

Execute the test.