Agent Skills Framework Extension
Event-Driven Architecture Skill
When to Use This Skill
Use this skill when implementing event driven architecture patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
CQRS, event sourcing, domain events, and event-driven system design patterns.
Core Capabilities
- Event Sourcing - State as event sequence
- CQRS - Command/query separation
- Domain Events - Business event modeling
- Projections - Read model building
- Snapshotting - Performance optimization
Event Sourcing
Event Store Implementation
// src/eventsourcing/event-store.ts
interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
aggregateType: string;
version: number;
timestamp: Date;
data: unknown;
metadata: Record<string, unknown>;
}
interface EventStore {
append(events: DomainEvent[], expectedVersion: number): Promise<void>;
load(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>;
loadAll(fromPosition?: number): AsyncIterable<DomainEvent>;
}
export class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}
async append(events: DomainEvent[], expectedVersion: number): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Optimistic concurrency check
const { rows } = await client.query(
`SELECT MAX(version) as current_version
FROM events
WHERE aggregate_id = $1`,
[events[0].aggregateId]
);
const currentVersion = rows[0]?.current_version ?? -1;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`
);
}
// Insert events
for (const event of events) {
await client.query(
`INSERT INTO events (
event_id, event_type, aggregate_id, aggregate_type,
version, timestamp, data, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
event.eventId,
event.eventType,
event.aggregateId,
event.aggregateType,
event.version,
event.timestamp,
JSON.stringify(event.data),
JSON.stringify(event.metadata),
]
);
}
await client.query('COMMIT');
// Publish to subscribers
for (const event of events) {
await this.publish(event);
}
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async load(aggregateId: string, fromVersion = 0): Promise<DomainEvent[]> {
const { rows } = await this.pool.query(
`SELECT * FROM events
WHERE aggregate_id = $1 AND version >= $2
ORDER BY version`,
[aggregateId, fromVersion]
);
return rows.map(row => ({
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
data: row.data,
metadata: row.metadata,
}));
}
async *loadAll(fromPosition = 0): AsyncIterable<DomainEvent> {
const BATCH_SIZE = 1000;
let position = fromPosition;
while (true) {
const { rows } = await this.pool.query(
`SELECT * FROM events
WHERE position >= $1
ORDER BY position
LIMIT $2`,
[position, BATCH_SIZE]
);
if (rows.length === 0) break;
for (const row of rows) {
yield {
eventId: row.event_id,
eventType: row.event_type,
aggregateId: row.aggregate_id,
aggregateType: row.aggregate_type,
version: row.version,
timestamp: row.timestamp,
data: row.data,
metadata: row.metadata,
};
}
position = rows[rows.length - 1].position + 1;
}
}
private async publish(event: DomainEvent): Promise<void> {
// Publish to message broker for projections
await messageBroker.publish(`events.${event.eventType}`, event);
}
}
Aggregate Root
// src/eventsourcing/aggregate.ts
export abstract class AggregateRoot<TState> {
protected state: TState;
private uncommittedEvents: DomainEvent[] = [];
private version = -1;
protected abstract getInitialState(): TState;
protected abstract applyEvent(state: TState, event: DomainEvent): TState;
constructor() {
this.state = this.getInitialState();
}
get aggregateVersion(): number {
return this.version;
}
get pendingEvents(): DomainEvent[] {
return [...this.uncommittedEvents];
}
loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.state = this.applyEvent(this.state, event);
this.version = event.version;
}
}
protected raiseEvent(eventType: string, data: unknown): void {
const event: DomainEvent = {
eventId: crypto.randomUUID(),
eventType,
aggregateId: this.getId(),
aggregateType: this.constructor.name,
version: this.version + this.uncommittedEvents.length + 1,
timestamp: new Date(),
data,
metadata: {},
};
this.state = this.applyEvent(this.state, event);
this.uncommittedEvents.push(event);
}
clearUncommittedEvents(): void {
this.version += this.uncommittedEvents.length;
this.uncommittedEvents = [];
}
protected abstract getId(): string;
}
// Example: Order Aggregate
interface OrderState {
id: string;
customerId: string;
items: OrderItem[];
status: 'draft' | 'confirmed' | 'shipped' | 'delivered' | 'cancelled';
totalCents: number;
}
export class Order extends AggregateRoot<OrderState> {
protected getInitialState(): OrderState {
return {
id: '',
customerId: '',
items: [],
status: 'draft',
totalCents: 0,
};
}
protected getId(): string {
return this.state.id;
}
protected applyEvent(state: OrderState, event: DomainEvent): OrderState {
switch (event.eventType) {
case 'OrderCreated':
return {
...state,
id: event.data.orderId,
customerId: event.data.customerId,
status: 'draft',
};
case 'ItemAdded':
const newItem = event.data as OrderItem;
return {
...state,
items: [...state.items, newItem],
totalCents: state.totalCents + newItem.quantity * newItem.unitPriceCents,
};
case 'OrderConfirmed':
return { ...state, status: 'confirmed' };
case 'OrderShipped':
return { ...state, status: 'shipped' };
case 'OrderCancelled':
return { ...state, status: 'cancelled' };
default:
return state;
}
}
// Commands
static create(orderId: string, customerId: string): Order {
const order = new Order();
order.raiseEvent('OrderCreated', { orderId, customerId });
return order;
}
addItem(productId: string, quantity: number, unitPriceCents: number): void {
if (this.state.status !== 'draft') {
throw new Error('Cannot add items to confirmed order');
}
this.raiseEvent('ItemAdded', { productId, quantity, unitPriceCents });
}
confirm(): void {
if (this.state.items.length === 0) {
throw new Error('Cannot confirm empty order');
}
if (this.state.status !== 'draft') {
throw new Error('Order already confirmed');
}
this.raiseEvent('OrderConfirmed', { confirmedAt: new Date() });
}
cancel(reason: string): void {
if (['delivered', 'cancelled'].includes(this.state.status)) {
throw new Error('Cannot cancel order in current status');
}
this.raiseEvent('OrderCancelled', { reason, cancelledAt: new Date() });
}
}
CQRS Pattern
// src/cqrs/command-handler.ts
interface Command {
type: string;
}
interface CommandHandler<T extends Command> {
handle(command: T): Promise<void>;
}
// Commands
interface CreateOrderCommand extends Command {
type: 'CreateOrder';
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
}
// Command Handler
class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
constructor(
private readonly eventStore: EventStore,
private readonly orderRepository: OrderRepository
) {}
async handle(command: CreateOrderCommand): Promise<void> {
// Create aggregate
const order = Order.create(command.orderId, command.customerId);
// Add items
for (const item of command.items) {
order.addItem(item.productId, item.quantity, item.price);
}
// Confirm order
order.confirm();
// Persist events
await this.eventStore.append(order.pendingEvents, -1);
order.clearUncommittedEvents();
}
}
// src/cqrs/query-handler.ts
interface Query<TResult> {
type: string;
}
interface QueryHandler<TQuery extends Query<TResult>, TResult> {
handle(query: TQuery): Promise<TResult>;
}
// Queries
interface GetOrderQuery extends Query<OrderReadModel> {
type: 'GetOrder';
orderId: string;
}
interface OrderReadModel {
id: string;
customerName: string;
items: Array<{
productName: string;
quantity: number;
unitPrice: number;
total: number;
}>;
status: string;
total: number;
createdAt: Date;
}
// Query Handler (reads from projection)
class GetOrderHandler implements QueryHandler<GetOrderQuery, OrderReadModel> {
constructor(private readonly readDb: Pool) {}
async handle(query: GetOrderQuery): Promise<OrderReadModel> {
const { rows } = await this.readDb.query(
`SELECT * FROM order_read_models WHERE id = $1`,
[query.orderId]
);
if (rows.length === 0) {
throw new NotFoundError('Order', query.orderId);
}
return rows[0];
}
}
Event Projections
// src/projections/order-projection.ts
interface Projection {
name: string;
handle(event: DomainEvent): Promise<void>;
rebuild(): Promise<void>;
}
export class OrderProjection implements Projection {
name = 'order-read-model';
constructor(
private readonly writeDb: Pool,
private readonly eventStore: EventStore
) {}
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.writeDb.query(
`INSERT INTO order_read_models (id, customer_id, status, created_at)
VALUES ($1, $2, 'draft', $3)`,
[event.data.orderId, event.data.customerId, event.timestamp]
);
break;
case 'ItemAdded':
await this.writeDb.query(
`INSERT INTO order_items_read_models (order_id, product_id, quantity, unit_price)
VALUES ($1, $2, $3, $4)`,
[event.aggregateId, event.data.productId, event.data.quantity, event.data.unitPriceCents]
);
await this.writeDb.query(
`UPDATE order_read_models
SET total = total + $1
WHERE id = $2`,
[event.data.quantity * event.data.unitPriceCents, event.aggregateId]
);
break;
case 'OrderConfirmed':
await this.writeDb.query(
`UPDATE order_read_models SET status = 'confirmed' WHERE id = $1`,
[event.aggregateId]
);
break;
case 'OrderCancelled':
await this.writeDb.query(
`UPDATE order_read_models SET status = 'cancelled' WHERE id = $1`,
[event.aggregateId]
);
break;
}
}
async rebuild(): Promise<void> {
// Clear existing projection
await this.writeDb.query('TRUNCATE order_read_models, order_items_read_models');
// Replay all events
for await (const event of this.eventStore.loadAll()) {
if (['OrderCreated', 'ItemAdded', 'OrderConfirmed', 'OrderCancelled'].includes(event.eventType)) {
await this.handle(event);
}
}
}
}
// Projection Manager
class ProjectionManager {
private projections: Map<string, Projection> = new Map();
register(projection: Projection): void {
this.projections.set(projection.name, projection);
}
async handleEvent(event: DomainEvent): Promise<void> {
for (const projection of this.projections.values()) {
try {
await projection.handle(event);
} catch (error) {
console.error(`Projection ${projection.name} failed:`, error);
// Mark for rebuild or alert
}
}
}
async rebuildProjection(name: string): Promise<void> {
const projection = this.projections.get(name);
if (!projection) throw new Error(`Unknown projection: ${name}`);
console.log(`Rebuilding projection: ${name}`);
await projection.rebuild();
console.log(`Projection rebuilt: ${name}`);
}
}
Snapshotting
// src/eventsourcing/snapshots.ts
interface Snapshot<TState> {
aggregateId: string;
version: number;
state: TState;
createdAt: Date;
}
export class SnapshotStore<TState> {
constructor(private readonly pool: Pool) {}
async save(snapshot: Snapshot<TState>): Promise<void> {
await this.pool.query(
`INSERT INTO snapshots (aggregate_id, version, state, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id) DO UPDATE SET version = $2, state = $3, created_at = $4`,
[snapshot.aggregateId, snapshot.version, JSON.stringify(snapshot.state), snapshot.createdAt]
);
}
async load(aggregateId: string): Promise<Snapshot<TState> | null> {
const { rows } = await this.pool.query(
`SELECT * FROM snapshots WHERE aggregate_id = $1`,
[aggregateId]
);
if (rows.length === 0) return null;
return {
aggregateId: rows[0].aggregate_id,
version: rows[0].version,
state: rows[0].state,
createdAt: rows[0].created_at,
};
}
}
// Repository with snapshots
class OrderRepository {
private readonly SNAPSHOT_INTERVAL = 50;
constructor(
private readonly eventStore: EventStore,
private readonly snapshotStore: SnapshotStore<OrderState>
) {}
async load(orderId: string): Promise<Order> {
const order = new Order();
// Try to load from snapshot
const snapshot = await this.snapshotStore.load(orderId);
if (snapshot) {
order.loadFromSnapshot(snapshot);
}
// Load events after snapshot
const fromVersion = snapshot ? snapshot.version + 1 : 0;
const events = await this.eventStore.load(orderId, fromVersion);
order.loadFromHistory(events);
return order;
}
async save(order: Order): Promise<void> {
await this.eventStore.append(order.pendingEvents, order.aggregateVersion);
// Check if snapshot needed
if (order.aggregateVersion % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
aggregateId: order.getId(),
version: order.aggregateVersion,
state: order.getState(),
createdAt: new Date(),
});
}
order.clearUncommittedEvents();
}
}
Usage Examples
Implement Event Sourcing
Apply event-driven-architecture skill to implement event sourcing for order management with full audit trail
Add CQRS
Apply event-driven-architecture skill to separate command and query responsibilities for the user service
Create Projections
Apply event-driven-architecture skill to build read models with event projections for reporting dashboard
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: event-driven-architecture
Completed:
- [x] Event store implemented with optimistic concurrency control
- [x] Aggregate root base class with event sourcing support
- [x] CQRS command and query handlers separated
- [x] Event projections building read models from event stream
- [x] Snapshotting implemented for performance optimization
Outputs:
- Event store implementation (PostgreSQL or EventStoreDB)
- Aggregate root classes (Order, User, etc.)
- Command handlers (CreateOrder, UpdateUser, etc.)
- Query handlers (GetOrder, ListUsers, etc.)
- Projection services (OrderProjection, UserProjection, etc.)
- Snapshot store with configurable snapshot intervals
Completion Checklist
Before marking this skill as complete, verify:
- Event store enforces optimistic concurrency (version checks)
- All events have unique IDs, types, and timestamps
- Aggregates correctly apply events to rebuild state
- Command handlers validate business rules before raising events
- Query handlers read from projections, not event store
- Projections can be rebuilt from event stream
- Snapshots created at configurable intervals (e.g., every 50 events)
- Event publishing to message broker working
- Concurrency conflicts handled gracefully (retry or reject)
Failure Indicators
This skill has FAILED if:
- ❌ Optimistic concurrency not enforced (events overwrite each other)
- ❌ Events missing critical fields (eventId, timestamp, version)
- ❌ Aggregates don't correctly rebuild state from events
- ❌ Command handlers bypass validation (invalid state transitions)
- ❌ Query handlers read from write model (CQRS violated)
- ❌ Projections can't be rebuilt (missing event handlers)
- ❌ Snapshots not used (performance degradation for old aggregates)
- ❌ Event publishing fails silently (no error handling)
- ❌ Concurrency conflicts cause data loss
When NOT to Use
Do NOT use this skill when:
- CRUD-only application with no complex business logic
- Small dataset (<10K records) with simple queries
- Real-time consistency required (event sourcing has eventual consistency)
- No need for audit trail or historical state
- Team unfamiliar with event sourcing (high learning curve)
- No infrastructure for message broker (Kafka, RabbitMQ)
- Legacy system integration without event support
Use these alternatives instead:
- Simple CRUD: Traditional RDBMS with transactions
- Real-time consistency: Synchronous APIs with locks
- No audit trail: Standard ORM patterns
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Mutable events | Event history changes | Events are immutable - never update, only append |
| No versioning | Breaking changes break replay | Always version event schemas |
| Large events | Performance degradation | Keep events focused on business facts, not full state |
| Synchronous projections | Slow command handling | Projections should be async subscribers |
| No snapshots | Slow aggregate loading | Implement snapshots for aggregates with >50 events |
| Missing event metadata | No debugging context | Include userId, correlationId, causationId |
| Skipping concurrency checks | Lost updates | Always check expected version before appending |
Principles
This skill embodies:
- #1 Recycle Before Create - Use proven event sourcing patterns (aggregate, event store, projection)
- #4 Separation of Concerns - CQRS separates writes (commands) from reads (queries)
- #5 Eliminate Ambiguity - Events are explicit business facts (OrderPlaced, not OrderChanged)
- #7 Verification Protocol - Projections can be rebuilt to verify correctness
- #8 No Assumptions - Optimistic concurrency handles concurrent updates
Full Standard: CODITECT-STANDARD-AUTOMATION.md
Integration Points
- system-architecture-design - Overall architecture decisions
- message-queue-patterns - Event distribution
- database-schema-optimization - Event store and projection schemas