Skip to main content

Agent Skills Framework Extension

Event-Driven Architecture Skill

When to Use This Skill

Use this skill when implementing event driven architecture patterns in your codebase.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

CQRS, event sourcing, domain events, and event-driven system design patterns.

Core Capabilities

  1. Event Sourcing - State as event sequence
  2. CQRS - Command/query separation
  3. Domain Events - Business event modeling
  4. Projections - Read model building
  5. Snapshotting - Performance optimization

Event Sourcing

Event Store Implementation

// src/eventsourcing/event-store.ts
interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
data: unknown;
metadata: Record<string, unknown>;
}

interface EventStore {
append(events: DomainEvent[], expectedVersion: number): Promise<void>;
load(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>;
loadAll(fromPosition?: number): AsyncIterable<DomainEvent>;
}

export class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}

async append(events: DomainEvent[], expectedVersion: number): Promise<void> {
const client = await this.pool.connect();

try {
await client.query('BEGIN');

// Optimistic concurrency check
const { rows } = await client.query(
`SELECT MAX(version) as current_version
FROM events
WHERE aggregate_id = $1`,
[events[0].aggregateId]
);

const currentVersion = rows[0]?.current_version ?? -1;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`
);
}

// Insert events
for (const event of events) {
await client.query(
`INSERT INTO events (
event_id, event_type, aggregate_id, aggregate_type,
version, timestamp, data, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.eventType,
event.aggregateId,
event.aggregateType,
event.version,
event.timestamp,
JSON.stringify(event.data),
JSON.stringify(event.metadata),
]
);
}

await client.query('COMMIT');

// Publish to subscribers
for (const event of events) {
await this.publish(event);
}
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}

async load(aggregateId: string, fromVersion = 0): Promise<DomainEvent[]> {
const { rows } = await this.pool.query(
`SELECT * FROM events
WHERE aggregate_id = $1 AND version >= $2
ORDER BY version`,
[aggregateId, fromVersion]
);

return rows.map(row => ({
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
data: row.data,
metadata: row.metadata,
}));
}

async *loadAll(fromPosition = 0): AsyncIterable<DomainEvent> {
const BATCH_SIZE = 1000;
let position = fromPosition;

while (true) {
const { rows } = await this.pool.query(
`SELECT * FROM events
WHERE position >= $1
ORDER BY position
LIMIT $2`,
[position, BATCH_SIZE]
);

if (rows.length === 0) break;

for (const row of rows) {
yield {
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
data: row.data,
metadata: row.metadata,
};
}

position = rows[rows.length - 1].position + 1;
}
}

private async publish(event: DomainEvent): Promise<void> {
// Publish to message broker for projections
await messageBroker.publish(`events.${event.eventType}`, event);
}
}

Aggregate Root

// src/eventsourcing/aggregate.ts
export abstract class AggregateRoot<TState> {
protected state: TState;
private uncommittedEvents: DomainEvent[] = [];
private version = -1;

protected abstract getInitialState(): TState;
protected abstract applyEvent(state: TState, event: DomainEvent): TState;

constructor() {
this.state = this.getInitialState();
}

get aggregateVersion(): number {
return this.version;
}

get pendingEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}

loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.state = this.applyEvent(this.state, event);
this.version = event.version;
}
}

protected raiseEvent(eventType: string, data: unknown): void {
const event: DomainEvent = {
eventId: crypto.randomUUID(),
eventType,
aggregateId: this.getId(),
aggregateType: this.constructor.name,
version: this.version + this.uncommittedEvents.length + 1,
timestamp: new Date(),
data,
metadata: {},
};

this.state = this.applyEvent(this.state, event);
this.uncommittedEvents.push(event);
}

clearUncommittedEvents(): void {
this.version += this.uncommittedEvents.length;
this.uncommittedEvents = [];
}

protected abstract getId(): string;
}

// Example: Order Aggregate
interface OrderState {
id: string;
customerId: string;
items: OrderItem[];
status: 'draft' | 'confirmed' | 'shipped' | 'delivered' | 'cancelled';
totalCents: number;
}

export class Order extends AggregateRoot<OrderState> {
protected getInitialState(): OrderState {
return {
id: '',
customerId: '',
items: [],
status: 'draft',
totalCents: 0,
};
}

protected getId(): string {
return this.state.id;
}

protected applyEvent(state: OrderState, event: DomainEvent): OrderState {
switch (event.eventType) {
case 'OrderCreated':
return {
...state,
id: event.data.orderId,
customerId: event.data.customerId,
status: 'draft',
};

case 'ItemAdded':
const newItem = event.data as OrderItem;
return {
...state,
items: [...state.items, newItem],
totalCents: state.totalCents + newItem.quantity * newItem.unitPriceCents,
};

case 'OrderConfirmed':
return { ...state, status: 'confirmed' };

case 'OrderShipped':
return { ...state, status: 'shipped' };

case 'OrderCancelled':
return { ...state, status: 'cancelled' };

default:
return state;
}
}

// Commands
static create(orderId: string, customerId: string): Order {
const order = new Order();
order.raiseEvent('OrderCreated', { orderId, customerId });
return order;
}

addItem(productId: string, quantity: number, unitPriceCents: number): void {
if (this.state.status !== 'draft') {
throw new Error('Cannot add items to confirmed order');
}

this.raiseEvent('ItemAdded', { productId, quantity, unitPriceCents });
}

confirm(): void {
if (this.state.items.length === 0) {
throw new Error('Cannot confirm empty order');
}

if (this.state.status !== 'draft') {
throw new Error('Order already confirmed');
}

this.raiseEvent('OrderConfirmed', { confirmedAt: new Date() });
}

cancel(reason: string): void {
if (['delivered', 'cancelled'].includes(this.state.status)) {
throw new Error('Cannot cancel order in current status');
}

this.raiseEvent('OrderCancelled', { reason, cancelledAt: new Date() });
}
}

CQRS Pattern

// src/cqrs/command-handler.ts
interface Command {
type: string;
}

interface CommandHandler<T extends Command> {
handle(command: T): Promise<void>;
}

// Commands
interface CreateOrderCommand extends Command {
type: 'CreateOrder';
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
}

// Command Handler
class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
constructor(
private readonly eventStore: EventStore,
private readonly orderRepository: OrderRepository
) {}

async handle(command: CreateOrderCommand): Promise<void> {
// Create aggregate
const order = Order.create(command.orderId, command.customerId);

// Add items
for (const item of command.items) {
order.addItem(item.productId, item.quantity, item.price);
}

// Confirm order
order.confirm();

// Persist events
await this.eventStore.append(order.pendingEvents, -1);
order.clearUncommittedEvents();
}
}

// src/cqrs/query-handler.ts
interface Query<TResult> {
type: string;
}

interface QueryHandler<TQuery extends Query<TResult>, TResult> {
handle(query: TQuery): Promise<TResult>;
}

// Queries
interface GetOrderQuery extends Query<OrderReadModel> {
type: 'GetOrder';
orderId: string;
}

interface OrderReadModel {
id: string;
customerName: string;
items: Array<{
productName: string;
quantity: number;
unitPrice: number;
total: number;
}>;
status: string;
total: number;
createdAt: Date;
}

// Query Handler (reads from projection)
class GetOrderHandler implements QueryHandler<GetOrderQuery, OrderReadModel> {
constructor(private readonly readDb: Pool) {}

async handle(query: GetOrderQuery): Promise<OrderReadModel> {
const { rows } = await this.readDb.query(
`SELECT * FROM order_read_models WHERE id = $1`,
[query.orderId]
);

if (rows.length === 0) {
throw new NotFoundError('Order', query.orderId);
}

return rows[0];
}
}

Event Projections

// src/projections/order-projection.ts
interface Projection {
name: string;
handle(event: DomainEvent): Promise<void>;
rebuild(): Promise<void>;
}

export class OrderProjection implements Projection {
name = 'order-read-model';

constructor(
private readonly writeDb: Pool,
private readonly eventStore: EventStore
) {}

async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.writeDb.query(
`INSERT INTO order_read_models (id, customer_id, status, created_at)
VALUES ($1, $2, 'draft', $3)`,
[event.data.orderId, event.data.customerId, event.timestamp]
);
break;

case 'ItemAdded':
await this.writeDb.query(
`INSERT INTO order_items_read_models (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)`,
[event.aggregateId, event.data.productId, event.data.quantity, event.data.unitPriceCents]
);

await this.writeDb.query(
`UPDATE order_read_models
SET total = total + $1
WHERE id = $2`,
[event.data.quantity * event.data.unitPriceCents, event.aggregateId]
);
break;

case 'OrderConfirmed':
await this.writeDb.query(
`UPDATE order_read_models SET status = 'confirmed' WHERE id = $1`,
[event.aggregateId]
);
break;

case 'OrderCancelled':
await this.writeDb.query(
`UPDATE order_read_models SET status = 'cancelled' WHERE id = $1`,
[event.aggregateId]
);
break;
}
}

async rebuild(): Promise<void> {
// Clear existing projection
await this.writeDb.query('TRUNCATE order_read_models, order_items_read_models');

// Replay all events
for await (const event of this.eventStore.loadAll()) {
if (['OrderCreated', 'ItemAdded', 'OrderConfirmed', 'OrderCancelled'].includes(event.eventType)) {
await this.handle(event);
}
}
}
}

// Projection Manager
class ProjectionManager {
private projections: Map<string, Projection> = new Map();

register(projection: Projection): void {
this.projections.set(projection.name, projection);
}

async handleEvent(event: DomainEvent): Promise<void> {
for (const projection of this.projections.values()) {
try {
await projection.handle(event);
} catch (error) {
console.error(`Projection ${projection.name} failed:`, error);
// Mark for rebuild or alert
}
}
}

async rebuildProjection(name: string): Promise<void> {
const projection = this.projections.get(name);
if (!projection) throw new Error(`Unknown projection: ${name}`);

console.log(`Rebuilding projection: ${name}`);
await projection.rebuild();
console.log(`Projection rebuilt: ${name}`);
}
}

Snapshotting

// src/eventsourcing/snapshots.ts
interface Snapshot<TState> {
aggregateId: string;
version: number;
state: TState;
createdAt: Date;
}

export class SnapshotStore<TState> {
constructor(private readonly pool: Pool) {}

async save(snapshot: Snapshot<TState>): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (aggregate_id, version, state, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
[snapshot.aggregateId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
);
}

async load(aggregateId: string): Promise<Snapshot<TState> | null> {
const { rows } = await this.pool.query(
`SELECT * FROM snapshots WHERE aggregate_id = $1`,
[aggregateId]
);

if (rows.length === 0) return null;

return {
aggregateId: rows[0].aggregate_id,
version: rows[0].version,
state: rows[0].state,
createdAt: rows[0].created_at,
};
}
}

// Repository with snapshots
class OrderRepository {
private readonly SNAPSHOT_INTERVAL = 50;

constructor(
private readonly eventStore: EventStore,
private readonly snapshotStore: SnapshotStore<OrderState>
) {}

async load(orderId: string): Promise<Order> {
const order = new Order();

// Try to load from snapshot
const snapshot = await this.snapshotStore.load(orderId);
if (snapshot) {
order.loadFromSnapshot(snapshot);
}

// Load events after snapshot
const fromVersion = snapshot ? snapshot.version + 1 : 0;
const events = await this.eventStore.load(orderId, fromVersion);
order.loadFromHistory(events);

return order;
}

async save(order: Order): Promise<void> {
await this.eventStore.append(order.pendingEvents, order.aggregateVersion);

// Check if snapshot needed
if (order.aggregateVersion % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
aggregateId: order.getId(),
version: order.aggregateVersion,
state: order.getState(),
createdAt: new Date(),
});
}

order.clearUncommittedEvents();
}
}

Usage Examples

Implement Event Sourcing

Apply event-driven-architecture skill to implement event sourcing for order management with full audit trail

Add CQRS

Apply event-driven-architecture skill to separate command and query responsibilities for the user service

Create Projections

Apply event-driven-architecture skill to build read models with event projections for reporting dashboard

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: event-driven-architecture

Completed:
- [x] Event store implemented with optimistic concurrency control
- [x] Aggregate root base class with event sourcing support
- [x] CQRS command and query handlers separated
- [x] Event projections building read models from event stream
- [x] Snapshotting implemented for performance optimization

Outputs:
- Event store implementation (PostgreSQL or EventStoreDB)
- Aggregate root classes (Order, User, etc.)
- Command handlers (CreateOrder, UpdateUser, etc.)
- Query handlers (GetOrder, ListUsers, etc.)
- Projection services (OrderProjection, UserProjection, etc.)
- Snapshot store with configurable snapshot intervals

Completion Checklist

Before marking this skill as complete, verify:

  • Event store enforces optimistic concurrency (version checks)
  • All events have unique IDs, types, and timestamps
  • Aggregates correctly apply events to rebuild state
  • Command handlers validate business rules before raising events
  • Query handlers read from projections, not event store
  • Projections can be rebuilt from event stream
  • Snapshots created at configurable intervals (e.g., every 50 events)
  • Event publishing to message broker working
  • Concurrency conflicts handled gracefully (retry or reject)

Failure Indicators

This skill has FAILED if:

  • ❌ Optimistic concurrency not enforced (events overwrite each other)
  • ❌ Events missing critical fields (eventId, timestamp, version)
  • ❌ Aggregates don't correctly rebuild state from events
  • ❌ Command handlers bypass validation (invalid state transitions)
  • ❌ Query handlers read from write model (CQRS violated)
  • ❌ Projections can't be rebuilt (missing event handlers)
  • ❌ Snapshots not used (performance degradation for old aggregates)
  • ❌ Event publishing fails silently (no error handling)
  • ❌ Concurrency conflicts cause data loss

When NOT to Use

Do NOT use this skill when:

  • CRUD-only application with no complex business logic
  • Small dataset (<10K records) with simple queries
  • Real-time consistency required (event sourcing has eventual consistency)
  • No need for audit trail or historical state
  • Team unfamiliar with event sourcing (high learning curve)
  • No infrastructure for message broker (Kafka, RabbitMQ)
  • Legacy system integration without event support

Use these alternatives instead:

  • Simple CRUD: Traditional RDBMS with transactions
  • Real-time consistency: Synchronous APIs with locks
  • No audit trail: Standard ORM patterns

Anti-Patterns (Avoid)

Anti-PatternProblemSolution
Mutable eventsEvent history changesEvents are immutable - never update, only append
No versioningBreaking changes break replayAlways version event schemas
Large eventsPerformance degradationKeep events focused on business facts, not full state
Synchronous projectionsSlow command handlingProjections should be async subscribers
No snapshotsSlow aggregate loadingImplement snapshots for aggregates with >50 events
Missing event metadataNo debugging contextInclude userId, correlationId, causationId
Skipping concurrency checksLost updatesAlways check expected version before appending

Principles

This skill embodies:

  • #1 Recycle Before Create - Use proven event sourcing patterns (aggregate, event store, projection)
  • #4 Separation of Concerns - CQRS separates writes (commands) from reads (queries)
  • #5 Eliminate Ambiguity - Events are explicit business facts (OrderPlaced, not OrderChanged)
  • #7 Verification Protocol - Projections can be rebuilt to verify correctness
  • #8 No Assumptions - Optimistic concurrency handles concurrent updates

Full Standard: CODITECT-STANDARD-AUTOMATION.md

Integration Points

  • system-architecture-design - Overall architecture decisions
  • message-queue-patterns - Event distribution
  • database-schema-optimization - Event store and projection schemas