Skip to main content

IoT Patterns Skill

IoT Patterns Skill

When to Use This Skill

Use this skill when implementing iot patterns patterns in your codebase.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

Comprehensive IoT development patterns for device communication, data ingestion, and cloud integration.

MQTT Communication

Broker Setup (Mosquitto)

# docker-compose.yml
services:
mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log

# mosquitto/config/mosquitto.conf
listener 1883
listener 9001
protocol websockets
allow_anonymous false
password_file /mosquitto/config/passwords
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

Python MQTT Client

# mqtt/client.py
import paho.mqtt.client as mqtt
import json
import ssl
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class MQTTConfig:
broker: str
port: int = 1883
username: Optional[str] = None
password: Optional[str] = None
client_id: Optional[str] = None
use_tls: bool = False
ca_cert: Optional[str] = None

class IoTClient:
def __init__(self, config: MQTTConfig):
self.config = config
self.client = mqtt.Client(
client_id=config.client_id,
protocol=mqtt.MQTTv5
)
self._setup_client()

def _setup_client(self):
if self.config.username:
self.client.username_pw_set(
self.config.username,
self.config.password
)

if self.config.use_tls:
self.client.tls_set(
ca_certs=self.config.ca_cert,
tls_version=ssl.PROTOCOL_TLS
)

self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect

def _on_connect(self, client, userdata, flags, rc, properties=None):
if rc == 0:
print("Connected to MQTT broker")
else:
print(f"Connection failed with code {rc}")

def _on_message(self, client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
self.handle_message(msg.topic, payload)
except json.JSONDecodeError:
print(f"Invalid JSON on topic {msg.topic}")

def _on_disconnect(self, client, userdata, rc):
print(f"Disconnected with code {rc}")

def connect(self):
self.client.connect(
self.config.broker,
self.config.port,
keepalive=60
)
self.client.loop_start()

def disconnect(self):
self.client.loop_stop()
self.client.disconnect()

def subscribe(self, topic: str, qos: int = 1):
self.client.subscribe(topic, qos)

def publish(self, topic: str, payload: dict, qos: int = 1, retain: bool = False):
message = json.dumps(payload)
self.client.publish(topic, message, qos, retain)

def handle_message(self, topic: str, payload: dict):
"""Override in subclass"""
print(f"Received on {topic}: {payload}")


# Usage
class SensorClient(IoTClient):
def __init__(self, config: MQTTConfig, device_id: str):
super().__init__(config)
self.device_id = device_id

def publish_telemetry(self, data: dict):
topic = f"devices/{self.device_id}/telemetry"
self.publish(topic, {
"device_id": self.device_id,
"timestamp": datetime.utcnow().isoformat(),
"data": data
})

def subscribe_commands(self):
topic = f"devices/{self.device_id}/commands/#"
self.subscribe(topic)

def handle_message(self, topic: str, payload: dict):
if "commands" in topic:
self.execute_command(payload)

def execute_command(self, command: dict):
print(f"Executing command: {command}")

Node.js MQTT Client

// mqtt/client.ts
import mqtt, { MqttClient, IClientOptions } from 'mqtt'

interface DeviceConfig {
broker: string
port: number
deviceId: string
username?: string
password?: string
useTLS?: boolean
}

class IoTDevice {
private client: MqttClient
private deviceId: string

constructor(config: DeviceConfig) {
this.deviceId = config.deviceId

const options: IClientOptions = {
clientId: config.deviceId,
username: config.username,
password: config.password,
protocol: config.useTLS ? 'mqtts' : 'mqtt',
port: config.port,
reconnectPeriod: 5000,
connectTimeout: 30000,
}

this.client = mqtt.connect(`mqtt://${config.broker}`, options)
this.setupHandlers()
}

private setupHandlers() {
this.client.on('connect', () => {
console.log('Connected to MQTT broker')
this.subscribeToCommands()
})

this.client.on('message', (topic, message) => {
const payload = JSON.parse(message.toString())
this.handleMessage(topic, payload)
})

this.client.on('error', (error) => {
console.error('MQTT error:', error)
})
}

private subscribeToCommands() {
const topic = `devices/${this.deviceId}/commands/#`
this.client.subscribe(topic, { qos: 1 })
}

publishTelemetry(data: Record<string, unknown>) {
const topic = `devices/${this.deviceId}/telemetry`
const payload = {
deviceId: this.deviceId,
timestamp: new Date().toISOString(),
data,
}
this.client.publish(topic, JSON.stringify(payload), { qos: 1 })
}

private handleMessage(topic: string, payload: unknown) {
if (topic.includes('commands')) {
this.executeCommand(payload as { action: string; params: unknown })
}
}

private executeCommand(command: { action: string; params: unknown }) {
console.log('Executing command:', command)
}

disconnect() {
this.client.end()
}
}

AWS IoT Core

Device Provisioning

# aws_iot/provisioning.py
import boto3
import json

class DeviceProvisioner:
def __init__(self, region: str = 'us-east-1'):
self.iot = boto3.client('iot', region_name=region)

def create_thing(self, thing_name: str, thing_type: str = None):
"""Create IoT thing with certificate"""
# Create thing
thing = self.iot.create_thing(
thingName=thing_name,
thingTypeName=thing_type,
attributePayload={
'attributes': {
'env': 'production'
}
}
)

# Create certificate
cert = self.iot.create_keys_and_certificate(setAsActive=True)

# Attach certificate to thing
self.iot.attach_thing_principal(
thingName=thing_name,
principal=cert['certificateArn']
)

# Attach policy
self.iot.attach_policy(
policyName='IoTDevicePolicy',
target=cert['certificateArn']
)

return {
'thing_name': thing_name,
'thing_arn': thing['thingArn'],
'certificate_arn': cert['certificateArn'],
'certificate_pem': cert['certificatePem'],
'private_key': cert['keyPair']['PrivateKey'],
'public_key': cert['keyPair']['PublicKey'],
}

def create_policy(self, policy_name: str):
"""Create IoT policy"""
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Connect"
],
"Resource": [
f"arn:aws:iot:*:*:client/${{iot:Connection.Thing.ThingName}}"
]
},
{
"Effect": "Allow",
"Action": [
"iot:Publish"
],
"Resource": [
f"arn:aws:iot:*:*:topic/devices/${{iot:Connection.Thing.ThingName}}/*"
]
},
{
"Effect": "Allow",
"Action": [
"iot:Subscribe"
],
"Resource": [
f"arn:aws:iot:*:*:topicfilter/devices/${{iot:Connection.Thing.ThingName}}/commands/*"
]
},
{
"Effect": "Allow",
"Action": [
"iot:Receive"
],
"Resource": [
f"arn:aws:iot:*:*:topic/devices/${{iot:Connection.Thing.ThingName}}/commands/*"
]
}
]
}

self.iot.create_policy(
policyName=policy_name,
policyDocument=json.dumps(policy_document)
)

IoT Rules Engine

# aws_iot/rules.py
import boto3
import json

def create_telemetry_rule(rule_name: str, kinesis_stream: str):
"""Create IoT rule to route telemetry to Kinesis"""
iot = boto3.client('iot')

sql = "SELECT * FROM 'devices/+/telemetry'"

iot.create_topic_rule(
ruleName=rule_name,
topicRulePayload={
'sql': sql,
'awsIotSqlVersion': '2016-03-23',
'actions': [
{
'kinesis': {
'roleArn': f'arn:aws:iam::ACCOUNT:role/IoTKinesisRole',
'streamName': kinesis_stream,
'partitionKey': '${deviceId}'
}
}
],
'errorAction': {
'cloudwatchLogs': {
'roleArn': f'arn:aws:iam::ACCOUNT:role/IoTLogsRole',
'logGroupName': '/aws/iot/errors'
}
}
}
)

Data Ingestion Pipeline

Time Series Database (InfluxDB)

# ingestion/influxdb.py
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class TelemetryStore:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
self.bucket = bucket
self.org = org

def write_telemetry(self, device_id: str, measurements: dict, timestamp=None):
point = Point("device_telemetry") \
.tag("device_id", device_id) \
.time(timestamp or datetime.utcnow())

for key, value in measurements.items():
point.field(key, value)

self.write_api.write(bucket=self.bucket, record=point)

def query_device_data(self, device_id: str, start: str = "-1h"):
query = f'''
from(bucket: "{self.bucket}")
|> range(start: {start})
|> filter(fn: (r) => r["device_id"] == "{device_id}")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
return self.query_api.query_data_frame(query, org=self.org)

def get_aggregated_metrics(self, device_id: str, window: str = "5m"):
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -24h)
|> filter(fn: (r) => r["device_id"] == "{device_id}")
|> aggregateWindow(every: {window}, fn: mean)
|> yield(name: "mean")
'''
return self.query_api.query_data_frame(query, org=self.org)

Kafka Ingestion

# ingestion/kafka.py
from confluent_kafka import Consumer, Producer, KafkaError
import json

class TelemetryIngestion:
def __init__(self, bootstrap_servers: str):
self.producer = Producer({
'bootstrap.servers': bootstrap_servers,
'acks': 'all',
'retries': 3,
})

self.consumer = Consumer({
'bootstrap.servers': bootstrap_servers,
'group.id': 'telemetry-processor',
'auto.offset.reset': 'earliest',
})

def produce_telemetry(self, device_id: str, data: dict):
topic = 'device-telemetry'
message = {
'device_id': device_id,
'timestamp': datetime.utcnow().isoformat(),
'data': data,
}

self.producer.produce(
topic,
key=device_id.encode(),
value=json.dumps(message).encode(),
callback=self._delivery_callback
)
self.producer.flush()

def _delivery_callback(self, err, msg):
if err:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()}')

def consume_telemetry(self, topics: list, handler: callable):
self.consumer.subscribe(topics)

try:
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())

data = json.loads(msg.value().decode())
handler(data)
finally:
self.consumer.close()

Edge Computing

Edge Device (Python)

# edge/device.py
import asyncio
from dataclasses import dataclass
from typing import List, Callable
import json

@dataclass
class SensorReading:
sensor_id: str
value: float
unit: str
timestamp: str

class EdgeDevice:
def __init__(self, device_id: str, mqtt_client):
self.device_id = device_id
self.mqtt = mqtt_client
self.sensors: dict[str, Callable] = {}
self.buffer: List[SensorReading] = []
self.buffer_size = 100
self.flush_interval = 60 # seconds

def register_sensor(self, sensor_id: str, read_fn: Callable):
self.sensors[sensor_id] = read_fn

async def start(self):
await asyncio.gather(
self._sensor_loop(),
self._flush_loop(),
)

async def _sensor_loop(self):
while True:
for sensor_id, read_fn in self.sensors.items():
try:
value = read_fn()
reading = SensorReading(
sensor_id=sensor_id,
value=value,
unit="celsius",
timestamp=datetime.utcnow().isoformat()
)
self._process_reading(reading)
except Exception as e:
print(f"Error reading sensor {sensor_id}: {e}")

await asyncio.sleep(1)

def _process_reading(self, reading: SensorReading):
# Edge processing - anomaly detection
if self._is_anomaly(reading):
self._send_alert(reading)

# Buffer for batch upload
self.buffer.append(reading)

if len(self.buffer) >= self.buffer_size:
self._flush_buffer()

def _is_anomaly(self, reading: SensorReading) -> bool:
# Simple threshold check
return reading.value > 100 or reading.value < -10

def _send_alert(self, reading: SensorReading):
topic = f"devices/{self.device_id}/alerts"
self.mqtt.publish(topic, {
"type": "anomaly",
"sensor_id": reading.sensor_id,
"value": reading.value,
"timestamp": reading.timestamp,
})

async def _flush_loop(self):
while True:
await asyncio.sleep(self.flush_interval)
self._flush_buffer()

def _flush_buffer(self):
if not self.buffer:
return

topic = f"devices/{self.device_id}/telemetry/batch"
self.mqtt.publish(topic, {
"device_id": self.device_id,
"readings": [vars(r) for r in self.buffer],
"count": len(self.buffer),
})
self.buffer.clear()

Device Management

Fleet Management

# management/fleet.py
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class DeviceStatus(Enum):
ONLINE = "online"
OFFLINE = "offline"
MAINTENANCE = "maintenance"
ERROR = "error"

@dataclass
class Device:
device_id: str
name: str
type: str
status: DeviceStatus
firmware_version: str
last_seen: Optional[datetime] = None
metadata: dict = None

class FleetManager:
def __init__(self, db):
self.db = db

async def register_device(self, device: Device):
await self.db.devices.insert_one(vars(device))

async def update_status(self, device_id: str, status: DeviceStatus):
await self.db.devices.update_one(
{"device_id": device_id},
{"$set": {"status": status.value, "last_seen": datetime.utcnow()}}
)

async def get_devices_by_status(self, status: DeviceStatus):
cursor = self.db.devices.find({"status": status.value})
return await cursor.to_list(length=None)

async def schedule_firmware_update(self, device_ids: list, version: str):
for device_id in device_ids:
await self.send_command(device_id, {
"action": "firmware_update",
"params": {"version": version}
})

Usage Examples

Setup MQTT Communication

Apply iot-patterns skill to implement MQTT client with TLS and automatic reconnection

Configure AWS IoT Core

Apply iot-patterns skill to provision devices with AWS IoT Core and configure rules engine

Implement Edge Processing

Apply iot-patterns skill to create edge device with local anomaly detection and batch uploads

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: iot-patterns

Completed:
- [x] MQTT broker configured (Mosquitto or AWS IoT Core)
- [x] Device client implemented with TLS and authentication
- [x] Telemetry publishing functional
- [x] Command subscription established
- [x] Data ingestion pipeline configured
- [x] Edge processing logic deployed (if applicable)
- [x] Device fleet management initialized

Outputs:
- MQTT client configuration: {broker_url}:{port}
- Device credentials: certificates and keys (secured)
- IoT topic structure: devices/{device_id}/{telemetry|commands}
- Data pipeline: {MQTT → Kinesis → InfluxDB/Timestream}
- Edge device code: device.py with anomaly detection
- Fleet management API: device registration and updates

Connection Test:
- MQTT connection: ESTABLISHED
- Telemetry publish: SUCCESS (last message: {timestamp})
- Command subscription: ACTIVE (topic: devices/+/commands/#)
- Data flow: {messages_per_second} msg/sec

Next Steps:
- Monitor device health via fleet management dashboard
- Configure alerting rules for anomaly detection
- Scale device provisioning for production deployment

Completion Checklist

Before marking this skill as complete, verify:

  • MQTT broker running and accessible (check with mosquitto_sub -t '#')
  • Device client connects successfully with TLS
  • Telemetry messages published to correct topic
  • Command messages received by device
  • Data ingestion pipeline processing messages (check Kinesis/Kafka)
  • InfluxDB/Timestream storing telemetry data
  • Edge device runs anomaly detection locally
  • Fleet management API registers devices
  • Device credentials stored securely (not hardcoded)
  • No authentication or connection errors in logs

Failure Indicators

This skill has FAILED if:

  • ❌ MQTT broker not running or not accessible
  • ❌ TLS handshake fails (certificate errors)
  • ❌ Device authentication fails (invalid credentials)
  • ❌ Telemetry messages not reaching broker
  • ❌ Command messages not received by device
  • ❌ Data pipeline not processing messages (Kinesis/Kafka down)
  • ❌ InfluxDB/Timestream connection errors
  • ❌ Edge anomaly detection not triggering alerts
  • ❌ Fleet management API returns errors
  • ❌ Device credentials exposed in code or logs

When NOT to Use

Do NOT use this skill when:

  • Building simple HTTP-based APIs (not IoT telemetry) - use REST patterns instead
  • Device data doesn't require real-time streaming (use batch uploads)
  • No edge computing needed (use cloud-only processing)
  • Working with single device for testing (use simpler MQTT client)
  • Data volume is low (<1K messages/day) - overhead not justified
  • Need video/audio streaming (use WebRTC or RTSP instead)
  • Bluetooth/BLE local communication only (no cloud integration)
  • Legacy device communication protocols (Modbus, OPC UA) - use protocol-specific clients

Use alternative skills:

  • api-design-patterns - For HTTP REST APIs
  • streaming-patterns - For Kafka/event streaming without IoT specifics
  • edge-computing-patterns - For advanced edge ML deployment

Anti-Patterns (Avoid)

Anti-PatternProblemSolution
Hardcoded credentials in device codeSecurity vulnerabilityUse device provisioning with certificate-based auth
Not using TLS for MQTTUnencrypted telemetry dataAlways enable TLS with valid certificates
Publishing high-frequency data without batchingNetwork congestion, high costsBatch telemetry at edge, publish every 30-60 seconds
No edge processing/filteringSending all data to cloudFilter and aggregate at edge, send summaries
Synchronous telemetry publishingDevice hangs on network issuesUse async publishing with local queuing
Single MQTT broker (no HA)Single point of failureUse clustered Mosquitto or AWS IoT Core
Not handling disconnectionsData loss during offline periodsImplement local buffering and replay
Ignoring device lifecycleNo decommissioning processImplement device registration, update, deactivation flow

Principles

This skill embodies:

  • #2 Full Automation - Automated device provisioning and fleet management
  • #3 Search Before Create - Reuse MQTT patterns, AWS IoT Core services
  • #5 Eliminate Ambiguity - Explicit topic structure (devices/{id}/{telemetry|commands})
  • #6 Clear, Understandable, Explainable - Well-documented IoT architecture
  • #9 Security First - TLS encryption, certificate-based authentication
  • #11 Don't Repeat Yourself (DRY) - Shared MQTT client patterns across devices

Full Principles: CODITECT-STANDARD-AUTOMATION.md