Command - MQTT Topics

Command and control is the operation of sending a message to a device requesting it to perform some action. Optionally, the device can acknowledge that it has succeeded or failed to complete the action.

This implementation focuses on the use of MQTT topics for performing request and response patterns (e.g., command and control). Please refer to the Designing MQTT Topics for AWS IoT Core whitepaper, specifically the Using the MQTT Topics for Commands section. The whitepaper provides alternative topic patterns that go beyond the scope of this implementation.

Use Cases

  • Near real-time action taken when command issued to a device
    • I want the light to turn on when I press a button from my mobile app
  • Interact with a peripheral, actuator, or operation of a device
    • I want to turn the LED on or off on my device
    • I want to remotely restart the device
  • Request device data or state
    • I want the device to publish the latest log file to a topic
  • Update a device's operating or saved configuration
    • I want to increase the current frequency of telemetry emitted by my device
    • I want to change my device's saved configuration, which will take effect on the next restart

Reference Architecture

Command and control via MQTT topics

  • AWS IoT Core is the MQTT message broker processing messages on behalf of the clients
  • Device is the IoT thing to be controlled
  • Application is the remote logic that issues commands
  1. The Device establishes an MQTT connection to the AWS IoT Core endpoint, and then subscribes to the cmd/device1/req (request) topic. This is the topic where incoming messages will be received and processed.
  2. The Application establishes an MQTT connection to the AWS IoT Core endpoint, and then subscribes to the cmd/device1/resp (response) topic. This is the topic where the acknowledgement messages from the device will be received.
  3. To send a command, the Application publishes a message on the cmd/device1/req topic, and the Device receives the message on its subscription to that topic and takes some action.
  4. (Optional) Once the command has been processed, the device then publishes the result of the action onto the cmd/device1/resp topic. The Application receives the response message and reconciles the outstanding action.
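
Sequence diagram (participants: Device, MQTT Broker, Application)

  • Connect and subscribe
    • Device → MQTT Broker: connect(iot_endpoint), then subscribe("cmd/device1/req")
    • Application → MQTT Broker: connect(iot_endpoint), then subscribe("cmd/device1/resp")
  • Command operation and (optional) response
    • Application → MQTT Broker: publish("cmd/device1/req", "light: on", "id: 1111")
    • MQTT Broker → Device: publish("cmd/device1/req", "light: on", "id: 1111")
    • Device: Turn on light
    • Device → MQTT Broker: publish("cmd/device1/resp", "success", id: "1111")
    • MQTT Broker → Application: publish("cmd/device1/resp", "success", id: "1111")
    • Application: reconcile("id: 1111")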
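
The request and response flow above pairs each command with an id so the Application can reconcile it later. The following is a minimal sketch of how such payloads might be built. The JSON field names (id, action, value, status) and the helper names are assumptions made for illustration; they are not defined by AWS IoT Core or by the samples in this document.

```python
import json
import uuid


def command_topics(device_id):
    """Return the (request, response) topic pair for one device."""
    return f"cmd/{device_id}/req", f"cmd/{device_id}/resp"


def make_command(action, value):
    """Build a JSON command carrying a unique id for later reconciliation."""
    return json.dumps({"id": str(uuid.uuid4()), "action": action, "value": value})


def make_response(command_payload, success):
    """Build the device's JSON reply, echoing the id of the command it answers."""
    command = json.loads(command_payload)
    status = "success" if success else "failure"
    return json.dumps({"id": command["id"], "status": status})


request_topic, response_topic = command_topics("device1")
command = make_command("light", "on")
response = make_response(command, success=True)
print(request_topic, command)
print(response_topic, response)
```

Echoing the command id in the response is what lets the Application match a reply to the command that caused it, even when several commands are outstanding.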

Assumptions

This implementation approach assumes the Device is connected at all times, subscribed to a topic for incoming commands, and can receive and process the command. It also assumes that the Device can notify the sending Application that the command was received and processed, if needed. Finally, it assumes that all three participants successfully receive, forward, and act upon the message flows in either direction.

Implementation

Both the Application and the Device use similar approaches to connecting, sending, and receiving messages. The code samples below are complete for each participant.

The code samples focus on the command design in general and demonstrate the basic capability of the Command pattern. Please refer to Getting started with AWS IoT Core for details on creating things, certificates, and obtaining your endpoint.

Device

The Device code focuses on connecting to the Broker and then subscribing to a topic to receive commands. Upon receipt of a command, the Device performs some local action and then publishes the outcome (success or failure) of the command back to the Application, which can then reconcile the command.

The Device will continue to receive and respond to commands until stopped.
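
As an illustration of the "local action" step, the sketch below shows one way the do_something(payload) placeholder in the Device sample could be filled in. It assumes commands arrive as JSON objects with action and value fields, which is an assumption of this example (the Application sample sends whatever text is typed). The handler functions are hypothetical stand-ins for real hardware access.

```python
import json


def set_light(value):
    # Placeholder for real hardware access (e.g., a GPIO write)
    return value in ("on", "off")


def restart(_value):
    # Placeholder: a real device would schedule a reboot here
    return True


# Hypothetical mapping of command names to handler functions
HANDLERS = {"light": set_light, "restart": restart}


def do_something(payload):
    """Decode a JSON command and run its handler; return True on success."""
    try:
        command = json.loads(payload)  # accepts str or bytes
        handler = HANDLERS[command["action"]]
    except (ValueError, KeyError, TypeError):
        # Malformed JSON, a missing field, or an unknown action
        return False
    return bool(handler(command.get("value")))


print(do_something(b'{"action": "light", "value": "on"}'))
print(do_something(b"not json"))
```

Returning a plain boolean keeps the sketch compatible with the result variable used in on_message_received().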

Please refer to this pubsub sample for more details.

  • Install SDK from PyPI: python3 -m pip install awsiotsdk
  • Replace the global variables with a valid endpoint, client_id, and credentials
  • Start in a separate terminal session before running the Application: python3 client.py
# client.py - Demonstrates waiting for a command to be evaluated and processed
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
import sys
import time


# This sample uses the Message Broker for AWS IoT to send and receive messages
# through an MQTT connection. On startup, the device connects to the server,
# subscribes to a request topic, invokes a callback upon receipt of a message, and
# then responds on the response topic back to the calling application.

io.init_logging(getattr(io.LogLevel, "Info"), "stderr")

# Using globals to simplify sample code
client_id = "device1"
endpoint = "REPLACE_WITH_YOUR_ENDPOINT_FQDN"
client_certificate = "PATH_TO_CLIENT_CERTIFICATE_FILE"
client_private_key = "PATH_TO_CLIENT_PRIVATE_KEY_FILE"
root_ca = "PATH_TO_ROOT_CA_CERTIFICATE_FILE"


# Topic to subscribe for incoming commands
request_topic = "cmd/device1/req"
# Topic to send result of command
response_topic = "cmd/device1/resp"


# Callbacks are the main method to asynchronously process MQTT events
# using the device SDKs.

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(
        f"Connection resumed. return_code: {return_code} session_present: {session_present}"
    )

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print(f"Resubscribe results: {resubscribe_results}")

    for topic, qos in resubscribe_results["topics"]:
        if qos is None:
            sys.exit(f"Server rejected resubscribe to topic: {topic}")


# Callback when the subscribed topic receives a command message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    global response_topic
    print(f"Received command from topic '{topic}': {payload}")

    # Action to perform on receiving a command, assert True for this example
    #################################################
    ############### COMMAND CODE HERE ###############
    #################################################
    # result = do_something(payload)
    result = True

    if result:
        message = f"SUCCESS: command ran successfully from payload: {payload}"
    else:
        message = f"FAILURE: command did not run successfully from payload: {payload}"
    print(f"Publishing message to topic '{response_topic}': {message}")
    mqtt_connection.publish(
        topic=response_topic, payload=message, qos=mqtt.QoS.AT_LEAST_ONCE
    )


if __name__ == "__main__":
    # Create SDK-based resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    # Create native MQTT connection from credentials on path (filesystem)
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        cert_filepath=client_certificate,
        pri_key_filepath=client_private_key,
        client_bootstrap=client_bootstrap,
        ca_filepath=root_ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False,
        keep_alive_secs=6,
    )

    print(f"Connecting to {endpoint} with client ID '{client_id}'...")

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe
    print(f"Subscribing to topic '{request_topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=request_topic, qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_message_received
    )

    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {str(subscribe_result['qos'])}")

    # All logic happens in the on_message_received() callback, loop until
    # program stopped (e.g., CTRL+C)
    print(f"Listening for commands on topic: {request_topic}")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Disconnect
        print("Disconnecting...")
        disconnect_future = mqtt_connection.disconnect()
        disconnect_future.result()
        print("Disconnected!")

Application

The Application code connects to the Broker and subscribes to the Device’s response topic. It then waits for a command to be entered and sends that to the Device’s request topic.

Please refer to this pubsub sample for more details.

  • Install SDK from PyPI: python3 -m pip install awsiotsdk
  • Replace the global variables with a valid endpoint, client_id, and credentials
  • Start in a separate terminal session after the Device code is running: python3 application.py
# application.py - Demonstrates sending commands to a device
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
import sys
import time


# This sample uses the Message Broker for AWS IoT to send and receive messages
# through an MQTT connection. On startup, the app connects to the server and
# subscribes to a response topic; a callback is invoked upon receipt of a message.
# Commands are read from standard input and published to the device's request topic.

io.init_logging(getattr(io.LogLevel, "Info"), "stderr")

# Using globals to simplify sample code
client_id = "app1"
endpoint = "REPLACE_WITH_YOUR_ENDPOINT_FQDN"
client_certificate = "PATH_TO_CLIENT_CERTIFICATE_FILE"
client_private_key = "PATH_TO_CLIENT_PRIVATE_KEY_FILE"
root_ca = "PATH_TO_ROOT_CA_CERTIFICATE_FILE"

# Topics for the test target device
target_device = "device1"
# Topic to send a command to the device
request_topic = f"cmd/{target_device}/req"
# Topic to subscribe for command responses
response_topic = f"cmd/{target_device}/resp"


# Callbacks are the main method to asynchronously process MQTT events
# using the device SDKs.

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(
        f"Connection resumed. return_code: {return_code} session_present: {session_present}"
    )

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print(f"Resubscribe results: {resubscribe_results}")

    for topic, qos in resubscribe_results["topics"]:
        if qos is None:
            sys.exit(f"Server rejected resubscribe to topic: {topic}")


# Callback when the subscribed topic receives a command response message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    global response_topic, target_device

    print(f"Received command response on topic '{topic}' with payload: {payload}")

    # Action to perform on device completing the command
    #################################################
    ############### COMMAND CODE HERE ###############
    #################################################
    # result = reconcile_command(payload)


if __name__ == "__main__":
    # Create SDK-based resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    # Create native MQTT connection from credentials on path (filesystem)
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        cert_filepath=client_certificate,
        pri_key_filepath=client_private_key,
        client_bootstrap=client_bootstrap,
        ca_filepath=root_ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False,
        keep_alive_secs=6,
    )

    print(f"Connecting to {endpoint} with client ID '{client_id}'...")

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe to the device response topic
    print(f"Subscribing to topic '{response_topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=response_topic, qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_message_received
    )

    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {str(subscribe_result['qos'])}")

    # Responses are handled in the on_message_received() callback. Read and
    # publish commands in a loop until the program is stopped (e.g., CTRL+C)
    try:
        while True:
            command = input(f"Enter command to send to {target_device}: ")
            print(f"Publishing command to topic '{request_topic}': {command}")
            mqtt_connection.publish(
                topic=request_topic, payload=command, qos=mqtt.QoS.AT_LEAST_ONCE
            )
            # Wait for response text before requesting next input
            time.sleep(2)

    except KeyboardInterrupt:
        # Disconnect
        print("Disconnecting...")
        disconnect_future = mqtt_connection.disconnect()
        disconnect_future.result()
        print("Disconnected!")

Considerations

This implementation covers the basics of a command and control pattern. It does not cover certain aspects that may arise in production use.

Duplicate or Out-of-Order Messages

In normal operation, MQTT command messages may be lost, duplicated, or delivered out of order between the application and the broker, or between the broker and the device. This can be caused by QoS settings, retransmissions or retries from the publishers or broker, or internal application errors such as a blocked thread.

A command should normally be processed only once. Including a unique transaction ID (TID) (see Command pattern examples) in both the request and the response allows the application to match each response to its command. For duplicate messages, the device can keep a list of recently processed TIDs and act only on new ones.
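
A minimal sketch of the device-side duplicate check, assuming each command carries a TID string. The RecentCommands class and its capacity parameter are hypothetical names introduced for this example.

```python
from collections import OrderedDict


class RecentCommands:
    """Remembers the most recently processed TIDs, up to a fixed capacity."""

    def __init__(self, capacity=100):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_new(self, tid):
        """Return True the first time a TID is seen, False for a duplicate."""
        if tid in self._seen:
            return False
        self._seen[tid] = True
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest TID
        return True


recent = RecentCommands(capacity=2)
results = [recent.is_new(tid) for tid in ["1111", "1111", "2222", "3333", "1111"]]
print(results)  # [True, False, True, True, True]
```

The final "1111" is accepted again because it was evicted once the capacity of 2 was exceeded. The capacity therefore needs to cover the window in which retransmissions can realistically arrive.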

For out-of-order messages, the TID can be an incrementing number or sequence, which lets the device track the order of messages received. Another approach is to use a timestamp as the TID: the device tracks the timestamp of the last processed message and, if it receives a message with an earlier timestamp, can discard it or otherwise handle it as out of order.
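
The tracking described above can be sketched as follows, assuming the TID is a comparable value such as an incrementing integer or a numeric timestamp. SequenceTracker is a hypothetical helper, and this version simply rejects stale messages rather than reordering them.

```python
class SequenceTracker:
    """Tracks the highest sequence number (or timestamp) processed so far."""

    def __init__(self):
        self.last_seq = None

    def accept(self, seq):
        """Return True if seq is newer than anything processed, else False."""
        if self.last_seq is not None and seq <= self.last_seq:
            return False  # stale or repeated message
        self.last_seq = seq
        return True


tracker = SequenceTracker()
accepted = [tracker.accept(seq) for seq in [1, 2, 4, 3, 5]]
print(accepted)  # [True, True, True, False, True]
```

Message 3 is rejected because message 4 was already processed. Whether to discard such a message or report it back to the application depends on the command semantics.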

When dealing with command messages that need to be processed in a certain order (e.g., A, then B, then C), both the device and the application should agree on the state of the command sequence and respond accordingly if commands arrive out of order. For instance, if a device receives command A (valid) and then command C (invalid, as the next command should have been B), it should report back a failure and notify the calling application of its current state.
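
The A, B, C example can be sketched as a small state tracker on the device. The class name and the reply fields (status, received, expected) are assumptions made for illustration only.

```python
class OrderedCommandState:
    """Accepts commands only in a fixed order and reports what it expected."""

    def __init__(self, order):
        self.order = list(order)
        self.position = 0  # index of the next expected command

    def handle(self, command):
        """Return a reply dict describing success or the expected command."""
        if self.position < len(self.order):
            expected = self.order[self.position]
        else:
            expected = None  # the sequence has already completed
        if command != expected:
            return {"status": "failure", "received": command, "expected": expected}
        self.position += 1
        return {"status": "success", "received": command}


state = OrderedCommandState(["A", "B", "C"])
replies = [state.handle(command) for command in ["A", "C", "B"]]
for reply in replies:
    print(reply)
```

Including the expected command in the failure reply gives the application enough context to resend the missing step instead of guessing the device's state.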

Lost Messages or Device Disconnected During Delivery

Another consideration is how a command message should be tracked and processed if it is never delivered. It is common for IoT devices to be offline to the broker for periods of time. This could be due to the device being turned off, local or widespread ISP outages, or due to intermittent network issues. Also, even with the use of a QoS of 1, there is still the potential for message loss.

The application should have logic to track outstanding commands, such as a timer with retry logic that retransmits a command if no response arrives within a certain period. If the device still does not respond within a set period of time, the application can raise a device connectivity error.
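
One possible shape for that tracking logic is sketched below. PendingCommands, its timeout and retry parameters, and the explicit now argument (used here so the example is deterministic) are all assumptions of this sketch. Publishing the retried payloads is left to the caller.

```python
import time


class PendingCommands:
    """Tracks commands awaiting a response, with a timeout and retry limit."""

    def __init__(self, timeout_secs=5.0, max_retries=3):
        self.timeout_secs = timeout_secs
        self.max_retries = max_retries
        self._pending = {}  # tid -> [payload, deadline, retries_used]

    def sent(self, tid, payload, now=None):
        """Record a command that was just published."""
        now = time.monotonic() if now is None else now
        self._pending[tid] = [payload, now + self.timeout_secs, 0]

    def acknowledged(self, tid):
        """Call when a response arrives; True if it matched a pending command."""
        return self._pending.pop(tid, None) is not None

    def check(self, now=None):
        """Return (to_retry, failed) for commands whose deadline has passed."""
        now = time.monotonic() if now is None else now
        to_retry, failed = [], []
        for tid, entry in list(self._pending.items()):
            payload, deadline, retries = entry
            if now < deadline:
                continue
            if retries < self.max_retries:
                entry[1] = now + self.timeout_secs
                entry[2] = retries + 1
                to_retry.append((tid, payload))
            else:
                del self._pending[tid]
                failed.append(tid)
        return to_retry, failed


pending = PendingCommands(timeout_secs=5, max_retries=1)
pending.sent("1111", "light: on", now=0)
pending.sent("2222", "restart", now=0)
matched = pending.acknowledged("1111")
first_check = pending.check(now=6)
second_check = pending.check(now=12)
print(matched, first_check, second_check)
```

In this run, command 2222 is retried once after its first deadline and then reported as failed, at which point the application could raise a connectivity error for the device.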

From the device's perspective, if it is disconnected while processing a command or series of commands, it should publish an error once it reconnects. This gives the requesting application context for how to proceed.
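
A rough sketch of how a device might prepare those error messages, assuming each command carries an id and that the device records which commands are still in progress. The helper names and the JSON fields are hypothetical. In the Device sample, the resulting payloads could be published to the response topic from the on_connection_resumed() callback.

```python
import json

in_flight = set()  # ids of commands received but not yet answered


def begin(tid):
    """Mark a command as being processed."""
    in_flight.add(tid)


def finish(tid):
    """Mark a command as answered."""
    in_flight.discard(tid)


def interruption_reports():
    """Build one error payload per unfinished command, then clear the set."""
    reports = [
        json.dumps({"id": tid, "status": "failure", "reason": "interrupted by disconnect"})
        for tid in sorted(in_flight)
    ]
    in_flight.clear()
    return reports


begin("1111")
begin("2222")
finish("1111")
reports = interruption_reports()
print(reports)
```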

This implementation focuses on the interaction between a single application and a single device. Some use cases may have a single command affecting multiple devices. For instance, an application may request all warning lights on a factory floor to turn on in the event of an emergency. Or it could request a fleet of devices to report back their current configuration and status.

For a small group of devices, an approach could be for the application to send an individual command to each device and reconcile the response.

However, when the target device group is large, this would require the application to send one message per device. In this case, devices could subscribe to a common topic for fan-out messages, so the application only needs to send a single message and reconcile the individual device responses.

For instance, device1, device2, and device3 can all subscribe to the topic device_status/request. When the application publishes a message on that topic, each device receives a copy of the message, can act upon it, and can then publish its response to a common device_status/response topic. The application or AWS IoT Rules Engine can then reconcile the responses. This pattern is covered in the Designing MQTT Topics for AWS IoT Core whitepaper.
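
Reconciling fan-out responses on the application side might look like the sketch below. Because all devices reply on one shared topic, it assumes each response payload identifies its sender and echoes the request id. The FanOutRequest class and its method names are hypothetical.

```python
class FanOutRequest:
    """Tracks which devices have answered a single fan-out command."""

    def __init__(self, tid, expected_devices):
        self.tid = tid
        self.waiting = set(expected_devices)
        self.responses = {}

    def record(self, device_id, tid, status):
        """Store one response; return False if it is unexpected or a repeat."""
        if tid != self.tid or device_id not in self.waiting:
            return False
        self.waiting.discard(device_id)
        self.responses[device_id] = status
        return True

    @property
    def complete(self):
        """True once every expected device has responded."""
        return not self.waiting


request = FanOutRequest("1111", ["device1", "device2", "device3"])
outcomes = [
    request.record("device1", "1111", "success"),
    request.record("device2", "1111", "success"),
    request.record("device2", "1111", "success"),  # duplicate response
    request.record("device3", "9999", "success"),  # response to another request
]
print(outcomes, request.complete, sorted(request.waiting))
```

Devices still listed in waiting after a timeout are candidates for an individual retry on their own request topic.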