Skip to main content

Agent Skills Framework Extension

FoundationDB Patterns Skill

When to Use This Skill

Use this skill when implementing FoundationDB patterns in your codebase.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

Production FoundationDB patterns for distributed ACID transactions, key design, and layer implementations.

Core Capabilities

  1. Key Design - Hierarchical keys, tuple encoding, range queries
  2. Layer Patterns - Document, table, queue implementations
  3. Transactions - ACID guarantees, conflict resolution, retries
  4. Performance - Batching, caching, hot key avoidance
  5. Watch Patterns - Change notifications, triggers, reactive systems

Hierarchical Key Design

# fdb_layers/key_design.py
"""
FoundationDB key design patterns with tuple encoding
"""
import fdb
from fdb import tuple as fdb_tuple
from typing import List, Optional, Any
from dataclasses import dataclass
from enum import Enum

fdb.api_version(710)

class Subspace:
    """Hierarchical namespace for key organization.

    A subspace is identified by a tuple that is packed once into a byte
    prefix. Every key built through the subspace is that prefix followed by
    the tuple-encoded key elements.
    """

    def __init__(self, prefix_tuple: tuple):
        """Pack ``prefix_tuple`` into the byte prefix shared by all keys."""
        self.prefix = fdb_tuple.pack(prefix_tuple)

    def key(self, *elements) -> bytes:
        """Create a key in this subspace from positional elements.

        Equivalent to ``pack(elements)``; provided so callers can write
        ``subspace.key('a', 'b')`` instead of building a tuple first.
        """
        return self.pack(elements)

    def pack(self, tuple_elements: tuple) -> bytes:
        """Return the subspace prefix followed by the packed tuple."""
        return self.prefix + fdb_tuple.pack(tuple_elements)

    def unpack(self, key: bytes) -> tuple:
        """Strip the subspace prefix from ``key`` and decode the remainder.

        Raises:
            ValueError: if ``key`` does not start with this subspace's prefix.
        """
        if not key.startswith(self.prefix):
            raise ValueError("Key does not belong to this subspace")
        return fdb_tuple.unpack(key[len(self.prefix):])

    def range(
        self, prefix_tuple: tuple = ()
    ) -> "tuple[fdb.KeySelector, fdb.KeySelector]":
        """Return ``(begin, end)`` key selectors for a prefix scan.

        ``begin`` selects the first key >= the packed prefix and ``end``
        selects the first key >= that prefix followed by ``b'\\xff'``.
        With the default empty ``prefix_tuple`` the scan is bounded by the
        subspace prefix itself.
        """
        prefix = self.pack(prefix_tuple)
        return (
            fdb.KeySelector.first_greater_or_equal(prefix),
            fdb.KeySelector.first_greater_or_equal(prefix + b'\xff'),
        )

# Example schema design
class Schema:
    """Registry of the subspaces used by the application.

    Each attribute is a ``Subspace`` built from a fixed tuple prefix, so
    repositories refer to a key area by name rather than repeating the
    prefix literals.
    """

    # Subspaces rooted at a single-element prefix
    USERS = Subspace(("users",))
    PROJECTS = Subspace(("projects",))
    TASKS = Subspace(("tasks",))
    INDEXES = Subspace(("indexes",))
    COUNTERS = Subspace(("counters",))

    # Secondary-index subspaces, nested under the 'indexes' prefix
    USER_BY_EMAIL = Subspace(("indexes", "user_by_email"))
    TASKS_BY_PROJECT = Subspace(("indexes", "tasks_by_project"))
    TASKS_BY_ASSIGNEE = Subspace(("indexes", "tasks_by_assignee"))

@dataclass
class User:
    """Plain record describing one user."""

    # Unique identifier of the user
    id: str
    # Email address of the user
    email: str
    # Display name
    name: str
    # Creation timestamp as an integer (units are not specified here)
    created_at: int

class UserRepository:
    """User operations with FoundationDB.

    Users are stored under ``Schema.USERS`` keyed by user id, with a
    secondary index under ``Schema.USER_BY_EMAIL`` mapping email to user id
    and a running total kept under ``Schema.COUNTERS``.

    Every public method is decorated with ``@fdb.transactional`` and takes
    the transaction (or database) as ``tr``; methods call each other with
    the same ``tr`` so that one logical operation stays in one transaction.
    """

    def __init__(self, db: fdb.Database):
        self.db = db

    @fdb.transactional
    def create(self, tr, user: User) -> None:
        """Create a user together with its email index entry.

        Raises:
            ValueError: if the user id or the email is already present.
        """
        # Primary key: ('users', user_id)
        user_key = Schema.USERS.key(user.id)
        if tr[user_key].present():
            raise ValueError(f"User {user.id} already exists")

        # Email uniqueness is enforced through the secondary index.
        email_key = Schema.USER_BY_EMAIL.key(user.email)
        if tr[email_key].present():
            raise ValueError(f"Email {user.email} already exists")

        # Record value: (email, name, created_at); index value: (user_id,)
        tr[user_key] = fdb_tuple.pack((user.email, user.name, user.created_at))
        tr[email_key] = fdb_tuple.pack((user.id,))

        self._increment_counter(tr, Schema.COUNTERS.key('total_users'))

    @fdb.transactional
    def find_by_id(self, tr, user_id: str) -> Optional[User]:
        """Return the user stored under ``user_id``, or ``None`` if absent."""
        value = tr[Schema.USERS.key(user_id)]
        if not value.present():
            return None

        email, name, created_at = fdb_tuple.unpack(value)
        return User(id=user_id, email=email, name=name, created_at=created_at)

    @fdb.transactional
    def find_by_email(self, tr, email: str) -> Optional[User]:
        """Return the user owning ``email`` via the secondary index.

        Returns ``None`` when the email has no index entry.
        """
        value = tr[Schema.USER_BY_EMAIL.key(email)]
        if not value.present():
            return None

        user_id, = fdb_tuple.unpack(value)
        return self.find_by_id(tr, user_id)

    @fdb.transactional
    def delete(self, tr, user_id: str) -> bool:
        """Delete a user, its email index entry, and decrement the total.

        Returns:
            ``True`` if the user existed and was removed, ``False`` otherwise.
        """
        # The stored record is needed to learn which email index to remove.
        user = self.find_by_id(tr, user_id)
        if user is None:
            return False

        del tr[Schema.USERS.key(user_id)]
        del tr[Schema.USER_BY_EMAIL.key(user.email)]
        self._decrement_counter(tr, Schema.COUNTERS.key('total_users'))
        return True

    @fdb.transactional
    def list_all(self, tr, limit: int = 100) -> List[User]:
        """Return up to ``limit`` users from a range read of the users subspace."""
        start, end = Schema.USERS.range()

        users = []
        for key, value in tr.get_range(start, end, limit=limit):
            user_id, = Schema.USERS.unpack(key)
            email, name, created_at = fdb_tuple.unpack(value)
            users.append(
                User(id=user_id, email=email, name=name, created_at=created_at)
            )
        return users

    def _increment_counter(self, tr, key: bytes, amount: int = 1):
        """Atomically add ``amount`` to the counter stored at ``key``.

        The atomic ADD mutation operates on little-endian integers, so the
        operand is an 8-byte little-endian signed value, not a tuple-encoded
        integer. Decode a stored counter with
        ``int.from_bytes(value, 'little', signed=True)``.
        """
        tr.add(key, amount.to_bytes(8, 'little', signed=True))

    def _decrement_counter(self, tr, key: bytes, amount: int = 1):
        """Atomically subtract ``amount`` from the counter stored at ``key``."""
        self._increment_counter(tr, key, -amount)

Document Layer Implementation

# fdb_layers/document_layer.py
"""
Document layer implementation on FoundationDB
"""
import fdb
from fdb import tuple as fdb_tuple
import json
from typing import Dict, List, Optional, Any
from uuid import uuid4

class DocumentLayer:
    """JSON document storage with indexing.

    Documents are stored as UTF-8 JSON under ``(namespace, 'docs')`` keyed by
    ``(collection, doc_id)``. Index entries live under
    ``(namespace, 'indexes')`` keyed by ``(collection, field, value, doc_id)``
    with an empty value: the key itself is the index.
    """

    # Fields indexed when a caller does not pass ``indexed_fields``.
    DEFAULT_INDEXED_FIELDS = ('status', 'assignee', 'priority')

    def __init__(self, db: fdb.Database, namespace: str):
        self.db = db
        self.docs = Subspace((namespace, 'docs'))
        self.indexes = Subspace((namespace, 'indexes'))

    @fdb.transactional
    def insert(self, tr, collection: str, document: Dict[str, Any]) -> str:
        """Store ``document`` under a freshly generated UUID and index it.

        Returns:
            The generated document id.
        """
        doc_id = str(uuid4())

        # Primary key: (collection, doc_id)
        tr[self.docs.key(collection, doc_id)] = json.dumps(document).encode('utf-8')
        self._update_indexes(tr, collection, doc_id, document)

        return doc_id

    @fdb.transactional
    def find_by_id(
        self,
        tr,
        collection: str,
        doc_id: str
    ) -> Optional[Dict[str, Any]]:
        """Return the decoded document, or ``None`` if the key is absent."""
        value = tr[self.docs.key(collection, doc_id)]
        if not value.present():
            return None

        return json.loads(value.decode('utf-8'))

    @fdb.transactional
    def find_by_index(
        self,
        tr,
        collection: str,
        field: str,
        value: Any,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Return documents whose indexed ``field`` equals ``value``.

        At most ``limit`` index entries are scanned; entries whose document
        can no longer be found are skipped.
        """
        # Index key: (collection, field, value, doc_id)
        start, end = self.indexes.range((collection, field, value))

        results = []
        for index_key, _ in tr.get_range(start, end, limit=limit):
            # doc_id is the last element of the index key.
            doc_id = self.indexes.unpack(index_key)[-1]

            doc = self.find_by_id(tr, collection, doc_id)
            if doc is not None:
                results.append(doc)

        return results

    @fdb.transactional
    def update(
        self,
        tr,
        collection: str,
        doc_id: str,
        updates: Dict[str, Any]
    ) -> bool:
        """Merge ``updates`` into an existing document and re-index it.

        Returns:
            ``True`` if the document existed, ``False`` otherwise.
        """
        doc = self.find_by_id(tr, collection, doc_id)
        # Compare with None: an empty document ({}) is falsy but still exists.
        if doc is None:
            return False

        # Index entries embed the field values, so drop the old ones before
        # the values change and write new ones afterwards.
        self._remove_indexes(tr, collection, doc_id, doc)
        doc.update(updates)
        tr[self.docs.key(collection, doc_id)] = json.dumps(doc).encode('utf-8')
        self._update_indexes(tr, collection, doc_id, doc)

        return True

    @fdb.transactional
    def delete(self, tr, collection: str, doc_id: str) -> bool:
        """Delete a document and its index entries.

        Returns:
            ``True`` if the document existed, ``False`` otherwise.
        """
        # The stored document is needed to know which index keys to clear.
        doc = self.find_by_id(tr, collection, doc_id)
        if doc is None:
            return False

        del tr[self.docs.key(collection, doc_id)]
        self._remove_indexes(tr, collection, doc_id, doc)

        return True

    def _index_keys(
        self,
        collection: str,
        doc_id: str,
        document: Dict[str, Any],
        indexed_fields: Optional[List[str]]
    ) -> List[bytes]:
        """Build the index keys for the indexed fields present in ``document``."""
        fields = self.DEFAULT_INDEXED_FIELDS if indexed_fields is None else indexed_fields
        return [
            self.indexes.key(collection, field, document[field], doc_id)
            for field in fields
            if field in document
        ]

    def _update_indexes(
        self,
        tr,
        collection: str,
        doc_id: str,
        document: Dict[str, Any],
        indexed_fields: Optional[List[str]] = None
    ):
        """Write index entries for ``document``.

        ``indexed_fields`` defaults to ``DEFAULT_INDEXED_FIELDS`` when omitted.
        """
        for index_key in self._index_keys(collection, doc_id, document, indexed_fields):
            tr[index_key] = b''  # Empty value, key is the index

    def _remove_indexes(
        self,
        tr,
        collection: str,
        doc_id: str,
        document: Dict[str, Any],
        indexed_fields: Optional[List[str]] = None
    ):
        """Clear the index entries that ``document`` currently produces.

        ``indexed_fields`` defaults to ``DEFAULT_INDEXED_FIELDS`` when omitted.
        """
        for index_key in self._index_keys(collection, doc_id, document, indexed_fields):
            del tr[index_key]

Watch Pattern for Change Notifications

# fdb_layers/watch_pattern.py
"""
Watch pattern for reactive change notifications
"""
import fdb
from fdb import tuple as fdb_tuple
import asyncio
from typing import Callable, Any

class WatchManager:
    """Manage FoundationDB watches for change notifications.

    The FoundationDB client calls used here are blocking, and its futures
    are not asyncio awaitables. Both coroutines therefore hand that work to
    the event loop's default executor instead of calling it inline.
    """

    # FDB error codes after which the watch loop simply starts over:
    # 1007 = transaction_too_old, 1020 = not_committed.
    _RETRYABLE_ERROR_CODES = frozenset({1007, 1020})

    def __init__(self, db: fdb.Database):
        self.db = db
        self.watchers = {}

    async def watch_key(
        self,
        key: bytes,
        callback: Callable[[bytes, Any], Any]
    ):
        """Watch a key for changes.

        Loops forever: reads the key and registers a watch in one
        transaction, awaits ``callback(key, current_value)``, then waits for
        the watch to fire before repeating.

        Args:
            key: the key to observe.
            callback: coroutine function awaited with the key and the value
                read for it.

        Raises:
            fdb.FDBError: for any error code not in ``_RETRYABLE_ERROR_CODES``.
        """

        @fdb.transactional
        def get_and_watch(tr):
            value = tr[key]
            watch = tr.watch(key)
            # Finish the read here, on the worker thread, so the callback
            # never blocks the event loop resolving it.
            value.block_until_ready()
            return value, watch

        loop = asyncio.get_running_loop()

        while True:
            try:
                current_value, watch = await loop.run_in_executor(
                    None, get_and_watch, self.db
                )

                # Call callback with current value
                await callback(key, current_value)

                # Wait for change; wait() blocks, so it runs off-loop too.
                await loop.run_in_executor(None, watch.wait)

            except fdb.FDBError as e:
                if e.code in self._RETRYABLE_ERROR_CODES:
                    continue
                raise

    async def watch_range(
        self,
        start_key: bytes,
        end_key: bytes,
        callback: Callable[[list], Any],
        poll_interval: int = 5
    ):
        """Watch a range of keys for changes (polling).

        Every ``poll_interval`` seconds the range is read in full and diffed
        against the previous snapshot. When anything changed, ``callback`` is
        awaited with a list of ``('update', key, value)`` and
        ``('delete', key, None)`` tuples. The first poll reports every
        existing key as an update.
        """

        @fdb.transactional
        def get_range(tr):
            return {k: v for k, v in tr.get_range(start_key, end_key)}

        loop = asyncio.get_running_loop()
        last_snapshot = {}

        while True:
            current_snapshot = await loop.run_in_executor(
                None, get_range, self.db
            )

            # New or updated keys first, then keys that disappeared.
            changes = [
                ('update', k, v)
                for k, v in current_snapshot.items()
                if k not in last_snapshot or last_snapshot[k] != v
            ]
            changes.extend(
                ('delete', k, None)
                for k in last_snapshot
                if k not in current_snapshot
            )

            if changes:
                await callback(changes)

            last_snapshot = current_snapshot

            # Wait before next poll
            await asyncio.sleep(poll_interval)

# Example usage
async def main():
    """Open the default database and watch one user key, printing each value."""

    async def on_user_change(key, value):
        print(f"User changed: {key} = {value}")

    manager = WatchManager(fdb.open())
    watched_key = Schema.USERS.key('user-123')
    await manager.watch_key(watched_key, on_user_change)


if __name__ == '__main__':
    asyncio.run(main())

Performance Optimization Patterns

# fdb_layers/performance.py
"""
FoundationDB performance optimization patterns
"""
import fdb
from fdb import tuple as fdb_tuple
from typing import List, Dict, Any
import time

class PerformancePatterns:
    """Performance best practices for FDB.

    Counter shards are stored as 8-byte little-endian signed integers, the
    representation the atomic ADD mutation operates on.
    """

    def __init__(self, db: fdb.Database):
        self.db = db

    @fdb.transactional
    def batch_write(self, tr, items: List[tuple]):
        """Write every ``(key, value)`` pair of ``items`` in one transaction.

        The caller is responsible for keeping a batch small (around 1MB) and
        splitting larger workloads across several calls.
        """
        for key, value in items:
            tr[key] = value

    @fdb.transactional
    def parallel_reads(self, tr, keys: List[bytes]) -> Dict[bytes, Any]:
        """Issue all reads up front, then resolve them.

        Returns:
            A dict mapping each key to the resolved ``.value`` of its read.
        """
        # Start every read before resolving any, so they overlap.
        futures = {key: tr[key] for key in keys}
        return {key: future.value for key, future in futures.items()}

    @fdb.transactional
    def avoid_hot_keys(
        self,
        tr,
        shard_count: int = 10,
        counter_name: str = 'total_users'
    ):
        """Increment one randomly chosen shard of a sharded counter.

        Spreading increments over ``shard_count`` keys avoids funnelling
        every write into a single counter key.

        Args:
            shard_count: number of shards the counter is split into.
            counter_name: counter to increment; defaults to ``'total_users'``.
        """
        import random

        shard = random.randint(0, shard_count - 1)
        counter_key = Schema.COUNTERS.key(counter_name, shard)

        # Atomic ADD takes a little-endian integer operand.
        tr.add(counter_key, (1).to_bytes(8, 'little', signed=True))

    @fdb.transactional
    def get_total_count(self, tr, counter_name: str, shard_count: int = 10) -> int:
        """Return the sum of all shards of ``counter_name``.

        Shards that have never been written contribute zero.
        """
        # Issue every shard read before resolving any of them.
        shard_values = [
            tr[Schema.COUNTERS.key(counter_name, shard)]
            for shard in range(shard_count)
        ]

        total = 0
        for shard_value in shard_values:
            if shard_value.present():
                total += int.from_bytes(shard_value.value, 'little', signed=True)

        return total

    @fdb.transactional
    def use_snapshot_reads(self, tr):
        """Return every value in the users subspace using a snapshot read.

        Reads go through ``tr.snapshot`` so that, per the intent of this
        pattern, analytics/reporting scans do not conflict with writers.
        """
        start, end = Schema.USERS.range()
        return [value for _key, value in tr.snapshot.get_range(start, end)]

Usage Examples

Key Design

Apply foundationdb-patterns skill to design hierarchical key schema with tuple encoding

Document Layer

Apply foundationdb-patterns skill to implement JSON document storage with secondary indexes

Watch Pattern

Apply foundationdb-patterns skill to set up change notifications using watches

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: foundationdb-patterns

FoundationDB Implementation Summary:
- Pattern applied: {key_design|document_layer|watch_pattern|performance_optimization}
- Subspaces created: {N}
- Transactions implemented: {N}
- Indexes created: {N}

Completed:
- [x] Hierarchical key design with tuple encoding
- [x] Subspace organization implemented
- [x] ACID transactions with @fdb.transactional decorator
- [x] Secondary indexes created and maintained
- [x] Atomic counters implemented
- [x] Error handling and retry logic
- [x] Watch patterns for change notifications (if applicable)
- [x] Performance optimizations applied (batching, sharding, snapshot reads)

Outputs:
- Key schema: {description}
- Subspaces: {list}
- Repository methods: {N} CRUD operations
- Indexes: {N} secondary indexes
- Performance: {metric} (e.g., batch size, shard count)

Completion Checklist

Before marking this skill as complete, verify:

  • Subspace hierarchy defined with tuple prefixes
  • Key design uses tuple encoding (fdb.tuple.pack/unpack)
  • All database operations use @fdb.transactional decorator
  • Secondary indexes created for query fields
  • Index maintenance in create/update/delete operations
  • Atomic counters use tr.add() for thread-safe increments
  • Range queries use Subspace.range() for prefix scans
  • Error handling includes FDB error codes
  • Watch patterns implemented for reactive updates (if needed)
  • Performance optimizations applied (batching, sharding, snapshot reads)

Failure Indicators

This skill has FAILED if:

  • ❌ Keys not using tuple encoding (binary corruption risk)
  • ❌ Transactions missing @fdb.transactional decorator
  • ❌ Secondary indexes not maintained on updates/deletes
  • ❌ Counters using get-modify-set instead of atomic add
  • ❌ Range queries not using Subspace for prefix isolation
  • ❌ No error handling for FDB-specific errors
  • ❌ Hot key contention not addressed (single counter, no sharding)
  • ❌ Watch patterns using polling instead of native watches
  • ❌ Batch operations exceeding the recommended 1MB transaction size (FoundationDB's hard limit is 10MB)
  • ❌ Snapshot reads not used for analytics queries

When NOT to Use

Do NOT use this skill when:

  • Simple key-value storage without ACID requirements (use Redis/Memcached)
  • Single-machine deployments (SQLite sufficient)
  • No need for distributed transactions
  • Schema-heavy relational models (use PostgreSQL)
  • Document collections without relationships (use MongoDB)
  • Time-series data only (use InfluxDB, TimescaleDB)
  • Graph traversal primary use case (use Neo4j)
  • Full-text search requirements (use Elasticsearch)

Use alternatives instead:

  • PostgreSQL for relational schemas with complex joins
  • MongoDB for document storage without ACID across collections
  • Redis for caching and simple key-value operations
  • Specialized databases for domain-specific needs

Anti-Patterns (Avoid)

| Anti-Pattern | Problem | Solution |
|---|---|---|
| Using strings for keys | Binary corruption, no ordering | Always use tuple encoding |
| Missing @fdb.transactional | No automatic retry, not ACID | Decorate all FDB operations |
| Orphaned indexes | Stale data, query inconsistency | Update/delete indexes with data |
| Get-modify-set counters | Race conditions, lost increments | Use atomic tr.add() |
| No subspace isolation | Key collisions, namespace pollution | Always use Subspace prefixes |
| Ignoring FDB error codes | Silent failures, data corruption | Handle specific error codes |
| Single counter hot key | Write contention, throughput limit | Shard counters across keys |
| Polling for changes | High latency, wasted resources | Use native watch() API |
| Large transaction payloads | Transaction failures, slow commits | Batch under 1MB, split if needed |
| Transactional reads for analytics | Write conflicts, poor concurrency | Use snapshot reads for queries |

Principles

This skill embodies:

  • #2 First Principles - Understand tuple encoding, subspaces, ACID before implementing
  • #3 Keep It Simple - Use FDB primitives (subspace, tuple, @transactional) correctly
  • #4 Separation of Concerns - Schema (subspaces) separate from access (repositories)
  • #5 Eliminate Ambiguity - Explicit tuple schemas, clear key hierarchies
  • #8 No Assumptions - Always check value.present() before unpacking a read result
  • #11 Inform → Do → Verify - Validate schema, implement, verify with get()

FoundationDB Pattern Principles:

  1. Always use tuple encoding for keys (never raw strings/bytes)
  2. Organize keys into hierarchical Subspaces
  3. Use @fdb.transactional decorator for all database operations
  4. Maintain secondary indexes atomically with data changes
  5. Use atomic operations (tr.add) for counters
  6. Handle FDB-specific error codes (1007 transaction_too_old, 1020 not_committed, etc.)
  7. Shard hot keys to distribute write load
  8. Use snapshot reads for analytics to avoid write conflicts
  9. Keep transactions under 1MB payload
  10. Leverage watch() API for reactive change notifications

Key Design Hierarchy:

/{tenant_id}/{entity_type}/{entity_id}/{attribute}

Performance Optimization:

  • Batch writes in single transaction (atomic, faster)
  • Parallel reads with multiple get() calls
  • Shard counters to avoid hot keys
  • Snapshot reads for analytics (no write conflicts)

Full Standard: CODITECT-STANDARD-AUTOMATION.md


Integration Points

  • database-design-patterns - Schema design principles
  • data-engineering-patterns - Data pipelines
  • multi-tenant-architecture - Tenant isolation