Agent Skills Framework Extension
Message Queue Patterns Skill
When to Use This Skill
Use this skill when implementing message queue patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
This skill covers RabbitMQ, Kafka, BullMQ, async messaging, event-driven architecture, and queue management patterns.
Core Capabilities
- Producer/Consumer - Reliable message delivery
- Pub/Sub - Fan-out messaging
- Dead Letter Queues - Failed message handling
- Ordering - Guaranteed message sequence
- Idempotency - Safe message reprocessing
RabbitMQ Patterns
Producer with Confirms
// src/messaging/rabbitmq/producer.ts
import amqp, { Connection, ConfirmChannel } from 'amqplib';
interface MessageOptions {
persistent?: boolean;
expiration?: string;
priority?: number;
headers?: Record<string, unknown>;
}
export class RabbitMQProducer {
private connection: Connection | null = null;
private channel: ConfirmChannel | null = null;
constructor(private readonly url: string) {}
async connect(): Promise<void> {
this.connection = await amqp.connect(this.url);
this.channel = await this.connection.createConfirmChannel();
// Handle connection errors. In amqplib an 'error' event is always
// followed by 'close', so reconnect only from the 'close' handler
// to avoid scheduling two reconnect loops for the same failure.
this.connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
});
this.connection.on('close', () => {
console.log('RabbitMQ connection closed');
this.reconnect();
});
}
private async reconnect(): Promise<void> {
setTimeout(async () => {
try {
await this.connect();
console.log('RabbitMQ reconnected');
} catch (err) {
console.error('RabbitMQ reconnect failed:', err);
this.reconnect();
}
}, 5000);
}
async publish(
exchange: string,
routingKey: string,
message: unknown,
options: MessageOptions = {}
): Promise<void> {
if (!this.channel) {
throw new Error('Channel not initialized');
}
const content = Buffer.from(JSON.stringify(message));
return new Promise((resolve, reject) => {
this.channel!.publish(
exchange,
routingKey,
content,
{
persistent: options.persistent ?? true,
expiration: options.expiration,
priority: options.priority,
headers: options.headers,
contentType: 'application/json',
timestamp: Date.now(),
},
(err) => {
if (err) {
reject(err);
} else {
resolve();
}
}
);
});
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
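Producers often attach a unique message id at publish time so that consumers (such as the idempotent consumer later in this skill) have a stable deduplication key. A minimal sketch — the envelope shape and field names are illustrative conventions, not part of the producer above:

```typescript
import { randomUUID } from 'crypto';

// Wraps an arbitrary body with a unique id and publish timestamp.
// Consumers can use messageId as their idempotency key.
interface MessageEnvelope<T> {
  messageId: string;
  publishedAt: number;
  body: T;
}

function envelope<T>(body: T): MessageEnvelope<T> {
  return { messageId: randomUUID(), publishedAt: Date.now(), body };
}
```

The envelope would be passed as the `message` argument to `publish`, e.g. `producer.publish('orders', 'order.created', envelope(order))`.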
Consumer with DLQ
// src/messaging/rabbitmq/consumer.ts
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib';
interface ConsumerOptions {
prefetch?: number;
maxRetries?: number;
retryDelay?: number;
}
export class RabbitMQConsumer<T> {
private connection: Connection | null = null;
private channel: Channel | null = null;
constructor(
private readonly url: string,
private readonly queue: string,
private readonly handler: (message: T) => Promise<void>,
private readonly options: ConsumerOptions = {}
) {}
async start(): Promise<void> {
this.connection = await amqp.connect(this.url);
this.channel = await this.connection.createChannel();
// Set prefetch for fair dispatch
await this.channel.prefetch(this.options.prefetch ?? 10);
// Setup main queue with DLQ
await this.setupQueues();
// Start consuming
await this.channel.consume(
this.queue,
this.processMessage.bind(this),
{ noAck: false }
);
console.log(`Consumer started on queue: ${this.queue}`);
}
private async setupQueues(): Promise<void> {
const dlxExchange = `${this.queue}.dlx`;
const dlqQueue = `${this.queue}.dlq`;
// Dead letter exchange and queue
await this.channel!.assertExchange(dlxExchange, 'direct', { durable: true });
await this.channel!.assertQueue(dlqQueue, { durable: true });
await this.channel!.bindQueue(dlqQueue, dlxExchange, this.queue);
// Main queue with DLX
await this.channel!.assertQueue(this.queue, {
durable: true,
arguments: {
'x-dead-letter-exchange': dlxExchange,
'x-dead-letter-routing-key': this.queue,
},
});
}
private async processMessage(msg: ConsumeMessage | null): Promise<void> {
if (!msg) return;
const retryCount = (msg.properties.headers?.['x-retry-count'] as number) || 0;
const maxRetries = this.options.maxRetries ?? 3;
try {
const content = JSON.parse(msg.content.toString()) as T;
await this.handler(content);
this.channel!.ack(msg);
} catch (error) {
console.error('Message processing failed:', error);
if (retryCount < maxRetries) {
// Retry with delay
await this.retryMessage(msg, retryCount + 1);
this.channel!.ack(msg);
} else {
// Send to DLQ
this.channel!.nack(msg, false, false);
}
}
}
private async retryMessage(msg: ConsumeMessage, retryCount: number): Promise<void> {
const delay = (this.options.retryDelay ?? 1000) * Math.pow(2, retryCount - 1);
// In production prefer the delayed-message exchange plugin; this
// in-process setTimeout retry is lost if the consumer crashes after ack
setTimeout(() => {
this.channel!.publish(
'',
this.queue,
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount,
},
}
);
}, delay);
}
async stop(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
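The delay computed in `retryMessage` grows exponentially with the attempt number. The same calculation as a standalone helper, with an illustrative cap added so delays stay bounded (the cap is an addition, not part of the consumer above):

```typescript
// Exponential backoff: baseDelayMs * 2^(attempt - 1), capped at maxDelayMs.
// attempt is 1-based, matching the x-retry-count header in the consumer.
function backoffDelay(
  attempt: number,
  baseDelayMs = 1000,
  maxDelayMs = 60_000
): number {
  return Math.min(baseDelayMs * Math.pow(2, attempt - 1), maxDelayMs);
}
```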
Apache Kafka Patterns
// src/messaging/kafka/producer.ts
import { Kafka, Producer, ProducerRecord, Message } from 'kafkajs';
interface KafkaProducerConfig {
clientId: string;
brokers: string[];
idempotent?: boolean;
}
export class KafkaProducer<T> {
private kafka: Kafka;
private producer: Producer;
constructor(private readonly config: KafkaProducerConfig) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
});
this.producer = this.kafka.producer({
idempotent: config.idempotent ?? true,
// kafkajs rejects idempotent producers with more than one in-flight request
maxInFlightRequests: (config.idempotent ?? true) ? 1 : 5,
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async send(
topic: string,
messages: Array<{ key?: string; value: T; partition?: number }>
): Promise<void> {
const kafkaMessages: Message[] = messages.map(msg => ({
key: msg.key,
value: JSON.stringify(msg.value),
partition: msg.partition,
}));
await this.producer.send({
topic,
messages: kafkaMessages,
});
}
async sendBatch(records: ProducerRecord[]): Promise<void> {
await this.producer.sendBatch({
topicMessages: records,
});
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
}
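Messages that share a key land on the same partition, which is what gives Kafka its per-key ordering guarantee. A simplified sketch of key-based partition selection — this FNV-1a hash is an illustrative stand-in, not kafkajs's default murmur2 partitioner:

```typescript
// Deterministic key -> partition mapping: the same key always maps to the
// same partition, so messages for one key are consumed in order.
function partitionForKey(key: string, numPartitions: number): number {
  let hash = 0x811c9dc5; // FNV-1a offset basis
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV-1a prime
  }
  return (hash >>> 0) % numPartitions;
}
```

In practice you would let the client's partitioner do this by setting `key` on the message; explicit `partition` overrides are rarely needed.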
// src/messaging/kafka/consumer.ts
import { Kafka, Consumer, EachMessagePayload, ConsumerRunConfig } from 'kafkajs';
interface KafkaConsumerConfig {
clientId: string;
brokers: string[];
groupId: string;
topics: string[];
}
export class KafkaConsumer<T> {
private kafka: Kafka;
private consumer: Consumer;
constructor(
private readonly config: KafkaConsumerConfig,
private readonly handler: (message: T, metadata: EachMessagePayload) => Promise<void>
) {
this.kafka = new Kafka({
clientId: config.clientId,
brokers: config.brokers,
});
this.consumer = this.kafka.consumer({
groupId: config.groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
}
async start(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({
topics: this.config.topics,
fromBeginning: false,
});
await this.consumer.run({
eachMessage: async (payload) => {
const { topic, partition, message } = payload;
const value = JSON.parse(message.value?.toString() || '{}') as T;
try {
await this.handler(value, payload);
} catch (error) {
console.error(`Error processing message from ${topic}:${partition}`, error);
// Rethrowing prevents the offset commit, so the message is redelivered
throw error;
}
},
});
}
async stop(): Promise<void> {
await this.consumer.disconnect();
}
}
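A malformed payload should be handled explicitly rather than crashing the consumer loop (see Failure Indicators below). One defensive-parsing sketch, assuming a Node.js runtime for `Buffer`:

```typescript
// Parses a raw message body, returning null for missing or malformed JSON
// so the caller can route the message (e.g. to a DLQ topic) instead of
// throwing inside the consume loop.
function safeParse<T>(raw: Buffer | null): T | null {
  if (!raw) return null;
  try {
    return JSON.parse(raw.toString()) as T;
  } catch {
    return null;
  }
}
```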
BullMQ (Redis-based)
// src/messaging/bullmq/queue.ts
import { Queue, Worker, Job, QueueEvents } from 'bullmq';
import Redis from 'ioredis';
interface JobData {
type: string;
payload: unknown;
}
interface QueueConfig {
name: string;
connection: Redis;
concurrency?: number;
}
export class JobQueue {
private queue: Queue<JobData>;
private worker: Worker<JobData> | null = null;
private events: QueueEvents;
constructor(private readonly config: QueueConfig) {
this.queue = new Queue(config.name, {
connection: config.connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: 1000,
removeOnFail: 5000,
},
});
this.events = new QueueEvents(config.name, {
connection: config.connection,
});
}
async add(
jobName: string,
data: JobData,
options?: { delay?: number; priority?: number }
): Promise<Job<JobData>> {
return this.queue.add(jobName, data, {
delay: options?.delay,
priority: options?.priority,
});
}
async addBulk(
jobs: Array<{ name: string; data: JobData }>
): Promise<Job<JobData>[]> {
return this.queue.addBulk(jobs);
}
startWorker(
processor: (job: Job<JobData>) => Promise<unknown>
): void {
this.worker = new Worker(
this.config.name,
processor,
{
connection: this.config.connection,
concurrency: this.config.concurrency ?? 5,
}
);
this.worker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
this.worker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed:`, err);
});
}
async stop(): Promise<void> {
await this.worker?.close();
await this.queue.close();
await this.events.close();
}
}
// Usage (assumes an existing ioredis connection `redis` and a sendWelcomeEmail helper)
const emailQueue = new JobQueue({
name: 'emails',
connection: redis,
concurrency: 10,
});
await emailQueue.add('send-welcome', {
type: 'welcome',
payload: { userId: '123', email: 'user@example.com' },
});
emailQueue.startWorker(async (job) => {
const { type, payload } = job.data;
if (type === 'welcome') {
await sendWelcomeEmail(payload);
}
});
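Because `JobData.payload` is typed `unknown`, the worker benefits from a runtime type guard before handling. A sketch — the `WelcomePayload` shape mirrors the usage example above and is otherwise an assumption:

```typescript
// Narrows an unknown job payload to the shape the welcome handler expects,
// so malformed jobs fail fast with a clear error instead of deep inside
// the email-sending code.
interface WelcomePayload {
  userId: string;
  email: string;
}

function isWelcomePayload(p: unknown): p is WelcomePayload {
  return (
    typeof p === 'object' &&
    p !== null &&
    typeof (p as WelcomePayload).userId === 'string' &&
    typeof (p as WelcomePayload).email === 'string'
  );
}
```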
Idempotent Consumer
// src/messaging/idempotent-consumer.ts
import Redis from 'ioredis';
export class IdempotentConsumer<T> {
constructor(
private readonly redis: Redis,
private readonly handler: (message: T) => Promise<void>,
private readonly keyExtractor: (message: T) => string,
private readonly ttl: number = 86400 // 24 hours
) {}
async process(message: T): Promise<boolean> {
const messageId = this.keyExtractor(message);
const key = `processed:${messageId}`;
// Try to set the key (atomic check-and-set)
const wasSet = await this.redis.set(key, '1', 'EX', this.ttl, 'NX');
if (!wasSet) {
// Message already processed
console.log(`Duplicate message: ${messageId}`);
return false;
}
try {
await this.handler(message);
return true;
} catch (error) {
// Remove the key on failure to allow retry
await this.redis.del(key);
throw error;
}
}
}
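The Redis `SET ... EX ... NX` check above can be mirrored in memory for unit testing the surrounding logic without a Redis instance. A sketch with an injectable clock (the class and method names are illustrative):

```typescript
// In-memory mirror of SET key EX ttl NX: claim() returns true only the
// first time a key is seen within its TTL; release() undoes a claim so
// a failed handler can be retried, matching the Redis DEL on failure.
class InMemoryIdempotencyStore {
  private seen = new Map<string, number>(); // key -> expiry timestamp (ms)

  constructor(
    private readonly ttlMs: number,
    private readonly now: () => number = Date.now
  ) {}

  claim(key: string): boolean {
    const expiry = this.seen.get(key);
    if (expiry !== undefined && expiry > this.now()) return false;
    this.seen.set(key, this.now() + this.ttlMs);
    return true;
  }

  release(key: string): void {
    this.seen.delete(key);
  }
}
```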
Usage Examples
Setup RabbitMQ Queue
Apply message-queue-patterns skill to create RabbitMQ producer/consumer with DLQ for order processing
Add Kafka Streaming
Apply message-queue-patterns skill to implement Kafka producer/consumer for event streaming
Create Job Queue
Apply message-queue-patterns skill to set up BullMQ for background email processing
Success Output
When this skill is successfully applied, you MUST output:
✅ SKILL COMPLETE: message-queue-patterns
Completed:
- [x] Message queue implementation with chosen technology (RabbitMQ/Kafka/BullMQ)
- [x] Producer configured with confirms/acknowledgments
- [x] Consumer implemented with error handling and retries
- [x] Dead Letter Queue (DLQ) setup for failed messages
- [x] Idempotency handling to prevent duplicate processing
Outputs:
- Producer implementation: [file path]
- Consumer implementation: [file path]
- Queue configuration: [file path]
- Integration tests: [file path]
Completion Checklist
Before marking this skill as complete, verify:
- Message producer sends messages with confirms/acknowledgments
- Consumer processes messages with proper error handling
- DLQ configured and tested with failed message scenarios
- Retry logic implemented with exponential backoff
- Idempotency checking prevents duplicate processing
- Connection pooling and reconnection logic in place
- Monitoring metrics exposed (queue depth, message rate, errors)
- Integration tests cover happy path and failure scenarios
Failure Indicators
This skill has FAILED if:
- ❌ Messages lost without acknowledgment or persistence
- ❌ Consumer crashes on malformed messages without recovery
- ❌ No DLQ configured, failed messages disappear
- ❌ Retry logic missing, transient failures cause permanent loss
- ❌ Duplicate messages processed multiple times (no idempotency)
- ❌ Connection errors cause application crash (no reconnection)
- ❌ No observability into queue health or message flow
When NOT to Use
Do NOT use this skill when:
- Simple request-response patterns suffice (use REST API instead)
- Real-time synchronous communication required (use WebSockets/gRPC)
- Single-process application with no distributed components (use in-memory queues)
- Message volume very low (<100/day) and not critical (simpler alternatives work)
- No requirement for guaranteed delivery or asynchronous processing
Use alternatives:
- For request-response: REST API or GraphQL
- For real-time sync: WebSockets, Server-Sent Events, gRPC streaming
- For in-process async: Thread pools, async/await patterns
- For simple background jobs: Cron jobs, scheduled tasks
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| No message acknowledgment | Messages lost on consumer crash | Always use manual ack after successful processing |
| Missing DLQ | Failed messages disappear | Configure DLQ for all queues |
| Infinite retries | Poison messages block queue forever | Set max retry limit, use DLQ |
| Synchronous processing | Defeats purpose of async messaging | Process messages asynchronously |
| No idempotency | Duplicate processing causes data corruption | Implement idempotency keys/checks |
| Single consumer | No scalability or fault tolerance | Use consumer groups/multiple workers |
| No monitoring | Can't detect queue buildup or failures | Expose metrics: queue depth, throughput, errors |
| Storing large payloads | Poor performance, resource waste | Store reference/ID, fetch from storage |
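The last row describes the claim-check pattern: keep small payloads inline and replace large ones with a storage reference. A sketch, assuming a Node.js runtime — the 64 KB threshold and the envelope shape are illustrative choices:

```typescript
// Claim-check: messages either carry the payload inline or a reference
// to it in external storage (e.g. S3, blob store), keeping the queue fast.
type QueueMessage =
  | { kind: 'inline'; payload: string }
  | { kind: 'reference'; ref: string };

function toQueueMessage(
  payload: string,
  store: (p: string) => string, // persists payload, returns its reference id
  maxInlineBytes = 64 * 1024
): QueueMessage {
  if (Buffer.byteLength(payload, 'utf8') <= maxInlineBytes) {
    return { kind: 'inline', payload };
  }
  return { kind: 'reference', ref: store(payload) };
}
```

Consumers resolve `reference` messages by fetching the payload from storage before handling.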
Principles
This skill embodies:
- #1 Recycle → Extend → Re-Use → Create - Leverage proven queue patterns (RabbitMQ, Kafka, BullMQ)
- #3 Keep It Simple - Choose simplest queue technology that meets requirements
- #4 Separation of Concerns - Decouple producers from consumers
- #8 No Assumptions - Explicit error handling, retries, and idempotency
- #11 Resilience and Robustness - DLQ, retries, reconnection logic ensure system stability
Full Standard: CODITECT-STANDARD-AUTOMATION.md
Integration Points
- event-driven-architecture - Event sourcing patterns
- microservices-patterns - Service-to-service communication
- monitoring-observability - Queue metrics and alerting