Fan-in is a many-to-one communication pattern for consuming telemetry data from many devices through a single data processing channel.
This implementation focuses on using an AWS IoT Rule Action to put telemetry data onto an Amazon Kinesis Data Firehose stream. The stream consolidates data from multiple devices in a single plant. It invokes a Lambda function to enrich and transform telemetry message payloads, then delivers that data to an S3 bucket for storage and future analysis. Please refer to the MQTT Communication Patterns section of the Designing MQTT Topics for AWS IoT Core whitepaper, specifically the Fan-in section. The whitepaper provides alternative topic patterns that go beyond the scope of this implementation.
Devices publish telemetry to a dt/plant1/dev_n/temp (data telemetry) topic. This is a location- and device-specific topic that delivers telemetry messages for a given device or sensor.
An AWS IoT Rule uses the wildcard topic filter dt/plant1/+/temp in AWS IoT Core to consolidate messages across devices for plant1 and puts those messages onto an Amazon Kinesis Data Firehose stream.
To experiment quickly, you can test this pattern by publishing messages with the MQTT test client in the AWS IoT console or by using the IoT Device Simulator. In a real-world implementation, you'll configure multiple devices as AWS IoT Things that each securely communicate with your AWS IoT Core endpoint.
The configuration and code samples below demonstrate the basic capability of the Fan-in pattern and focus on the fan-in design in general. Please refer to Getting started with AWS IoT Core for details on creating things and certificates, obtaining your endpoint, and publishing telemetry to your endpoint. Refer to the AWS Lambda and Amazon Kinesis Data Firehose Developer Guides for a more in-depth discussion of these services.
This implementation approach assumes that Devices are not necessarily connected to the internet or AWS IoT Core at all times. Each Device publishes temperature telemetry to a single topic. The implementation also assumes that all temperature readings are emitted with a sensor name value of Temperature Celsius or Temperature Fahrenheit and follow the message payload formats outlined below. Alternative options are also called out within the sections below.
Once connected to AWS IoT Core, devices transmit telemetry data to plant- and device-specific MQTT topics. The example below shows MQTT topics and payloads for device1 and device2 in plant1. Your implementation might support hundreds, thousands, or millions of devices. Copy and paste the topic names and message payloads below into the subscribe and publish inputs of the MQTT test client to simulate the device traffic we will fan in. You can subscribe to each topic to view device-specific messages as you publish them, or you can subscribe to the wildcard topic dt/plant1/+/temp to see messages in aggregate.
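To make the wildcard behavior concrete, here is a minimal sketch of how an MQTT topic filter is compared level by level against a published topic. The function name topic_matches is hypothetical, and the sketch is a simplification for illustration only (it ignores special cases such as topics that begin with $); it is not how AWS IoT Core implements matching.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter that may use + and #."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level,
            # where it matches everything that remains.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The topic has fewer levels than the filter requires.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; + matches any single level.
            return False

    # Without a trailing #, both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for candidate in ("dt/plant1/dev_1/temp", "dt/plant2/dev_1/temp"):
        print(candidate, topic_matches("dt/plant1/+/temp", candidate))
```

The single-level wildcard stands in for exactly one level, so the filter picks up every device in plant1 but no other plant and no deeper topic hierarchy.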
MQTT Topic name for the temperature of device1 in plant1.
dt/plant1/dev_1/temp
Temperature in Celsius telemetry JSON payload for device1 in plant1.
{
  "timestamp": 1601048303,
  "sensorId": 17,
  "deviceSerial": "sd89w7e82349",
  "sensorData": [
    {
      "sensorName": "Temperature Celsius",
      "sensorValue": 34.2211224
    }
  ]
}
MQTT Topic name for the temperature of device2 in plant1.
dt/plant1/dev_2/temp
Temperature in Fahrenheit telemetry JSON payload for device2 in plant1.
{
  "timestamp": 1601048303,
  "sensorId": 4,
  "deviceSerial": "324l5;k;a3",
  "sensorData": [
    {
      "sensorName": "Temperature Fahrenheit",
      "sensorValue": 120.3806
    }
  ]
}
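If you would rather generate these messages from a script than paste them by hand, the sketch below builds the topic name and JSON payload in the format shown above. The function name build_temperature_message and its parameters are hypothetical, and the check on sensor names simply mirrors the assumption stated earlier that only the two temperature sensor names are emitted.

```python
import json
import time

# The two sensor names this implementation assumes devices will emit.
ALLOWED_SENSOR_NAMES = {"Temperature Celsius", "Temperature Fahrenheit"}


def build_temperature_message(plant, device, device_serial, sensor_id,
                              sensor_name, sensor_value, timestamp=None):
    """Return (topic, json_payload) for one temperature reading."""
    if sensor_name not in ALLOWED_SENSOR_NAMES:
        raise ValueError(f"Unexpected sensor name: {sensor_name!r}")

    # Location- and device-specific telemetry topic, e.g. dt/plant1/dev_1/temp
    topic = f"dt/{plant}/{device}/temp"
    payload = {
        # Use the device's own event time; default to now in epoch seconds.
        "timestamp": int(time.time()) if timestamp is None else timestamp,
        "sensorId": sensor_id,
        "deviceSerial": device_serial,
        "sensorData": [
            {"sensorName": sensor_name, "sensorValue": sensor_value}
        ],
    }
    return topic, json.dumps(payload)


if __name__ == "__main__":
    topic, body = build_temperature_message(
        "plant1", "dev_1", "sd89w7e82349", 17,
        "Temperature Celsius", 34.2211224, timestamp=1601048303)
    print(topic)
    print(body)
```

The returned topic and payload can be pasted into the MQTT test client or handed to whichever MQTT client your devices use; the publishing step itself is deliberately left out of this sketch.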
In the AWS Lambda console, create a new Lambda function. To use the code below, choose Author from scratch with a Python 3 runtime (this sample was written for Python 3.8) and paste the code below into the code editor. To create your own function, choose the Use a blueprint option and select the kinesis-firehose-process-record-python blueprint, which you'll then modify per your requirements.
Enter the function name fan-in_device_temperature_converter.
For the execution role, choose Create a new role with basic Lambda permissions.
Click Create Function.
Switch from the Code view to the Configuration view and click Edit on the General Configuration pane.
Set the timeout to at least 1 minute and click Save (Amazon Kinesis Data Firehose requires at least a 1 minute timeout for a Lambda transformer).

import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        # Firehose delivers each record's data base64-encoded
        payload = json.loads(base64.b64decode(record['data']).decode('utf-8'))

        # Flatten the payload, keeping only the fields needed downstream
        transformedPayload = {}
        transformedPayload['deviceSerial'] = payload['deviceSerial']
        transformedPayload['timestamp'] = payload['timestamp']

        for data in payload['sensorData']:
            if data['sensorName'] == 'Temperature Celsius':
                # Convert Celsius readings to Fahrenheit
                transformedPayload['temperature'] = (data['sensorValue'] * 9/5) + 32
            else:
                transformedPayload['temperature'] = data['sensorValue']

        # Append a newline so that records land one per line in S3
        transformedData = json.dumps(transformedPayload) + '\n'

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # b64encode returns bytes; decode so the response is JSON-serializable
            'data': base64.b64encode(transformedData.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
The function transforms each message into the flattened format below, in which all temperature readings are in Fahrenheit (Celsius readings are converted).
{"deviceSerial": "sd89w7e82349", "timestamp": 1601048303, "temperature": 93.59802032}
{"deviceSerial": "324l5;k;a3", "timestamp": 1601048303, "temperature": 120.3806}
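Before deploying, it can help to exercise the transformation locally. The sketch below is not the function above but a hypothetical variant, tolerant_handler, with two helper functions (make_firehose_event and decode_results, also hypothetical). It assumes the Firehose transformation contract of base64-encoded record data and a per-record result of Ok or ProcessingFailed, appends a newline to each transformed record, and returns malformed records unchanged with a failed status instead of raising.

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap JSON-serializable payloads as a Firehose transformation event."""
    records = []
    for index, payload in enumerate(payloads):
        raw = json.dumps(payload).encode("utf-8")
        records.append({
            "recordId": f"record-{index}",  # placeholder IDs for local runs
            "data": base64.b64encode(raw).decode("utf-8"),
        })
    return {"invocationId": "local-test", "records": records}


def tolerant_handler(event, context=None):
    """Flatten records and flag malformed ones instead of raising."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            flat = {
                "deviceSerial": payload["deviceSerial"],
                "timestamp": payload["timestamp"],
            }
            for data in payload["sensorData"]:
                value = data["sensorValue"]
                if data["sensorName"] == "Temperature Celsius":
                    value = value * 9 / 5 + 32  # Celsius to Fahrenheit
                flat["temperature"] = value
            line = json.dumps(flat) + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(line.encode("utf-8")).decode("utf-8"),
            })
        except (KeyError, TypeError, ValueError):
            # Hand the original data back and mark the record as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}


def decode_results(response):
    """Return (result, decoded JSON or None) for each output record."""
    decoded = []
    for record in response["records"]:
        if record["result"] == "Ok":
            body = json.loads(base64.b64decode(record["data"]))
            decoded.append(("Ok", body))
        else:
            decoded.append((record["result"], None))
    return decoded


if __name__ == "__main__":
    event = make_firehose_event([
        {"timestamp": 1601048303, "sensorId": 4, "deviceSerial": "324l5;k;a3",
         "sensorData": [{"sensorName": "Temperature Fahrenheit",
                         "sensorValue": 120.3806}]},
        {"deviceSerial": "missing-fields"},
    ])
    for result in decode_results(tolerant_handler(event)):
        print(result)
```

Marking a record as failed rather than letting the whole invocation raise keeps one malformed message from blocking the rest of the batch; whether that trade-off suits your pipeline is a design decision.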
In addition to using Lambda from Amazon Kinesis Data Firehose, you can also leverage AWS IoT Analytics to achieve Fan-in processing of messages with transformations and enriching behaviors.
Use the AWS Console or the AWS CLI to create a new S3 bucket as the destination where the Kinesis Data Firehose delivery stream lands plant1 device data. Replace <AccountId> with your AWS Account Id.
CLI Command
aws s3 mb s3://fan-in-telemetry-<AccountId>
Amazon Kinesis Data Firehose destinations can also include Amazon Redshift, Amazon OpenSearch Service (formerly Amazon Elasticsearch Service), HTTP endpoints, and third-party service providers.
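In the AWS Console for Amazon Kinesis Data Firehose:
Click Create Delivery Stream.
Enter the delivery stream name fan-in_device_temperature_stream.
Choose Direct PUT or other sources as the source and click Next.
Choose Enabled for Transform source records with AWS Lambda and select the function created above, fan-in_device_temperature_converter.
Choose the $LATEST function version.
Choose Disabled for record format conversion and click Next.
Choose Amazon S3 as the destination.
Select the S3 bucket fan-in-telemetry-<AccountId> and click Next.
Review the remaining settings and click Next.
Review the configuration and click Create delivery stream.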
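The same delivery stream can also be described in code. The sketch below only builds the request parameters for the boto3 Firehose client's create_delivery_stream call and does not contact AWS. The helper name build_delivery_stream_request is hypothetical, and the account ID, region, and role name in the example ARNs are placeholders you would replace with your own values. It also assumes an IAM role that Firehose can assume already exists, whereas the console steps can create one for you.

```python
def build_delivery_stream_request(stream_name, bucket_name, role_arn, lambda_arn):
    """Build keyword arguments for firehose.create_delivery_stream."""
    return {
        "DeliveryStreamName": stream_name,
        # DirectPut corresponds to "Direct PUT or other sources" in the console.
        "DeliveryStreamType": "DirectPut",
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": role_arn,
            "BucketARN": f"arn:aws:s3:::{bucket_name}",
            # Enable the Lambda transformation of source records.
            "ProcessingConfiguration": {
                "Enabled": True,
                "Processors": [
                    {
                        "Type": "Lambda",
                        "Parameters": [
                            {
                                "ParameterName": "LambdaArn",
                                "ParameterValue": lambda_arn,
                            }
                        ],
                    }
                ],
            },
        },
    }


if __name__ == "__main__":
    # Placeholder account ID, region, and role name: replace before use.
    request = build_delivery_stream_request(
        stream_name="fan-in_device_temperature_stream",
        bucket_name="fan-in-telemetry-123456789012",
        role_arn="arn:aws:iam::123456789012:role/example-firehose-delivery-role",
        lambda_arn=("arn:aws:lambda:us-east-1:123456789012:function:"
                    "fan-in_device_temperature_converter:$LATEST"),
    )
    print(request)
    # To create the stream (requires boto3 and AWS credentials):
    #   import boto3
    #   boto3.client("firehose").create_delivery_stream(**request)
```

Keeping the request as plain data makes it easy to review or put under version control before anything is created in your account.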
Alternatively, your AWS IoT Rule can achieve the Fan-in pattern with actions that send messages to Amazon Kinesis Data Streams, where other applications can read and act on buffered telemetry data; to Amazon Kinesis Data Analytics, to process data in real time with SQL or Apache Flink; to SQS message queues, where you can process messages asynchronously; or to any other supported AWS IoT Rule Action.
From the AWS IoT Console, under the Act menu, choose Create a Rule.
Enter the rule name FanIn_Device_Temperature.
Enter the rule query statement select * from 'dt/plant1/+/temp'.
Add the action Send a message to an Amazon Kinesis Firehose Stream.
Click Configure Action and continue with the details.
Select the stream fan-in_device_temperature_stream.
Create a role named IoT_Rule_fan-in_Kinesis_Role.
Click Add Action.
Click Create Rule.
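For reference, the rule can also be expressed as a request to the boto3 IoT client's create_topic_rule call. The sketch below only builds the request and does not contact AWS. The helper name build_topic_rule_request is hypothetical, the account ID in the role ARN is a placeholder, and the sketch assumes the role already exists and allows AWS IoT to put records onto the delivery stream (the console may create the role under a different path).

```python
def build_topic_rule_request(rule_name, topic_filter, stream_name, role_arn):
    """Build keyword arguments for iot.create_topic_rule."""
    return {
        "ruleName": rule_name,
        "topicRulePayload": {
            # The wildcard filter consolidates every device topic in the plant.
            "sql": f"SELECT * FROM '{topic_filter}'",
            "awsIotSqlVersion": "2016-03-23",
            "ruleDisabled": False,
            "actions": [
                {
                    "firehose": {
                        "roleArn": role_arn,
                        "deliveryStreamName": stream_name,
                    }
                }
            ],
        },
    }


if __name__ == "__main__":
    # Placeholder account ID: replace before use.
    request = build_topic_rule_request(
        rule_name="FanIn_Device_Temperature",
        topic_filter="dt/plant1/+/temp",
        stream_name="fan-in_device_temperature_stream",
        role_arn="arn:aws:iam::123456789012:role/IoT_Rule_fan-in_Kinesis_Role",
    )
    print(request)
    # To create the rule (requires boto3 and AWS credentials):
    #   import boto3
    #   boto3.client("iot").create_topic_rule(**request)
```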
This implementation covers the basics of a device telemetry fan-in pattern. It does not cover certain aspects that may arise in production use.
Devices may hold messages for delivery retries if connectivity to the MQTT Broker in AWS IoT Core is lost temporarily. The Amazon Kinesis Data Firehose stream timestamps events with their approximate arrival time at the stream. Devices publishing telemetry messages should therefore include a timestamp in the message that represents the true time the event took place. If you plan to create partitions for ETL and analysis with AWS Glue and Amazon Athena, further processing of the telemetry data stored in S3 is needed to create a folder structure based on the device event timestamp rather than the Amazon Kinesis Data Firehose event timestamp.
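As an illustration of that further processing, the sketch below derives an S3 key prefix from the timestamp carried in the device message rather than from the arrival time. The function name event_time_prefix, the base prefix telemetry, and the Hive-style year=/month=/day=/hour= layout are all assumptions chosen for the example; use whatever layout your Glue and Athena tables expect.

```python
from datetime import datetime, timezone


def event_time_prefix(epoch_seconds, base_prefix="telemetry"):
    """Return an S3 key prefix partitioned by the device event time in UTC."""
    event_time = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    partition = event_time.strftime("year=%Y/month=%m/day=%d/hour=%H/")
    return f"{base_prefix}/{partition}"


if __name__ == "__main__":
    # Timestamp taken from the sample payloads in this document.
    print(event_time_prefix(1601048303))
```

For the sample timestamp 1601048303 this yields telemetry/year=2020/month=09/day=25/hour=15/, regardless of when a delayed message finally reached the stream.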
The primary consideration around service quotas for the Fan-in pattern is outlined in the MQTT Communication Patterns section of the Designing MQTT Topics for AWS IoT Core whitepaper. Instead of implementing the Fan-in pattern by over-subscribing multiple devices to a single device topic, use a topic per device and then use the AWS IoT Rules Engine with a wildcard subscription pattern to route to Amazon Kinesis Data Firehose, as demonstrated in this implementation. You should also be aware of the quotas applicable to each service used in your implementation. Be sure to understand whether any of these are a concern for your design and whether they are adjustable.