Agent Skills Framework Extension
FoundationDB Patterns Skill
When to Use This Skill
Use this skill when implementing FoundationDB patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
Production FoundationDB patterns for distributed ACID transactions, key design, and layer implementations.
Core Capabilities
- Key Design - Hierarchical keys, tuple encoding, range queries
- Layer Patterns - Document, table, queue implementations
- Transactions - ACID guarantees, conflict resolution, retries
- Performance - Batching, caching, hot key avoidance
- Watch Patterns - Change notifications, triggers, reactive systems
Hierarchical Key Design
# fdb_layers/key_design.py
"""
FoundationDB key design patterns with tuple encoding
"""
import struct
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional

import fdb
from fdb import tuple as fdb_tuple
fdb.api_version(710)
class Subspace:
    """Hierarchical namespace for key organization.

    Wraps a tuple-encoded key prefix so that every key created through this
    subspace is isolated from keys in other subspaces (no collisions).
    """

    def __init__(self, prefix_tuple: tuple):
        # Packed prefix prepended to every key in this subspace.
        self.prefix = fdb_tuple.pack(prefix_tuple)

    def key(self, *elements) -> bytes:
        """Create a key in this subspace from the given elements."""
        return self.prefix + fdb_tuple.pack(elements)

    def pack(self, tuple_elements: tuple) -> bytes:
        """Pack a tuple with this subspace's prefix prepended."""
        return self.prefix + fdb_tuple.pack(tuple_elements)

    def unpack(self, key: bytes) -> tuple:
        """Unpack a key, stripping this subspace's prefix.

        Raises:
            ValueError: if the key does not start with this subspace's prefix.
        """
        if not key.startswith(self.prefix):
            raise ValueError("Key does not belong to this subspace")
        return fdb_tuple.unpack(key[len(self.prefix):])

    def range(self, prefix_tuple: tuple = ()) -> "tuple[fdb.KeySelector, fdb.KeySelector]":
        """Return (begin, end) key selectors covering all keys under the prefix.

        Fixed: the original annotation claimed a single ``fdb.KeySelector``,
        but this method returns a (begin, end) pair.
        """
        prefix = self.prefix + fdb_tuple.pack(prefix_tuple)
        # NOTE(review): ``prefix + b'\xff'`` is the common prefix-range idiom,
        # but tuple encoding can emit 0xff bytes for some values;
        # fdb.tuple.range() is the fully-correct alternative — confirm for
        # your key space.
        return (fdb.KeySelector.first_greater_or_equal(prefix),
                fdb.KeySelector.first_greater_or_equal(prefix + b'\xff'))
# Example schema design
class Schema:
    """Application schema: one Subspace per entity family and index family."""

    # Top-level subspaces for primary data.
    USERS = Subspace(('users',))
    PROJECTS = Subspace(('projects',))
    TASKS = Subspace(('tasks',))
    INDEXES = Subspace(('indexes',))
    COUNTERS = Subspace(('counters',))

    # Secondary indexes; their tuples nest under the ('indexes',) prefix,
    # so they live inside the INDEXES subspace's key range.
    USER_BY_EMAIL = Subspace(('indexes', 'user_by_email'))
    TASKS_BY_PROJECT = Subspace(('indexes', 'tasks_by_project'))
    TASKS_BY_ASSIGNEE = Subspace(('indexes', 'tasks_by_assignee'))
@dataclass
class User:
    """User record stored under Schema.USERS, tuple-encoded as
    (email, name, created_at) by UserRepository."""
    id: str          # primary key component: ('users', id)
    email: str       # unique — enforced via the USER_BY_EMAIL index
    name: str        # display name
    created_at: int  # creation timestamp; units not shown here (epoch seconds presumably — confirm)
class UserRepository:
    """User CRUD operations on FoundationDB with a unique-email index
    and a sharlded-free total-user counter."""

    def __init__(self, db: fdb.Database):
        self.db = db

    @fdb.transactional
    def create(self, tr, user: User) -> None:
        """Create a user, enforcing ID and email uniqueness.

        Raises:
            ValueError: if the user ID or email already exists.
        """
        # Primary key: ('users', user_id)
        user_key = Schema.USERS.key(user.id)
        user_data = fdb_tuple.pack((user.email, user.name, user.created_at))
        # Reject duplicate IDs.
        if tr[user_key].present():
            raise ValueError(f"User {user.id} already exists")
        # Reject duplicate emails via the secondary index.
        email_key = Schema.USER_BY_EMAIL.key(user.email)
        if tr[email_key].present():
            raise ValueError(f"Email {user.email} already exists")
        # Record + index entry are written in the same transaction, so the
        # index can never be out of sync with the data.
        tr[user_key] = user_data
        tr[email_key] = fdb_tuple.pack((user.id,))
        # Track the total user count atomically.
        self._increment_counter(tr, Schema.COUNTERS.key('total_users'))

    @fdb.transactional
    def find_by_id(self, tr, user_id: str) -> Optional[User]:
        """Get a user by ID, or None if absent."""
        user_key = Schema.USERS.key(user_id)
        value = tr[user_key]
        if not value.present():
            return None
        email, name, created_at = fdb_tuple.unpack(value)
        return User(id=user_id, email=email, name=name, created_at=created_at)

    @fdb.transactional
    def find_by_email(self, tr, email: str) -> Optional[User]:
        """Get a user by email via the USER_BY_EMAIL index, or None."""
        email_key = Schema.USER_BY_EMAIL.key(email)
        value = tr[email_key]
        if not value.present():
            return None
        user_id, = fdb_tuple.unpack(value)
        # Second lookup resolves the index entry to the full record.
        return self.find_by_id(tr, user_id)

    @fdb.transactional
    def delete(self, tr, user_id: str) -> bool:
        """Delete a user and its email index entry; returns False if absent."""
        # Read first: the email is needed to locate the index entry.
        user = self.find_by_id(tr, user_id)
        if not user:
            return False
        del tr[Schema.USERS.key(user_id)]
        del tr[Schema.USER_BY_EMAIL.key(user.email)]
        self._decrement_counter(tr, Schema.COUNTERS.key('total_users'))
        return True

    @fdb.transactional
    def list_all(self, tr, limit: int = 100) -> List[User]:
        """List up to `limit` users via a range scan over the USERS subspace."""
        users = []
        start, end = Schema.USERS.range()
        for key, value in tr.get_range(start, end, limit=limit):
            user_id, = Schema.USERS.unpack(key)
            email, name, created_at = fdb_tuple.unpack(value)
            users.append(User(
                id=user_id,
                email=email,
                name=name,
                created_at=created_at
            ))
        return users

    def _increment_counter(self, tr, key: bytes, amount: int = 1):
        """Atomically add `amount` via FDB's ADD mutation.

        Fixed: the original packed the operand with tuple encoding, but
        tr.add() interprets its operand as a little-endian integer — tuple
        encoding would silently corrupt the counter.
        """
        tr.add(key, struct.pack('<q', amount))

    def _decrement_counter(self, tr, key: bytes, amount: int = 1):
        """Atomically subtract `amount` (little-endian negative operand)."""
        tr.add(key, struct.pack('<q', -amount))
Document Layer Implementation
# fdb_layers/document_layer.py
"""
Document layer implementation on FoundationDB
"""
import fdb
from fdb import tuple as fdb_tuple
import json
from typing import Dict, List, Optional, Any
from uuid import uuid4
class DocumentLayer:
    """JSON document storage on FoundationDB with equality indexes
    on a fixed set of fields."""

    # Fields that receive secondary index entries on insert/update/delete.
    DEFAULT_INDEXED_FIELDS = ('status', 'assignee', 'priority')

    def __init__(self, db: fdb.Database, namespace: str):
        self.db = db
        # Documents live under (namespace, 'docs'); index entries under
        # (namespace, 'indexes').
        self.docs = Subspace((namespace, 'docs'))
        self.indexes = Subspace((namespace, 'indexes'))

    @fdb.transactional
    def insert(self, tr, collection: str, document: Dict[str, Any]) -> str:
        """Insert a document under an auto-generated UUID; returns the ID."""
        doc_id = str(uuid4())
        # Primary key: (collection, doc_id)
        doc_key = self.docs.key(collection, doc_id)
        tr[doc_key] = json.dumps(document).encode('utf-8')
        # Index entries are written in the same transaction as the data.
        self._update_indexes(tr, collection, doc_id, document)
        return doc_id

    @fdb.transactional
    def find_by_id(
        self,
        tr,
        collection: str,
        doc_id: str
    ) -> Optional[Dict[str, Any]]:
        """Get a document by ID, or None if absent."""
        value = tr[self.docs.key(collection, doc_id)]
        if not value.present():
            return None
        return json.loads(value.decode('utf-8'))

    @fdb.transactional
    def find_by_index(
        self,
        tr,
        collection: str,
        field: str,
        value: Any,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Return documents whose indexed `field` equals `value`."""
        # Index key layout: (collection, field, value, doc_id)
        index_prefix = self.indexes.key(collection, field, value)
        start = fdb.KeySelector.first_greater_or_equal(index_prefix)
        end = fdb.KeySelector.first_greater_or_equal(index_prefix + b'\xff')
        results = []
        for key, _ in tr.get_range(start, end, limit=limit):
            # The doc_id is the last tuple element of the index key.
            *_, doc_id = self.indexes.unpack(key)
            doc = self.find_by_id(tr, collection, doc_id)
            if doc:
                results.append(doc)
        return results

    @fdb.transactional
    def update(
        self,
        tr,
        collection: str,
        doc_id: str,
        updates: Dict[str, Any]
    ) -> bool:
        """Merge `updates` into an existing document; False if absent."""
        doc = self.find_by_id(tr, collection, doc_id)
        if not doc:
            return False
        # Drop index entries for the old field values before merging,
        # otherwise stale entries would linger.
        self._remove_indexes(tr, collection, doc_id, doc)
        doc.update(updates)
        tr[self.docs.key(collection, doc_id)] = json.dumps(doc).encode('utf-8')
        self._update_indexes(tr, collection, doc_id, doc)
        return True

    @fdb.transactional
    def delete(self, tr, collection: str, doc_id: str) -> bool:
        """Delete a document and its index entries; False if absent."""
        # Read first: old field values are needed for index cleanup.
        doc = self.find_by_id(tr, collection, doc_id)
        if not doc:
            return False
        del tr[self.docs.key(collection, doc_id)]
        self._remove_indexes(tr, collection, doc_id, doc)
        return True

    def _update_indexes(
        self,
        tr,
        collection: str,
        doc_id: str,
        document: Dict[str, Any],
        indexed_fields=None
    ):
        """Create index entries for each indexed field present in `document`.

        Fixed: the original used a mutable list literal as the default
        argument (shared across calls); use a None sentinel plus a
        class-level immutable tuple instead.
        """
        fields = indexed_fields if indexed_fields is not None else self.DEFAULT_INDEXED_FIELDS
        for field in fields:
            if field in document:
                index_key = self.indexes.key(collection, field, document[field], doc_id)
                tr[index_key] = b''  # empty value: the key itself is the index

    def _remove_indexes(
        self,
        tr,
        collection: str,
        doc_id: str,
        document: Dict[str, Any],
        indexed_fields=None
    ):
        """Remove index entries for each indexed field present in `document`."""
        fields = indexed_fields if indexed_fields is not None else self.DEFAULT_INDEXED_FIELDS
        for field in fields:
            if field in document:
                del tr[self.indexes.key(collection, field, document[field], doc_id)]
Watch Pattern for Change Notifications
# fdb_layers/watch_pattern.py
"""
Watch pattern for reactive change notifications
"""
import fdb
from fdb import tuple as fdb_tuple
import asyncio
from typing import Callable, Any
class WatchManager:
    """Manage FoundationDB watches for change notifications."""

    def __init__(self, db: fdb.Database):
        self.db = db
        self.watchers = {}

    async def watch_key(
        self,
        key: bytes,
        callback: Callable[[bytes, Any], Any]
    ):
        """Watch a single key; await `callback(key, value)` on each change.

        `callback` must be a coroutine function — it is awaited.
        """
        while True:
            try:
                @fdb.transactional
                def get_and_watch(tr):
                    # Read the current value and register the watch in the
                    # same transaction so no change can be missed between them.
                    value = tr[key]
                    watch = tr.watch(key)
                    return value, watch

                current_value, watch = get_and_watch(self.db)
                await callback(key, current_value)
                # Fixed: an FDB watch future is not asyncio-awaitable; wait
                # for it in a worker thread so the event loop isn't blocked.
                await asyncio.get_running_loop().run_in_executor(None, watch.wait)
            except fdb.FDBError as e:
                if e.code == 1020:  # transaction_too_old
                    continue
                raise

    async def watch_range(
        self,
        start_key: bytes,
        end_key: bytes,
        callback: Callable[["list[tuple]"], Any],
        poll_interval: int = 5
    ):
        """Poll a key range and await `callback(changes)` when it differs.

        Fixed: the original annotated `List[tuple]` but this module never
        imported `List`, which raises NameError when the class is defined.
        NOTE: polling is a fallback — native watches only cover single keys.
        """
        last_snapshot = {}
        while True:
            @fdb.transactional
            def get_range(tr):
                return {
                    k: v for k, v in
                    tr.get_range(start_key, end_key)
                }

            current_snapshot = get_range(self.db)
            if current_snapshot != last_snapshot:
                changes = []
                # New or updated keys.
                for k, v in current_snapshot.items():
                    if k not in last_snapshot or last_snapshot[k] != v:
                        changes.append(('update', k, v))
                # Deleted keys.
                for k in last_snapshot:
                    if k not in current_snapshot:
                        changes.append(('delete', k, None))
                if changes:
                    await callback(changes)
                last_snapshot = current_snapshot
            # Wait before the next poll.
            await asyncio.sleep(poll_interval)
# Example usage
async def main():
    """Example: watch a single user key and print every change."""
    database = fdb.open()
    manager = WatchManager(database)

    async def on_user_change(key, value):
        print(f"User changed: {key} = {value}")

    await manager.watch_key(Schema.USERS.key('user-123'), on_user_change)


if __name__ == '__main__':
    asyncio.run(main())
Performance Optimization Patterns
# fdb_layers/performance.py
"""
FoundationDB performance optimization patterns
"""
import fdb
from fdb import tuple as fdb_tuple
from typing import List, Dict, Any
import time
class PerformancePatterns:
    """Performance best practices for FDB: batching, pipelined reads,
    sharded counters, and snapshot reads."""

    def __init__(self, db: fdb.Database):
        self.db = db

    @fdb.transactional
    def batch_write(self, tr, items: List[tuple]):
        """Write all (key, value) pairs atomically in one transaction.

        Keep the total payload under ~1 MB (FDB's transaction size limit);
        split larger batches across multiple transactions.
        """
        for key, value in items:
            tr[key] = value

    @fdb.transactional
    def parallel_reads(self, tr, keys: List[bytes]) -> Dict[bytes, Any]:
        """Issue all reads up front so FDB pipelines them in parallel."""
        futures = {key: tr[key] for key in keys}
        # Fixed: FDB Python futures expose wait() (blocks until ready and
        # returns the value); there is no `.value` attribute.
        return {k: future.wait() for k, future in futures.items()}

    @fdb.transactional
    def avoid_hot_keys(self, tr, shard_count: int = 10):
        """Increment one of `shard_count` randomly-chosen counter shards,
        spreading write load across keys instead of one hot key."""
        import random
        import struct
        shard = random.randint(0, shard_count - 1)
        counter_key = Schema.COUNTERS.key('total_users', shard)
        # Fixed: ADD mutation operands are little-endian integers — tuple
        # encoding would silently corrupt the counter.
        tr.add(counter_key, struct.pack('<q', 1))

    @fdb.transactional
    def get_total_count(self, tr, counter_name: str, shard_count: int = 10) -> int:
        """Sum every shard of a sharded counter written by avoid_hot_keys."""
        import struct
        total = 0
        for shard in range(shard_count):
            value = tr[Schema.COUNTERS.key(counter_name, shard)]
            if value.present():
                # Decode the little-endian operand format used by tr.add().
                total += struct.unpack('<q', bytes(value))[0]
        return total

    @fdb.transactional
    def use_snapshot_reads(self, tr):
        """Scan users via snapshot reads so analytics queries don't add
        read-conflict ranges (and thus never conflict with writers)."""
        users = []
        start, end = Schema.USERS.range()
        for key, value in tr.snapshot.get_range(start, end):
            users.append(value)
        return users
Usage Examples
Key Design
Apply foundationdb-patterns skill to design hierarchical key schema with tuple encoding
Document Layer
Apply foundationdb-patterns skill to implement JSON document storage with secondary indexes
Watch Pattern
Apply foundationdb-patterns skill to set up change notifications using watches
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: foundationdb-patterns
FoundationDB Implementation Summary:
- Pattern applied: {key_design|document_layer|watch_pattern|performance_optimization}
- Subspaces created: {N}
- Transactions implemented: {N}
- Indexes created: {N}
Completed:
- [x] Hierarchical key design with tuple encoding
- [x] Subspace organization implemented
- [x] ACID transactions with @fdb.transactional decorator
- [x] Secondary indexes created and maintained
- [x] Atomic counters implemented
- [x] Error handling and retry logic
- [x] Watch patterns for change notifications (if applicable)
- [x] Performance optimizations applied (batching, sharding, snapshot reads)
Outputs:
- Key schema: {description}
- Subspaces: {list}
- Repository methods: {N} CRUD operations
- Indexes: {N} secondary indexes
- Performance: {metric} (e.g., batch size, shard count)
Completion Checklist
Before marking this skill as complete, verify:
- Subspace hierarchy defined with tuple prefixes
- Key design uses tuple encoding (fdb.tuple.pack/unpack)
- All database operations use @fdb.transactional decorator
- Secondary indexes created for query fields
- Index maintenance in create/update/delete operations
- Atomic counters use tr.add() for thread-safe increments
- Range queries use Subspace.range() for prefix scans
- Error handling includes FDB error codes
- Watch patterns implemented for reactive updates (if needed)
- Performance optimizations applied (batching, sharding, snapshot reads)
Failure Indicators
This skill has FAILED if:
- ❌ Keys not using tuple encoding (binary corruption risk)
- ❌ Transactions missing @fdb.transactional decorator
- ❌ Secondary indexes not maintained on updates/deletes
- ❌ Counters using get-modify-set instead of atomic add
- ❌ Range queries not using Subspace for prefix isolation
- ❌ No error handling for FDB-specific errors
- ❌ Hot key contention not addressed (single counter, no sharding)
- ❌ Watch patterns using polling instead of native watches
- ❌ Batch operations exceeding 1MB transaction limit
- ❌ Snapshot reads not used for analytics queries
When NOT to Use
Do NOT use this skill when:
- Simple key-value storage without ACID requirements (use Redis/Memcached)
- Single-machine deployments (SQLite sufficient)
- No need for distributed transactions
- Schema-heavy relational models (use PostgreSQL)
- Document collections without relationships (use MongoDB)
- Time-series data only (use InfluxDB, TimescaleDB)
- Graph traversal primary use case (use Neo4j)
- Full-text search requirements (use Elasticsearch)
Use alternatives instead:
- PostgreSQL for relational schemas with complex joins
- MongoDB for document storage without ACID across collections
- Redis for caching and simple key-value operations
- Specialized databases for domain-specific needs
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Using strings for keys | Binary corruption, no ordering | Always use tuple encoding |
| Missing @fdb.transactional | No automatic retry, not ACID | Decorate all FDB operations |
| Orphaned indexes | Stale data, query inconsistency | Update/delete indexes with data |
| Get-modify-set counters | Race conditions, lost increments | Use atomic tr.add() |
| No subspace isolation | Key collisions, namespace pollution | Always use Subspace prefixes |
| Ignoring FDB error codes | Silent failures, data corruption | Handle specific error codes |
| Single counter hot key | Write contention, throughput limit | Shard counters across keys |
| Polling for changes | High latency, wasted resources | Use native watch() API |
| Large transaction payloads | Transaction failures, slow commits | Batch under 1MB, split if needed |
| Transactional reads for analytics | Write conflicts, poor concurrency | Use snapshot reads for queries |
Principles
This skill embodies:
- #2 First Principles - Understand tuple encoding, subspaces, ACID before implementing
- #3 Keep It Simple - Use FDB primitives (subspace, tuple, @transactional) correctly
- #4 Separation of Concerns - Schema (subspaces) separate from access (repositories)
- #5 Eliminate Ambiguity - Explicit tuple schemas, clear key hierarchies
- #8 No Assumptions - Always check key.present() before unpack
- #11 Inform → Do → Verify - Validate schema, implement, verify with get()
FoundationDB Pattern Principles:
- Always use tuple encoding for keys (never raw strings/bytes)
- Organize keys into hierarchical Subspaces
- Use @fdb.transactional decorator for all database operations
- Maintain secondary indexes atomically with data changes
- Use atomic operations (tr.add) for counters
- Handle FDB-specific error codes (1020 transaction_too_old, etc.)
- Shard hot keys to distribute write load
- Use snapshot reads for analytics to avoid write conflicts
- Keep transactions under 1MB payload
- Leverage watch() API for reactive change notifications
Key Design Hierarchy:
/{tenant_id}/{entity_type}/{entity_id}/{attribute}
Performance Optimization:
- Batch writes in single transaction (atomic, faster)
- Parallel reads with multiple get() calls
- Shard counters to avoid hot keys
- Snapshot reads for analytics (no write conflicts)
Full Standard: CODITECT-STANDARD-AUTOMATION.md
Integration Points
- database-design-patterns - Schema design principles
- data-engineering-patterns - Data pipelines
- multi-tenant-architecture - Tenant isolation