Agent Skills Framework Extension

Message Queue Patterns Skill

When to Use This Skill

Use this skill when implementing message queue patterns in your codebase.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

RabbitMQ, Kafka, async messaging, event-driven architecture, and queue management patterns.

Core Capabilities

  1. Producer/Consumer - Reliable message delivery
  2. Pub/Sub - Fan-out messaging
  3. Dead Letter Queues - Failed message handling
  4. Ordering - Guaranteed message sequence
  5. Idempotency - Safe message reprocessing
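
The fan-out idea in item 2 can be illustrated without a broker. The following is a minimal in-memory sketch, assuming hypothetical names (`FanoutExchange`, `bind`, `publish`) that are not part of RabbitMQ or Kafka; a real broker adds persistence, acknowledgments, and network delivery on top of this.

```typescript
// In-memory illustration of fan-out: every bound queue receives its own copy.
// FanoutExchange is a hypothetical teaching class, not a broker API.
type Subscriber<T> = (message: T) => void;

class FanoutExchange<T> {
  private readonly subscribers = new Map<string, Subscriber<T>>();

  // Bind a named queue (represented here by a callback) to the exchange.
  bind(queueName: string, subscriber: Subscriber<T>): void {
    this.subscribers.set(queueName, subscriber);
  }

  unbind(queueName: string): void {
    this.subscribers.delete(queueName);
  }

  // Deliver the message to every bound queue and return the delivery count.
  publish(message: T): number {
    this.subscribers.forEach((subscriber) => subscriber(message));
    return this.subscribers.size;
  }
}

const exchange = new FanoutExchange<{ orderId: string }>();
const billing: string[] = [];
const shipping: string[] = [];

exchange.bind('billing', (m) => { billing.push(m.orderId); });
exchange.bind('shipping', (m) => { shipping.push(m.orderId); });

// Both subscribers see the same event.
const delivered = exchange.publish({ orderId: 'o-1' });
```

The producer publishes once and does not know how many consumers exist, which is the decoupling that pub/sub is meant to provide.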

RabbitMQ Patterns

Producer with Confirms

// src/messaging/rabbitmq/producer.ts
import amqp, { Connection, ConfirmChannel } from 'amqplib';

interface MessageOptions {
  persistent?: boolean;
  expiration?: string;
  priority?: number;
  headers?: Record<string, unknown>;
}

export class RabbitMQProducer {
  private connection: Connection | null = null;
  private channel: ConfirmChannel | null = null;
  // Set by close() so that a deliberate shutdown does not trigger a reconnect
  private closing = false;

  constructor(private readonly url: string) {}

  async connect(): Promise<void> {
    this.connection = await amqp.connect(this.url);
    this.channel = await this.connection.createConfirmChannel();

    // Only log here: amqplib emits 'close' after a connection 'error',
    // so reconnecting from both handlers would open two connections
    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
    });

    this.connection.on('close', () => {
      console.log('RabbitMQ connection closed');
      this.channel = null;
      if (!this.closing) {
        this.reconnect();
      }
    });
  }

  // Retry every 5 seconds until a connection succeeds
  private reconnect(): void {
    setTimeout(async () => {
      try {
        await this.connect();
        console.log('RabbitMQ reconnected');
      } catch (err) {
        console.error('RabbitMQ reconnect failed:', err);
        this.reconnect();
      }
    }, 5000);
  }

  async publish(
    exchange: string,
    routingKey: string,
    message: unknown,
    options: MessageOptions = {}
  ): Promise<void> {
    if (!this.channel) {
      throw new Error('Channel not initialized');
    }

    const content = Buffer.from(JSON.stringify(message));

    // On a confirm channel the callback fires once the broker acks or nacks the message
    return new Promise((resolve, reject) => {
      this.channel!.publish(
        exchange,
        routingKey,
        content,
        {
          persistent: options.persistent ?? true,
          expiration: options.expiration,
          priority: options.priority,
          headers: options.headers,
          contentType: 'application/json',
          timestamp: Date.now(),
        },
        (err) => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        }
      );
    });
  }

  async close(): Promise<void> {
    this.closing = true;
    await this.channel?.close();
    await this.connection?.close();
  }
}
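
The producer above retries the connection at a fixed 5-second interval. One possible refinement, sketched here under the assumption that capped exponential backoff with jitter suits the deployment, is to compute the delay per attempt. The function name `reconnectDelay` and its default values are illustrative and do not come from amqplib.

```typescript
// Capped exponential backoff with "equal jitter" for reconnect attempts.
// reconnectDelay and its defaults are illustrative assumptions.
function reconnectDelay(
  attempt: number, // 1 for the first retry
  baseMs: number = 1000,
  maxMs: number = 30000,
  random: () => number = Math.random
): number {
  // Double the delay on each attempt, but never exceed maxMs
  const exponential = Math.min(maxMs, baseMs * Math.pow(2, attempt - 1));
  // Keep half of the delay fixed and randomize the other half so that
  // many clients do not reconnect at the same instant
  const half = exponential / 2;
  return Math.floor(half + random() * half);
}

// With the random source pinned to 0 the delay is the fixed half only
const firstDelay = reconnectDelay(1, 1000, 30000, () => 0);
const thirdDelay = reconnectDelay(3, 1000, 30000, () => 0);
```

Passing the result to `setTimeout` in place of the constant would spread reconnect attempts out after a broker restart.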

Consumer with DLQ

// src/messaging/rabbitmq/consumer.ts
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib';

interface ConsumerOptions {
  prefetch?: number;
  maxRetries?: number;
  retryDelay?: number;
}

export class RabbitMQConsumer<T> {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  constructor(
    private readonly url: string,
    private readonly queue: string,
    private readonly handler: (message: T) => Promise<void>,
    private readonly options: ConsumerOptions = {}
  ) {}

  async start(): Promise<void> {
    this.connection = await amqp.connect(this.url);
    this.channel = await this.connection.createChannel();

    // Set prefetch for fair dispatch
    await this.channel.prefetch(this.options.prefetch ?? 10);

    // Setup main queue with DLQ
    await this.setupQueues();

    // Start consuming with manual acknowledgments
    await this.channel.consume(
      this.queue,
      this.processMessage.bind(this),
      { noAck: false }
    );

    console.log(`Consumer started on queue: ${this.queue}`);
  }

  private async setupQueues(): Promise<void> {
    const dlxExchange = `${this.queue}.dlx`;
    const dlqQueue = `${this.queue}.dlq`;

    // Dead letter exchange and queue
    await this.channel!.assertExchange(dlxExchange, 'direct', { durable: true });
    await this.channel!.assertQueue(dlqQueue, { durable: true });
    await this.channel!.bindQueue(dlqQueue, dlxExchange, this.queue);

    // Main queue with DLX
    await this.channel!.assertQueue(this.queue, {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': dlxExchange,
        'x-dead-letter-routing-key': this.queue,
      },
    });
  }

  private async processMessage(msg: ConsumeMessage | null): Promise<void> {
    // A null message means the broker cancelled this consumer
    if (!msg) return;

    const retryCount = (msg.properties.headers?.['x-retry-count'] as number) || 0;
    const maxRetries = this.options.maxRetries ?? 3;

    try {
      const content = JSON.parse(msg.content.toString()) as T;
      await this.handler(content);
      this.channel!.ack(msg);
    } catch (error) {
      console.error('Message processing failed:', error);

      if (retryCount < maxRetries) {
        // Republish with an incremented retry count, then ack the original
        await this.retryMessage(msg, retryCount + 1);
        this.channel!.ack(msg);
      } else {
        // Reject without requeue so the broker routes the message to the DLQ
        this.channel!.nack(msg, false, false);
      }
    }
  }

  private async retryMessage(msg: ConsumeMessage, retryCount: number): Promise<void> {
    // Exponential backoff: retryDelay, then 2x, 4x, ...
    const delay = (this.options.retryDelay ?? 1000) * Math.pow(2, retryCount - 1);

    // Wait in-process before republishing. The original message stays unacked
    // during the wait, so a crash here leads to redelivery instead of loss.
    // It also occupies a prefetch slot; the delayed message plugin avoids that.
    await new Promise<void>((resolve) => setTimeout(resolve, delay));

    this.channel!.publish(
      '',
      this.queue,
      msg.content,
      {
        ...msg.properties,
        headers: {
          ...msg.properties.headers,
          'x-retry-count': retryCount,
        },
      }
    );
  }

  async stop(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}
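
The consumer above delays retries inside the process. A broker-side alternative is a set of retry queues that hold a message for a fixed time and then dead-letter it back to the main queue. The sketch below only builds the queue declarations, assuming a hypothetical naming scheme (`<queue>.retry.<attempt>`) and a hypothetical helper `retryQueueDeclarations`. The `x-message-ttl`, `x-dead-letter-exchange`, and `x-dead-letter-routing-key` arguments are standard RabbitMQ queue arguments.

```typescript
// Builds declarations for per-attempt retry queues with exponential TTLs.
// The helper name and the queue naming scheme are illustrative assumptions.
interface QueueDeclaration {
  name: string;
  durable: boolean;
  arguments: Record<string, string | number>;
}

function retryQueueDeclarations(
  mainQueue: string,
  baseDelayMs: number,
  maxRetries: number
): QueueDeclaration[] {
  const declarations: QueueDeclaration[] = [];
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    declarations.push({
      name: `${mainQueue}.retry.${attempt}`,
      durable: true,
      arguments: {
        // Hold each message for baseDelayMs * 2^(attempt - 1) milliseconds
        'x-message-ttl': baseDelayMs * Math.pow(2, attempt - 1),
        // On expiry, route through the default exchange back to the main queue
        'x-dead-letter-exchange': '',
        'x-dead-letter-routing-key': mainQueue,
      },
    });
  }
  return declarations;
}

const declarations = retryQueueDeclarations('orders', 1000, 3);
```

Each declaration could be passed to `channel.assertQueue(name, { durable, arguments })`. A failed message would then be published to the retry queue for its attempt number and acknowledged, so the delay survives a consumer restart.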

Apache Kafka Patterns

// src/messaging/kafka/producer.ts
import { Kafka, Producer, ProducerRecord, Message } from 'kafkajs';

interface KafkaProducerConfig {
  clientId: string;
  brokers: string[];
  idempotent?: boolean;
}

export class KafkaProducer<T> {
  private kafka: Kafka;
  private producer: Producer;

  constructor(private readonly config: KafkaProducerConfig) {
    this.kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
    });

    this.producer = this.kafka.producer({
      idempotent: config.idempotent ?? true,
      // The KafkaJS documentation pairs idempotent: true with a single
      // in-flight request; raise this only if idempotence is disabled
      maxInFlightRequests: 1,
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async send(
    topic: string,
    messages: Array<{ key?: string; value: T; partition?: number }>
  ): Promise<void> {
    // Serialize values to JSON; messages with the same key go to the same partition
    const kafkaMessages: Message[] = messages.map(msg => ({
      key: msg.key,
      value: JSON.stringify(msg.value),
      partition: msg.partition,
    }));

    await this.producer.send({
      topic,
      messages: kafkaMessages,
    });
  }

  // Send to several topics in one call
  async sendBatch(records: ProducerRecord[]): Promise<void> {
    await this.producer.sendBatch({
      topicMessages: records,
    });
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }
}
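
The `key` field accepted by `send` is what makes per-entity ordering possible: messages with the same key land on the same partition, and a partition preserves order. The sketch below shows the idea with a toy hash. `hashKey` and `choosePartition` are hypothetical helpers for illustration only; KafkaJS ships its own partitioners, which use a different hash function.

```typescript
// Simplified key-based partitioning. hashKey is a toy string hash used only
// to illustrate the idea; it is not the hash that Kafka clients use.
function hashKey(key: string): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    // Multiply-and-add, truncated to an unsigned 32-bit integer
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  return hash;
}

function choosePartition(key: string, partitionCount: number): number {
  if (partitionCount <= 0) {
    throw new Error('partitionCount must be positive');
  }
  return hashKey(key) % partitionCount;
}

const orderEvents = [
  { key: 'order-1', value: 'created' },
  { key: 'order-2', value: 'created' },
  { key: 'order-1', value: 'paid' },
];

// Both 'order-1' events map to the same partition, so their order is kept
const partitions = orderEvents.map((e) => choosePartition(e.key, 6));
```

Ordering holds per key, not across the whole topic, so the key should identify the entity whose events must stay in sequence.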

// src/messaging/kafka/consumer.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';

interface KafkaConsumerConfig {
  clientId: string;
  brokers: string[];
  groupId: string;
  topics: string[];
}

export class KafkaConsumer<T> {
  private kafka: Kafka;
  private consumer: Consumer;

  constructor(
    private readonly config: KafkaConsumerConfig,
    private readonly handler: (message: T, metadata: EachMessagePayload) => Promise<void>
  ) {
    this.kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
    });

    this.consumer = this.kafka.consumer({
      groupId: config.groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });
  }

  async start(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topics: this.config.topics,
      fromBeginning: false,
    });

    await this.consumer.run({
      eachMessage: async (payload) => {
        const { topic, partition, message } = payload;

        try {
          // Parse inside the try block so malformed JSON is logged as well;
          // a null value is treated as an empty object
          const value = JSON.parse(message.value?.toString() || '{}') as T;
          await this.handler(value, payload);
        } catch (error) {
          console.error(`Error processing message from ${topic}:${partition}`, error);
          // Rethrow so the offset is not marked as processed and the message
          // is delivered again; a permanently bad message needs separate handling
          throw error;
        }
      },
    });
  }

  async stop(): Promise<void> {
    await this.consumer.disconnect();
  }
}
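
Because a rethrown error causes the same message to be delivered again, a payload that can never be parsed would be retried indefinitely. One way to separate such poison messages from transient failures is to decode into a result type before calling the handler. This is a sketch under assumptions: `decodeMessage`, `Decoded`, and the `isOrderEvent` guard are hypothetical names, and what to do with a failed decode (for example, publishing to a dead-letter topic) is left to the application.

```typescript
// Decodes a raw payload into a tagged result instead of throwing.
// All names here are illustrative; none come from KafkaJS.
type Decoded<T> =
  | { ok: true; value: T }
  | { ok: false; reason: string; raw: string };

function decodeMessage<T>(
  raw: string | null,
  isValid: (candidate: unknown) => candidate is T
): Decoded<T> {
  if (raw === null || raw === '') {
    return { ok: false, reason: 'empty payload', raw: '' };
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    return { ok: false, reason: 'invalid JSON', raw };
  }
  if (!isValid(parsed)) {
    return { ok: false, reason: 'schema mismatch', raw };
  }
  return { ok: true, value: parsed };
}

interface OrderEvent {
  orderId: string;
}

// Minimal runtime check that the parsed value has the expected shape
const isOrderEvent = (candidate: unknown): candidate is OrderEvent =>
  typeof candidate === 'object' &&
  candidate !== null &&
  typeof (candidate as { orderId?: unknown }).orderId === 'string';

const good = decodeMessage('{"orderId":"o-1"}', isOrderEvent);
const bad = decodeMessage('{not json', isOrderEvent);
const wrongShape = decodeMessage('{"id":1}', isOrderEvent);
```

Inside `eachMessage`, a failed decode could be logged and skipped (or forwarded elsewhere) without throwing, while handler errors are still rethrown for redelivery.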

BullMQ (Redis-based)

// src/messaging/bullmq/queue.ts
import { Queue, Worker, Job, QueueEvents } from 'bullmq';
import Redis from 'ioredis';

interface JobData {
  type: string;
  payload: unknown;
}

interface QueueConfig {
  name: string;
  connection: Redis;
  concurrency?: number;
}

export class JobQueue {
  private queue: Queue<JobData>;
  private worker: Worker<JobData> | null = null;
  private events: QueueEvents;

  constructor(private readonly config: QueueConfig) {
    this.queue = new Queue(config.name, {
      connection: config.connection,
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
        removeOnComplete: 1000,
        removeOnFail: 5000,
      },
    });

    this.events = new QueueEvents(config.name, {
      connection: config.connection,
    });
  }

  async add(
    jobName: string,
    data: JobData,
    options?: { delay?: number; priority?: number }
  ): Promise<Job<JobData>> {
    return this.queue.add(jobName, data, {
      delay: options?.delay,
      priority: options?.priority,
    });
  }

  async addBulk(
    jobs: Array<{ name: string; data: JobData }>
  ): Promise<Job<JobData>[]> {
    return this.queue.addBulk(jobs);
  }

  startWorker(
    processor: (job: Job<JobData>) => Promise<unknown>
  ): void {
    this.worker = new Worker(
      this.config.name,
      processor,
      {
        connection: this.config.connection,
        concurrency: this.config.concurrency ?? 5,
      }
    );

    this.worker.on('completed', (job) => {
      console.log(`Job ${job.id} completed`);
    });

    this.worker.on('failed', (job, err) => {
      console.error(`Job ${job?.id} failed:`, err);
    });
  }

  async stop(): Promise<void> {
    await this.worker?.close();
    await this.queue.close();
    await this.events.close();
  }
}

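// Usage
// BullMQ workers require maxRetriesPerRequest: null on the ioredis connection
const redis = new Redis({ maxRetriesPerRequest: null });

const emailQueue = new JobQueue({
  name: 'emails',
  connection: redis,
  concurrency: 10,
});

await emailQueue.add('send-welcome', {
  type: 'welcome',
  payload: { userId: '123', email: 'user@example.com' },
});

emailQueue.startWorker(async (job) => {
  const { type, payload } = job.data;

  if (type === 'welcome') {
    // sendWelcomeEmail is application code that is not shown here
    await sendWelcomeEmail(payload);
  }
});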
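
As the number of job types grows, the `if (type === ...)` chain in the worker can be replaced by a lookup table. The following is one possible sketch, assuming a hypothetical `HandlerRegistry` class that is not part of BullMQ. It fails loudly on unknown job types instead of silently completing them.

```typescript
// Maps job types to handlers. HandlerRegistry is a hypothetical helper,
// not a BullMQ API; JobData mirrors the interface used by the queue above.
interface JobData {
  type: string;
  payload: unknown;
}

type JobHandler = (payload: unknown) => Promise<void>;

class HandlerRegistry {
  private readonly handlers = new Map<string, JobHandler>();

  register(type: string, handler: JobHandler): this {
    this.handlers.set(type, handler);
    return this;
  }

  // Throws for unknown types so a misrouted job fails and is retried or
  // inspected, instead of being marked as completed
  resolve(type: string): JobHandler {
    const handler = this.handlers.get(type);
    if (!handler) {
      throw new Error(`No handler registered for job type: ${type}`);
    }
    return handler;
  }

  async dispatch(data: JobData): Promise<void> {
    await this.resolve(data.type)(data.payload);
  }
}

const sent: unknown[] = [];
const registry = new HandlerRegistry().register('welcome', async (payload) => {
  sent.push(payload); // stand-in for real email delivery
});
```

A worker processor could then be written as `(job) => registry.dispatch(job.data)`.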

Idempotent Consumer

// src/messaging/idempotent-consumer.ts
import Redis from 'ioredis';

export class IdempotentConsumer<T> {
  constructor(
    private readonly redis: Redis,
    private readonly handler: (message: T) => Promise<void>,
    private readonly keyExtractor: (message: T) => string,
    private readonly ttl: number = 86400 // 24 hours
  ) {}

  async process(message: T): Promise<boolean> {
    const messageId = this.keyExtractor(message);
    const key = `processed:${messageId}`;

    // Try to set the key (atomic check-and-set)
    const wasSet = await this.redis.set(key, '1', 'EX', this.ttl, 'NX');

    if (!wasSet) {
      // Message already processed
      console.log(`Duplicate message: ${messageId}`);
      return false;
    }

    try {
      await this.handler(message);
      return true;
    } catch (error) {
      // Remove the key on failure to allow retry
      await this.redis.del(key);
      throw error;
    }
  }
}
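
The claim-then-release logic above can be exercised in unit tests without a Redis server. The sketch below is an in-memory stand-in with the same semantics as `SET key value EX ttl NX`, assuming a hypothetical `InMemoryDedupStore` class and an injectable clock. It is meant for tests only, since it offers no sharing between processes.

```typescript
// In-memory stand-in for the Redis "SET ... EX ttl NX" call, for unit tests.
// InMemoryDedupStore is a hypothetical helper with an injectable clock.
class InMemoryDedupStore {
  private readonly expiresAt = new Map<string, number>();

  constructor(private readonly now: () => number = Date.now) {}

  // Returns true if the key was claimed, false if it is held and unexpired
  claim(key: string, ttlSeconds: number): boolean {
    const current = this.now();
    const expiry = this.expiresAt.get(key);
    if (expiry !== undefined && expiry > current) {
      return false;
    }
    this.expiresAt.set(key, current + ttlSeconds * 1000);
    return true;
  }

  // Mirrors the DEL call made when the handler fails
  release(key: string): void {
    this.expiresAt.delete(key);
  }
}

let clock = 0;
const store = new InMemoryDedupStore(() => clock);

const first = store.claim('processed:msg-1', 60); // new key, claimed
const second = store.claim('processed:msg-1', 60); // duplicate within the TTL
clock = 61000; // advance past the 60-second TTL
const afterExpiry = store.claim('processed:msg-1', 60); // claimable again
```

The TTL bounds memory use but also bounds the deduplication window: a duplicate that arrives after the key expires is processed again.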

Usage Examples

Setup RabbitMQ Queue

Apply message-queue-patterns skill to create RabbitMQ producer/consumer with DLQ for order processing

Add Kafka Streaming

Apply message-queue-patterns skill to implement Kafka producer/consumer for event streaming

Create Job Queue

Apply message-queue-patterns skill to set up BullMQ for background email processing

Success Output

When this skill is successfully applied, you MUST output:

✅ SKILL COMPLETE: message-queue-patterns

Completed:
- [x] Message queue implementation with chosen technology (RabbitMQ/Kafka/BullMQ)
- [x] Producer configured with confirms/acknowledgments
- [x] Consumer implemented with error handling and retries
- [x] Dead Letter Queue (DLQ) setup for failed messages
- [x] Idempotency handling to prevent duplicate processing

Outputs:
- Producer implementation: [file path]
- Consumer implementation: [file path]
- Queue configuration: [file path]
- Integration tests: [file path]

Completion Checklist

Before marking this skill as complete, verify:

  • Message producer sends messages with confirms/acknowledgments
  • Consumer processes messages with proper error handling
  • DLQ configured and tested with failed message scenarios
  • Retry logic implemented with exponential backoff
  • Idempotency checking prevents duplicate processing
  • Connection pooling and reconnection logic in place
  • Monitoring metrics exposed (queue depth, message rate, errors)
  • Integration tests cover happy path and failure scenarios

Failure Indicators

This skill has FAILED if:

  • ❌ Messages lost without acknowledgment or persistence
  • ❌ Consumer crashes on malformed messages without recovery
  • ❌ No DLQ configured, failed messages disappear
  • ❌ Retry logic missing, transient failures cause permanent loss
  • ❌ Duplicate messages processed multiple times (no idempotency)
  • ❌ Connection errors cause application crash (no reconnection)
  • ❌ No observability into queue health or message flow

When NOT to Use

Do NOT use this skill when:

  • Simple request-response patterns suffice (use REST API instead)
  • Real-time synchronous communication required (use WebSockets/gRPC)
  • Single-process application with no distributed components (use in-memory queues)
  • Message volume very low (<100/day) and not critical (simpler alternatives work)
  • No requirement for guaranteed delivery or asynchronous processing

Use alternatives:

  • For request-response: REST API or GraphQL
  • For real-time sync: WebSockets, Server-Sent Events, gRPC streaming
  • For in-process async: Thread pools, async/await patterns
  • For simple background jobs: Cron jobs, scheduled tasks

Anti-Patterns (Avoid)

Anti-Pattern | Problem | Solution
No message acknowledgment | Messages lost on consumer crash | Always use manual ack after successful processing
Missing DLQ | Failed messages disappear | Configure DLQ for all queues
Infinite retries | Poison messages block queue forever | Set max retry limit, use DLQ
Synchronous processing | Defeats purpose of async messaging | Process messages asynchronously
No idempotency | Duplicate processing causes data corruption | Implement idempotency keys/checks
Single consumer | No scalability or fault tolerance | Use consumer groups/multiple workers
No monitoring | Can't detect queue buildup or failures | Expose metrics: queue depth, throughput, errors
Storing large payloads | Poor performance, resource waste | Store reference/ID, fetch from storage
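
The last anti-pattern, storing large payloads, is commonly addressed by sending a reference in the message and keeping the body elsewhere. The sketch below illustrates that approach under assumptions: `PayloadStore`, `toMessage`, and `readBody` are hypothetical names, and the `Map` stands in for object storage or a database.

```typescript
// Reference-passing sketch: large bodies go to a store and the message
// carries only a reference. All names are illustrative assumptions.
type QueueMessage =
  | { kind: 'inline'; body: string }
  | { kind: 'reference'; ref: string };

class PayloadStore {
  private readonly blobs = new Map<string, string>();
  private nextId = 1;

  put(body: string): string {
    const ref = `blob-${this.nextId++}`;
    this.blobs.set(ref, body);
    return ref;
  }

  get(ref: string): string {
    const body = this.blobs.get(ref);
    if (body === undefined) {
      throw new Error(`Unknown reference: ${ref}`);
    }
    return body;
  }
}

// Producer side: inline small bodies, store large ones and send the reference
function toMessage(body: string, store: PayloadStore, maxInlineLength: number): QueueMessage {
  return body.length <= maxInlineLength
    ? { kind: 'inline', body }
    : { kind: 'reference', ref: store.put(body) };
}

// Consumer side: resolve the reference back into the body when needed
function readBody(message: QueueMessage, store: PayloadStore): string {
  return message.kind === 'inline' ? message.body : store.get(message.ref);
}

const payloadStore = new PayloadStore();
const small = toMessage('ok', payloadStore, 10);
const large = toMessage('x'.repeat(50), payloadStore, 10);
```

The inline threshold is a tuning choice; the stored body must outlive the message, including any time it spends in a retry queue or DLQ.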

Principles

This skill embodies:

  • #1 Recycle → Extend → Re-Use → Create - Leverage proven queue patterns (RabbitMQ, Kafka, BullMQ)
  • #3 Keep It Simple - Choose simplest queue technology that meets requirements
  • #4 Separation of Concerns - Decouple producers from consumers
  • #8 No Assumptions - Explicit error handling, retries, and idempotency
  • #11 Resilience and Robustness - DLQ, retries, reconnection logic ensure system stability

Full Standard: CODITECT-STANDARD-AUTOMATION.md

Integration Points

  • event-driven-architecture - Event sourcing patterns
  • microservices-patterns - Service-to-service communication
  • monitoring-observability - Queue metrics and alerting