IoT Patterns Skill
When to Use This Skill
Use this skill when implementing IoT patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
Comprehensive IoT development patterns for device communication, data ingestion, and cloud integration.
MQTT Communication
Broker Setup (Mosquitto)
# docker-compose.yml
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"   # plain MQTT
      - "9001:9001"   # MQTT over WebSockets
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
# mosquitto/config/mosquitto.conf
# Plain MQTT listener.
listener 1883
# WebSocket listener; `protocol` applies to the most recently declared listener.
listener 9001
protocol websockets
# Require username/password authentication for every client.
allow_anonymous false
password_file /mosquitto/config/passwords
# Persist retained messages and subscriptions across broker restarts.
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
Python MQTT Client
# mqtt/client.py
import json
import ssl
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional

import paho.mqtt.client as mqtt
@dataclass
class MQTTConfig:
    """Connection settings for an MQTT broker.

    TLS is off by default; set ``use_tls`` (and optionally ``ca_cert``)
    to connect over an encrypted channel.
    """

    broker: str                        # broker hostname or IP
    port: int = 1883                   # standard unencrypted MQTT port
    username: Optional[str] = None     # credentials; None = anonymous
    password: Optional[str] = None
    client_id: Optional[str] = None    # None lets the broker assign one
    use_tls: bool = False
    ca_cert: Optional[str] = None      # CA bundle path when use_tls is set
class IoTClient:
    """Thin wrapper around paho-mqtt for JSON-payload IoT messaging.

    Subclasses override :meth:`handle_message` to react to incoming
    messages; all payloads are JSON-encoded dicts.
    """

    def __init__(self, config: MQTTConfig):
        self.config = config
        # NOTE(review): paho-mqtt >= 2.0 also requires a CallbackAPIVersion
        # argument here — confirm the pinned client version.
        self.client = mqtt.Client(
            client_id=config.client_id,
            protocol=mqtt.MQTTv5
        )
        self._setup_client()

    def _setup_client(self):
        """Apply credentials, TLS, and callback wiring from the config."""
        if self.config.username:
            self.client.username_pw_set(
                self.config.username,
                self.config.password
            )
        if self.config.use_tls:
            self.client.tls_set(
                ca_certs=self.config.ca_cert,
                tls_version=ssl.PROTOCOL_TLS
            )
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        if rc == 0:
            print("Connected to MQTT broker")
        else:
            print(f"Connection failed with code {rc}")

    def _on_message(self, client, userdata, msg):
        # Decode first, dispatch in `else` so exceptions raised by the
        # handler are not mislabeled as JSON errors.
        try:
            payload = json.loads(msg.payload.decode())
        except (json.JSONDecodeError, UnicodeDecodeError):
            print(f"Invalid JSON on topic {msg.topic}")
        else:
            self.handle_message(msg.topic, payload)

    # BUG FIX: with MQTTv5 paho invokes on_disconnect with an extra
    # `properties` argument; the original 3-arg signature raised TypeError.
    def _on_disconnect(self, client, userdata, rc, properties=None):
        print(f"Disconnected with code {rc}")

    def connect(self):
        """Connect to the broker and start the background network loop."""
        self.client.connect(
            self.config.broker,
            self.config.port,
            keepalive=60
        )
        self.client.loop_start()

    def disconnect(self):
        """Stop the network loop and disconnect cleanly."""
        self.client.loop_stop()
        self.client.disconnect()

    def subscribe(self, topic: str, qos: int = 1):
        self.client.subscribe(topic, qos)

    def publish(self, topic: str, payload: dict, qos: int = 1, retain: bool = False):
        """Serialize `payload` to JSON and publish it on `topic`."""
        message = json.dumps(payload)
        self.client.publish(topic, message, qos, retain)

    def handle_message(self, topic: str, payload: dict):
        """Override in subclass to process decoded JSON messages."""
        print(f"Received on {topic}: {payload}")
# Usage
class SensorClient(IoTClient):
    """Device-side client that reports telemetry and listens for commands."""

    def __init__(self, config: MQTTConfig, device_id: str):
        super().__init__(config)
        self.device_id = device_id

    def publish_telemetry(self, data: dict):
        """Publish a timestamped telemetry envelope for this device."""
        envelope = {
            "device_id": self.device_id,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        }
        self.publish(f"devices/{self.device_id}/telemetry", envelope)

    def subscribe_commands(self):
        """Subscribe to every command sub-topic addressed to this device."""
        self.subscribe(f"devices/{self.device_id}/commands/#")

    def handle_message(self, topic: str, payload: dict):
        # Only command topics are acted upon; other messages are ignored.
        if "commands" in topic:
            self.execute_command(payload)

    def execute_command(self, command: dict):
        print(f"Executing command: {command}")
Node.js MQTT Client
// mqtt/client.ts
import mqtt, { MqttClient, IClientOptions } from 'mqtt'
// Connection settings for a single IoT device client.
interface DeviceConfig {
broker: string
port: number
deviceId: string
username?: string
password?: string
useTLS?: boolean
}
/**
 * MQTT-connected device: publishes telemetry and reacts to commands
 * received on its `devices/{id}/commands/#` topic space.
 */
class IoTDevice {
  private client: MqttClient
  private deviceId: string

  constructor(config: DeviceConfig) {
    this.deviceId = config.deviceId
    const protocol = config.useTLS ? 'mqtts' : 'mqtt'
    const options: IClientOptions = {
      clientId: config.deviceId,
      username: config.username,
      password: config.password,
      protocol,
      port: config.port,
      reconnectPeriod: 5000, // auto-reconnect every 5s
      connectTimeout: 30000,
    }
    // BUG FIX: in mqtt.js the URL scheme takes precedence over
    // options.protocol, so the hardcoded `mqtt://` silently disabled TLS.
    // Build the URL from the selected protocol instead.
    this.client = mqtt.connect(`${protocol}://${config.broker}`, options)
    this.setupHandlers()
  }

  private setupHandlers() {
    this.client.on('connect', () => {
      console.log('Connected to MQTT broker')
      this.subscribeToCommands()
    })
    this.client.on('message', (topic, message) => {
      // Guard the parse: an uncaught JSON.parse error inside the event
      // handler would crash the process on one malformed payload.
      try {
        const payload = JSON.parse(message.toString())
        this.handleMessage(topic, payload)
      } catch {
        console.error(`Invalid JSON on topic ${topic}`)
      }
    })
    this.client.on('error', (error) => {
      console.error('MQTT error:', error)
    })
  }

  private subscribeToCommands() {
    const topic = `devices/${this.deviceId}/commands/#`
    this.client.subscribe(topic, { qos: 1 })
  }

  /** Publish a timestamped telemetry envelope for this device. */
  publishTelemetry(data: Record<string, unknown>) {
    const topic = `devices/${this.deviceId}/telemetry`
    const payload = {
      deviceId: this.deviceId,
      timestamp: new Date().toISOString(),
      data,
    }
    this.client.publish(topic, JSON.stringify(payload), { qos: 1 })
  }

  private handleMessage(topic: string, payload: unknown) {
    // Only command topics are acted upon; other messages are ignored.
    if (topic.includes('commands')) {
      this.executeCommand(payload as { action: string; params: unknown })
    }
  }

  private executeCommand(command: { action: string; params: unknown }) {
    console.log('Executing command:', command)
  }

  disconnect() {
    this.client.end()
  }
}
AWS IoT Core
Device Provisioning
# aws_iot/provisioning.py
import boto3
import json
class DeviceProvisioner:
    """Provision AWS IoT things with certificate-based authentication."""

    def __init__(self, region: str = 'us-east-1'):
        self.iot = boto3.client('iot', region_name=region)

    def create_thing(self, thing_name: str, thing_type: str = None):
        """Create an IoT thing with an active certificate and attached policy.

        Returns a dict with the thing/certificate ARNs and key material.
        The private key is only retrievable at creation time — store it
        securely and never log it.
        """
        # BUG FIX: boto3 rejects thingTypeName=None with a
        # ParamValidationError; only pass the key when a type is given.
        kwargs = {
            'thingName': thing_name,
            'attributePayload': {
                'attributes': {
                    'env': 'production'
                }
            },
        }
        if thing_type:
            kwargs['thingTypeName'] = thing_type
        thing = self.iot.create_thing(**kwargs)
        # Create an activated X.509 certificate plus key pair.
        cert = self.iot.create_keys_and_certificate(setAsActive=True)
        # Bind the certificate to the thing so the device can authenticate.
        self.iot.attach_thing_principal(
            thingName=thing_name,
            principal=cert['certificateArn']
        )
        # Grant the device its scoped permissions (policy must already exist).
        self.iot.attach_policy(
            policyName='IoTDevicePolicy',
            target=cert['certificateArn']
        )
        return {
            'thing_name': thing_name,
            'thing_arn': thing['thingArn'],
            'certificate_arn': cert['certificateArn'],
            'certificate_pem': cert['certificatePem'],
            'private_key': cert['keyPair']['PrivateKey'],
            'public_key': cert['keyPair']['PublicKey'],
        }

    def create_policy(self, policy_name: str):
        """Create a least-privilege IoT policy scoped to the connecting thing.

        Uses the IoT policy variable ${iot:Connection.Thing.ThingName} so
        each device may only use its own client id and topic namespace.
        """
        # Plain interpolation of the policy variable: the originals were
        # f-strings with no runtime placeholders, forcing `${{...}}` escapes.
        thing_var = "${iot:Connection.Thing.ThingName}"
        policy_document = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": ["iot:Connect"],
                    "Resource": [f"arn:aws:iot:*:*:client/{thing_var}"]
                },
                {
                    "Effect": "Allow",
                    "Action": ["iot:Publish"],
                    "Resource": [f"arn:aws:iot:*:*:topic/devices/{thing_var}/*"]
                },
                {
                    "Effect": "Allow",
                    "Action": ["iot:Subscribe"],
                    "Resource": [f"arn:aws:iot:*:*:topicfilter/devices/{thing_var}/commands/*"]
                },
                {
                    "Effect": "Allow",
                    "Action": ["iot:Receive"],
                    "Resource": [f"arn:aws:iot:*:*:topic/devices/{thing_var}/commands/*"]
                }
            ]
        }
        self.iot.create_policy(
            policyName=policy_name,
            policyDocument=json.dumps(policy_document)
        )
IoT Rules Engine
# aws_iot/rules.py
import boto3
import json
def create_telemetry_rule(
    rule_name: str,
    kinesis_stream: str,
    kinesis_role_arn: str = 'arn:aws:iam::ACCOUNT:role/IoTKinesisRole',
    error_role_arn: str = 'arn:aws:iam::ACCOUNT:role/IoTLogsRole',
):
    """Create an IoT rule that routes device telemetry into Kinesis.

    Messages on devices/+/telemetry are forwarded to ``kinesis_stream``,
    partitioned by the payload's deviceId; failures are logged to
    CloudWatch. The role ARNs default to placeholders ('ACCOUNT') and
    must be supplied for a real account.
    """
    iot = boto3.client('iot')
    # '+' matches exactly one topic level, i.e. any single device id.
    sql = "SELECT * FROM 'devices/+/telemetry'"
    iot.create_topic_rule(
        ruleName=rule_name,
        topicRulePayload={
            'sql': sql,
            'awsIotSqlVersion': '2016-03-23',
            'actions': [
                {
                    'kinesis': {
                        'roleArn': kinesis_role_arn,
                        'streamName': kinesis_stream,
                        # Substitution template resolved per-message from
                        # the JSON payload's deviceId field.
                        'partitionKey': '${deviceId}'
                    }
                }
            ],
            'errorAction': {
                'cloudwatchLogs': {
                    'roleArn': error_role_arn,
                    'logGroupName': '/aws/iot/errors'
                }
            }
        }
    )
Data Ingestion Pipeline
Time Series Database (InfluxDB)
# ingestion/influxdb.py
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
class TelemetryStore:
    """Write and query device telemetry in InfluxDB 2.x."""

    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        # Synchronous writes: simpler semantics, fine for modest volume.
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
        self.org = org

    def write_telemetry(self, device_id: str, measurements: dict, timestamp=None):
        """Write one point tagged with device_id; fields come from `measurements`."""
        # BUG FIX: datetime was never imported in this module; import it
        # here and use an explicit UTC timestamp.
        from datetime import datetime, timezone

        point = Point("device_telemetry") \
            .tag("device_id", device_id) \
            .time(timestamp or datetime.now(timezone.utc))
        for key, value in measurements.items():
            point.field(key, value)
        self.write_api.write(bucket=self.bucket, record=point)

    def query_device_data(self, device_id: str, start: str = "-1h"):
        """Return raw points for a device since `start`, one row per timestamp.

        NOTE(review): device_id/start are interpolated into the Flux query;
        only pass trusted identifiers to avoid query injection.
        """
        query = f'''
        from(bucket: "{self.bucket}")
        |> range(start: {start})
        |> filter(fn: (r) => r["device_id"] == "{device_id}")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
        '''
        return self.query_api.query_data_frame(query, org=self.org)

    def get_aggregated_metrics(self, device_id: str, window: str = "5m"):
        """Return 24h of mean-aggregated metrics in `window`-sized buckets."""
        query = f'''
        from(bucket: "{self.bucket}")
        |> range(start: -24h)
        |> filter(fn: (r) => r["device_id"] == "{device_id}")
        |> aggregateWindow(every: {window}, fn: mean)
        |> yield(name: "mean")
        '''
        return self.query_api.query_data_frame(query, org=self.org)
Kafka Ingestion
# ingestion/kafka.py
from confluent_kafka import Consumer, Producer, KafkaError
import json
class TelemetryIngestion:
    """Produce and consume device telemetry through Kafka."""

    def __init__(self, bootstrap_servers: str):
        # acks=all: wait for the full in-sync replica set to acknowledge.
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'acks': 'all',
            'retries': 3,
        })
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'telemetry-processor',
            'auto.offset.reset': 'earliest',
        })

    def produce_telemetry(self, device_id: str, data: dict):
        """Publish one telemetry message, keyed by device id for partitioning."""
        # BUG FIX: datetime was never imported in this module.
        from datetime import datetime

        topic = 'device-telemetry'
        message = {
            'device_id': device_id,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data,
        }
        self.producer.produce(
            topic,
            key=device_id.encode(),
            value=json.dumps(message).encode(),
            callback=self._delivery_callback
        )
        # NOTE(review): flushing per message is simple but slow; batch and
        # flush periodically for high-throughput producers.
        self.producer.flush()

    def _delivery_callback(self, err, msg):
        """Log broker acknowledgement or failure for a produced message."""
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to {msg.topic()}')

    def consume_telemetry(self, topics: list, handler: callable):
        """Poll `topics` forever, passing each decoded JSON message to handler."""
        self.consumer.subscribe(topics)
        try:
            while True:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    # End-of-partition is informational, not a failure.
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise Exception(msg.error())
                data = json.loads(msg.value().decode())
                handler(data)
        finally:
            # Commit offsets and leave the consumer group cleanly.
            self.consumer.close()
Edge Computing
Edge Device (Python)
# edge/device.py
import asyncio
from dataclasses import dataclass
from typing import List, Callable
import json
@dataclass
class SensorReading:
    """A single measurement captured from one sensor."""

    sensor_id: str   # stable identifier of the originating sensor
    value: float     # measured value, expressed in `unit`
    unit: str        # measurement unit label, e.g. "celsius"
    timestamp: str   # ISO-8601 capture time
class EdgeDevice:
    """Edge gateway that samples sensors, detects anomalies locally,
    and uploads telemetry to MQTT in batches.
    """

    def __init__(self, device_id: str, mqtt_client):
        self.device_id = device_id
        self.mqtt = mqtt_client
        self.sensors: dict[str, Callable] = {}
        self.buffer: List["SensorReading"] = []
        self.buffer_size = 100    # flush when this many readings are queued
        self.flush_interval = 60  # seconds between periodic flushes

    def register_sensor(self, sensor_id: str, read_fn: Callable):
        """Register a zero-argument callable that returns the sensor's value."""
        self.sensors[sensor_id] = read_fn

    async def start(self):
        """Run sampling and periodic flushing concurrently (never returns)."""
        await asyncio.gather(
            self._sensor_loop(),
            self._flush_loop(),
        )

    async def _sensor_loop(self):
        """Sample every registered sensor roughly once per second."""
        # BUG FIX: datetime was never imported in this module.
        from datetime import datetime

        while True:
            for sensor_id, read_fn in self.sensors.items():
                try:
                    value = read_fn()
                    reading = SensorReading(
                        sensor_id=sensor_id,
                        value=value,
                        unit="celsius",
                        timestamp=datetime.utcnow().isoformat()
                    )
                    self._process_reading(reading)
                except Exception as e:
                    # One failing sensor must not stop the sampling loop.
                    print(f"Error reading sensor {sensor_id}: {e}")
            await asyncio.sleep(1)

    def _process_reading(self, reading: "SensorReading"):
        """Run edge checks on a reading and queue it for batch upload."""
        # Edge processing - anomaly detection
        if self._is_anomaly(reading):
            self._send_alert(reading)
        # Buffer for batch upload
        self.buffer.append(reading)
        if len(self.buffer) >= self.buffer_size:
            self._flush_buffer()

    def _is_anomaly(self, reading: "SensorReading") -> bool:
        # Simple threshold check; tune per sensor type in production.
        return reading.value > 100 or reading.value < -10

    def _send_alert(self, reading: "SensorReading"):
        """Publish an immediate anomaly alert (bypasses the batch buffer)."""
        topic = f"devices/{self.device_id}/alerts"
        self.mqtt.publish(topic, {
            "type": "anomaly",
            "sensor_id": reading.sensor_id,
            "value": reading.value,
            "timestamp": reading.timestamp,
        })

    async def _flush_loop(self):
        """Flush the buffer every `flush_interval` seconds."""
        while True:
            await asyncio.sleep(self.flush_interval)
            self._flush_buffer()

    def _flush_buffer(self):
        """Publish all buffered readings as one batch message, then clear."""
        if not self.buffer:
            return
        topic = f"devices/{self.device_id}/telemetry/batch"
        self.mqtt.publish(topic, {
            "device_id": self.device_id,
            "readings": [vars(r) for r in self.buffer],
            "count": len(self.buffer),
        })
        self.buffer.clear()
Device Management
Fleet Management
# management/fleet.py
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class DeviceStatus(Enum):
    """Lifecycle states a managed fleet device can be in."""

    ONLINE = "online"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    ERROR = "error"
# BUG FIX: `datetime` is used in the last_seen annotation below but was
# never imported in this module.
from datetime import datetime


@dataclass
class Device:
    """Registry record for a managed fleet device."""

    device_id: str                         # unique device identifier
    name: str                              # human-readable label
    type: str                              # device category/model
    status: DeviceStatus                   # current lifecycle state
    firmware_version: str
    last_seen: Optional[datetime] = None   # last heartbeat, None if never seen
    # BUG FIX: was `dict = None`, a mistyped default; None stays the default.
    metadata: Optional[dict] = None        # free-form extra attributes
class FleetManager:
    """Registry and command operations for the device fleet."""

    def __init__(self, db):
        # db is assumed to expose async collections `devices` and `commands`
        # (e.g. motor/MongoDB) -- TODO confirm against the caller.
        self.db = db

    async def register_device(self, device: Device):
        """Insert a new device record."""
        await self.db.devices.insert_one(vars(device))

    async def update_status(self, device_id: str, status: DeviceStatus):
        """Set a device's status and refresh its last_seen timestamp."""
        # BUG FIX: datetime was never imported in this module.
        from datetime import datetime

        await self.db.devices.update_one(
            {"device_id": device_id},
            {"$set": {"status": status.value, "last_seen": datetime.utcnow()}}
        )

    async def get_devices_by_status(self, status: DeviceStatus):
        """Return all devices currently in `status`."""
        cursor = self.db.devices.find({"status": status.value})
        return await cursor.to_list(length=None)

    async def send_command(self, device_id: str, command: dict):
        """Queue a command for a device.

        BUG FIX: schedule_firmware_update called this method but it was
        never defined. Commands are persisted to a `commands` collection;
        a delivery worker (e.g. an MQTT publisher) is expected to drain it.
        """
        await self.db.commands.insert_one({
            "device_id": device_id,
            "command": command,
        })

    async def schedule_firmware_update(self, device_ids: list, version: str):
        """Fan out a firmware_update command to each listed device."""
        for device_id in device_ids:
            await self.send_command(device_id, {
                "action": "firmware_update",
                "params": {"version": version}
            })
Usage Examples
Setup MQTT Communication
Apply iot-patterns skill to implement MQTT client with TLS and automatic reconnection
Configure AWS IoT Core
Apply iot-patterns skill to provision devices with AWS IoT Core and configure rules engine
Implement Edge Processing
Apply iot-patterns skill to create edge device with local anomaly detection and batch uploads
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: iot-patterns
Completed:
- [x] MQTT broker configured (Mosquitto or AWS IoT Core)
- [x] Device client implemented with TLS and authentication
- [x] Telemetry publishing functional
- [x] Command subscription established
- [x] Data ingestion pipeline configured
- [x] Edge processing logic deployed (if applicable)
- [x] Device fleet management initialized
Outputs:
- MQTT client configuration: {broker_url}:{port}
- Device credentials: certificates and keys (secured)
- IoT topic structure: devices/{device_id}/{telemetry|commands}
- Data pipeline: {MQTT → Kinesis → InfluxDB/Timestream}
- Edge device code: device.py with anomaly detection
- Fleet management API: device registration and updates
Connection Test:
- MQTT connection: ESTABLISHED
- Telemetry publish: SUCCESS (last message: {timestamp})
- Command subscription: ACTIVE (topic: devices/+/commands/#)
- Data flow: {messages_per_second} msg/sec
Next Steps:
- Monitor device health via fleet management dashboard
- Configure alerting rules for anomaly detection
- Scale device provisioning for production deployment
Completion Checklist
Before marking this skill as complete, verify:
- MQTT broker running and accessible (check with `mosquitto_sub -t '#'`)
- Device client connects successfully with TLS
- Telemetry messages published to correct topic
- Command messages received by device
- Data ingestion pipeline processing messages (check Kinesis/Kafka)
- InfluxDB/Timestream storing telemetry data
- Edge device runs anomaly detection locally
- Fleet management API registers devices
- Device credentials stored securely (not hardcoded)
- No authentication or connection errors in logs
Failure Indicators
This skill has FAILED if:
- ❌ MQTT broker not running or not accessible
- ❌ TLS handshake fails (certificate errors)
- ❌ Device authentication fails (invalid credentials)
- ❌ Telemetry messages not reaching broker
- ❌ Command messages not received by device
- ❌ Data pipeline not processing messages (Kinesis/Kafka down)
- ❌ InfluxDB/Timestream connection errors
- ❌ Edge anomaly detection not triggering alerts
- ❌ Fleet management API returns errors
- ❌ Device credentials exposed in code or logs
When NOT to Use
Do NOT use this skill when:
- Building simple HTTP-based APIs (not IoT telemetry) - use REST patterns instead
- Device data doesn't require real-time streaming (use batch uploads)
- No edge computing needed (use cloud-only processing)
- Working with single device for testing (use simpler MQTT client)
- Data volume is low (<1K messages/day) - overhead not justified
- Need video/audio streaming (use WebRTC or RTSP instead)
- Bluetooth/BLE local communication only (no cloud integration)
- Legacy device communication protocols (Modbus, OPC UA) - use protocol-specific clients
Use alternative skills:
- api-design-patterns - For HTTP REST APIs
- streaming-patterns - For Kafka/event streaming without IoT specifics
- edge-computing-patterns - For advanced edge ML deployment
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Hardcoded credentials in device code | Security vulnerability | Use device provisioning with certificate-based auth |
| Not using TLS for MQTT | Unencrypted telemetry data | Always enable TLS with valid certificates |
| Publishing high-frequency data without batching | Network congestion, high costs | Batch telemetry at edge, publish every 30-60 seconds |
| No edge processing/filtering | Sending all data to cloud | Filter and aggregate at edge, send summaries |
| Synchronous telemetry publishing | Device hangs on network issues | Use async publishing with local queuing |
| Single MQTT broker (no HA) | Single point of failure | Use clustered Mosquitto or AWS IoT Core |
| Not handling disconnections | Data loss during offline periods | Implement local buffering and replay |
| Ignoring device lifecycle | No decommissioning process | Implement device registration, update, deactivation flow |
Principles
This skill embodies:
- #2 Full Automation - Automated device provisioning and fleet management
- #3 Search Before Create - Reuse MQTT patterns, AWS IoT Core services
- #5 Eliminate Ambiguity - Explicit topic structure (devices/{id}/{telemetry|commands})
- #6 Clear, Understandable, Explainable - Well-documented IoT architecture
- #9 Security First - TLS encryption, certificate-based authentication
- #11 Don't Repeat Yourself (DRY) - Shared MQTT client patterns across devices
Full Principles: CODITECT-STANDARD-AUTOMATION.md