ADR-030: Hierarchical Knowledge Store
Status
PROPOSED
Date
2026-01-15
Context
After corpus processing, results must be stored in a way that supports:
- Multi-granularity retrieval: Query at summary level OR drill down to source
- Incremental updates: Add new documents without full reprocessing
- Compliance requirements: Audit trail, access control, versioning
- Cross-corpus queries: Search across multiple analysis runs
The "unlimited memory" video stores results in flat markdown files, which is insufficient for these enterprise needs.
Requirements
| Requirement | Priority | Rationale |
|---|---|---|
| Multi-level hierarchy (raw → chunk → section → master) | P0 | Query flexibility |
| Sub-100ms retrieval at any level | P0 | Interactive use |
| Full audit trail per level | P0 | Compliance |
| Atomic updates across levels | P1 | Consistency |
| Cross-reference between levels | P1 | Drill-down capability |
Decision
Implement a Hierarchical Knowledge Store backed by FoundationDB with the following structure:
Data Model
┌─────────────────────────────────────────────────────────────────────┐
│ HIERARCHICAL KNOWLEDGE STORE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ LEVEL 3: MASTER │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ corpus_id: UUID │ │
│ │ master_summary: Text (500-2000 tokens) │ │
│ │ key_findings: List[Finding] │ │
│ │ metadata: CorpusMetadata │ │
│ │ created_at: Timestamp │ │
│ │ child_refs: List[SectionRef] │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 1:N │
│ ▼ │
│ LEVEL 2: SECTIONS │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ section_id: UUID │ │
│ │ parent_ref: MasterRef │ │
│ │ section_summary: Text (1000-5000 tokens) │ │
│ │ category: String │ │
│ │ entities: List[Entity] │ │
│ │ child_refs: List[ChunkRef] │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 1:N │
│ ▼ │
│ LEVEL 1: CHUNKS │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ chunk_id: UUID │ │
│ │ parent_ref: SectionRef │ │
│ │ chunk_summary: Text (500-2000 tokens) │ │
│ │ key_quotes: List[Quote] │ │
│ │ entities: List[Entity] │ │
│ │ embedding: Vector[1536] │ │
│ │ child_refs: List[RawRef] │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 1:N │
│ ▼ │
│ LEVEL 0: RAW │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ raw_id: UUID │ │
│ │ parent_ref: ChunkRef │ │
│ │ document_id: UUID │ │
│ │ content: Text (original) │ │
│ │ span: {start: int, end: int} │ │
│ │ preprocessing_applied: PreprocessingRecord │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
FoundationDB Directory Structure
# Directory layout in FoundationDB (directory-layer calls take an open
# database handle)
db = fdb.open()
knowledge_store = fdb.directory.create_or_open(db, ('coditect', 'knowledge'))

directories = {
    'master':   knowledge_store.create_or_open(db, ('master',)),
    'sections': knowledge_store.create_or_open(db, ('sections',)),
    'chunks':   knowledge_store.create_or_open(db, ('chunks',)),
    'raw':      knowledge_store.create_or_open(db, ('raw',)),
    'indexes':  knowledge_store.create_or_open(db, ('indexes',)),
    'audit':    knowledge_store.create_or_open(db, ('audit',)),
}

# Key structure
# master/{corpus_id}                           -> MasterRecord
# sections/{corpus_id}/{section_id}            -> SectionRecord
# chunks/{corpus_id}/{section_id}/{chunk_id}   -> ChunkRecord
# raw/{corpus_id}/{document_id}/{span_start}   -> RawRecord
# indexes/entity/{entity_type}/{entity_value}  -> List[ChunkRef]
# indexes/embedding/{chunk_id}                 -> Vector
# audit/{corpus_id}/{timestamp}                -> AuditRecord
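The key scheme above can be illustrated with an in-memory ordered map standing in for FoundationDB's tuple layer; because keys are ordered tuples, a range scan over a `(corpus_id,)` prefix returns every section of that corpus. The `range_read` helper and the record values are illustrative stand-ins, not part of the real store:

```python
# Sketch of the hierarchical key scheme using a plain dict in place of
# FoundationDB. Real keys would be tuple-layer-packed bytes under a subspace.
from uuid import uuid4

store = {}  # stands in for an fdb subspace

corpus = uuid4()
s1, s2 = uuid4(), uuid4()
store[('sections', str(corpus), str(s1))] = b'SectionRecord-1'
store[('sections', str(corpus), str(s2))] = b'SectionRecord-2'

def range_read(prefix):
    """Return all values whose key starts with `prefix`, in key order,
    mimicking an FDB range read over a tuple prefix."""
    return [v for k, v in sorted(store.items()) if k[:len(prefix)] == prefix]

sections = range_read(('sections', str(corpus)))
assert len(sections) == 2  # both sections of this corpus, nothing else
```

The same prefix trick is what makes "all chunks of a section" and "all raw spans of a document" single range reads in the layout above.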
Core Implementation
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional
from uuid import UUID


class HierarchyLevel(Enum):
    """Plain Enum (not a dataclass): levels are fixed singletons"""
    MASTER = 3
    SECTION = 2
    CHUNK = 1
    RAW = 0


@dataclass(kw_only=True)
class KnowledgeNode:
    """Base class for all hierarchy levels. kw_only=True lets subclasses
    give `level` a default without violating dataclass field ordering."""
    id: UUID
    corpus_id: UUID
    level: HierarchyLevel
    parent_ref: Optional[UUID]
    child_refs: List[UUID]
    created_at: datetime
    created_by: str  # For audit
    version: int


@dataclass(kw_only=True)
class MasterNode(KnowledgeNode):
    """Top-level corpus summary"""
    level: HierarchyLevel = HierarchyLevel.MASTER
    summary: str
    key_findings: List[Finding]
    statistics: CorpusStatistics
    processing_metadata: ProcessingMetadata


@dataclass(kw_only=True)
class SectionNode(KnowledgeNode):
    """Section-level grouping (by category, topic, etc.)"""
    level: HierarchyLevel = HierarchyLevel.SECTION
    category: str
    summary: str
    entities: List[Entity]
    document_count: int


@dataclass(kw_only=True)
class ChunkNode(KnowledgeNode):
    """Chunk-level with embeddings for RAG"""
    level: HierarchyLevel = HierarchyLevel.CHUNK
    summary: str
    key_quotes: List[Quote]
    entities: List[Entity]
    embedding: List[float]  # 1536-dim for ada-002 / 3072 for text-embedding-3-large
    source_document: UUID


@dataclass(kw_only=True)
class RawNode(KnowledgeNode):
    """Original text spans"""
    level: HierarchyLevel = HierarchyLevel.RAW
    content: str
    document_id: UUID
    span_start: int
    span_end: int
    preprocessing_record: PreprocessingRecord
class HierarchicalKnowledgeStore:
    """Main store interface"""

    def __init__(self, fdb_cluster: str):
        self.db = fdb.open(fdb_cluster)
        self._init_directories()
        self.vector_index = VectorIndex()  # For embedding search

    @fdb.transactional
    def store_hierarchy(
        self,
        tr: fdb.Transaction,
        corpus_id: UUID,
        master: MasterNode,
        sections: List[SectionNode],
        chunks: List[ChunkNode],
        raw_nodes: List[RawNode]
    ) -> None:
        """Atomically store entire hierarchy"""
        # Store master
        master_key = self.dirs['master'].pack((str(corpus_id),))
        tr[master_key] = self._serialize(master)

        # Store sections
        for section in sections:
            section_key = self.dirs['sections'].pack((str(corpus_id), str(section.id)))
            tr[section_key] = self._serialize(section)

        # Store chunks with embeddings
        for chunk in chunks:
            chunk_key = self.dirs['chunks'].pack(
                (str(corpus_id), str(chunk.parent_ref), str(chunk.id)))
            tr[chunk_key] = self._serialize(chunk)
            # Index embedding
            self._index_embedding(tr, chunk.id, chunk.embedding)
            # Index entities
            for entity in chunk.entities:
                self._index_entity(tr, entity, chunk.id)

        # Store raw
        for raw in raw_nodes:
            raw_key = self.dirs['raw'].pack(
                (str(corpus_id), str(raw.document_id), str(raw.span_start)))
            tr[raw_key] = self._serialize(raw)

        # Create audit record
        self._create_audit_record(tr, corpus_id, "HIERARCHY_CREATED", {
            'sections': len(sections),
            'chunks': len(chunks),
            'raw_nodes': len(raw_nodes)
        })
    async def query_at_level(
        self,
        corpus_id: UUID,
        level: HierarchyLevel,
        query: Optional[str] = None,
        filters: Optional[QueryFilters] = None
    ) -> List[KnowledgeNode]:
        """Query knowledge at a specific hierarchy level"""
        if level == HierarchyLevel.MASTER:
            return [await self._get_master(corpus_id)]
        elif level == HierarchyLevel.SECTION:
            sections = await self._get_sections(corpus_id)
            if filters and filters.category:
                sections = [s for s in sections if s.category == filters.category]
            return sections
        elif level == HierarchyLevel.CHUNK:
            if query:
                # Semantic search using embeddings
                return await self._semantic_search(corpus_id, query, filters)
            return await self._get_all_chunks(corpus_id, filters)
        else:  # RAW
            if filters is None or not filters.chunk_ids:
                raise ValueError("RAW-level queries require filters.chunk_ids")
            return await self._get_raw_for_chunks(filters.chunk_ids)
    async def drill_down(
        self,
        node_id: UUID,
        target_level: HierarchyLevel
    ) -> List[KnowledgeNode]:
        """Navigate from a higher level to a lower level"""
        node = await self._get_node(node_id)
        if node.level.value <= target_level.value:
            raise ValueError(f"Cannot drill down from {node.level} to {target_level}")

        if node.level.value - target_level.value == 1:
            # Direct children
            return await self._get_children(node)

        # Multi-level descent, one level at a time
        children = await self._get_children(node)
        results = []
        for child in children:
            results.extend(await self.drill_down(child.id, target_level))
        return results
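The multi-level descent can be sketched on a plain in-memory tree; the node names, levels, and `children` map are illustrative, not store data:

```python
# Minimal sketch of drill_down's recursion: descend one level at a time
# until the direct-children case is reached.
children = {
    'master': ['sec-a', 'sec-b'],     # level 3 -> 2
    'sec-a': ['chunk-1', 'chunk-2'],  # level 2 -> 1
    'sec-b': ['chunk-3'],
}
level = {'master': 3, 'sec-a': 2, 'sec-b': 2,
         'chunk-1': 1, 'chunk-2': 1, 'chunk-3': 1}

def drill_down(node_id, target_level):
    if level[node_id] <= target_level:
        raise ValueError('cannot drill down to a level at or above this node')
    kids = children.get(node_id, [])
    if level[node_id] - target_level == 1:
        return kids  # direct children
    out = []
    for kid in kids:  # multi-level descent: recurse through each child
        out.extend(drill_down(kid, target_level))
    return out

assert drill_down('master', 1) == ['chunk-1', 'chunk-2', 'chunk-3']
```

Drilling from master (level 3) to chunks (level 1) recurses through both sections and concatenates their children in order.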
    async def roll_up(
        self,
        node_ids: List[UUID],
        target_level: HierarchyLevel
    ) -> KnowledgeNode:
        """Navigate from a lower level to a higher level"""
        # Find the common ancestor at the target level
        ancestors = set()
        corpus_id = None
        for node_id in node_ids:
            node = await self._get_node(node_id)
            corpus_id = node.corpus_id
            ancestor = await self._find_ancestor_at_level(node, target_level)
            ancestors.add(ancestor.id)

        if len(ancestors) == 1:
            return await self._get_node(ancestors.pop())

        # Multiple ancestors at the target level - fall back to the master
        return await self._get_master(corpus_id)
    async def _semantic_search(
        self,
        corpus_id: UUID,
        query: str,
        filters: Optional[QueryFilters],
        top_k: int = 10
    ) -> List[ChunkNode]:
        """Search chunks by semantic similarity"""
        # Embed query
        query_embedding = await self._embed(query)

        # Search vector index
        chunk_ids = await self.vector_index.search(
            query_embedding,
            corpus_filter=corpus_id,
            top_k=top_k
        )

        # Load chunks
        chunks = [await self._get_chunk(cid) for cid in chunk_ids]

        # Apply additional filters
        if filters:
            if filters.entity_types:
                chunks = [c for c in chunks if self._has_entity_type(c, filters.entity_types)]
            if filters.categories:
                # Category lives on the parent section, not on the chunk itself
                filtered = []
                for c in chunks:
                    section = await self._get_node(c.parent_ref)
                    if section.category in filters.categories:
                        filtered.append(c)
                chunks = filtered
        return chunks
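The vector-index step can be sketched as a brute-force cosine-similarity top-k over in-memory embeddings; the real `VectorIndex` would use an approximate-nearest-neighbor structure, and the 3-dimensional vectors here are illustrative stand-ins for the 1536-dim embeddings:

```python
# Sketch of the ranking behind _semantic_search: score every chunk embedding
# against the query vector and keep the top_k most similar.
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

embeddings = {
    'chunk-1': [1.0, 0.0, 0.0],
    'chunk-2': [0.0, 1.0, 0.0],
    'chunk-3': [0.9, 0.1, 0.0],
}

def search(query_vec, top_k=2):
    """Return the top_k chunk ids by descending cosine similarity."""
    ranked = sorted(embeddings,
                    key=lambda cid: cosine(query_vec, embeddings[cid]),
                    reverse=True)
    return ranked[:top_k]

assert search([1.0, 0.0, 0.0]) == ['chunk-1', 'chunk-3']
```

Brute force is O(n) per query; the sub-linear behavior claimed later in this ADR depends on swapping this loop for an ANN index.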
Incremental Updates
class HierarchyUpdater:
    """Handle incremental updates to the knowledge hierarchy"""

    async def add_documents(
        self,
        corpus_id: UUID,
        new_documents: List[Document]
    ) -> UpdateResult:
        """Add new documents to an existing corpus"""
        # Process new documents
        new_chunks = await self.processor.process_documents(new_documents)

        # Determine affected sections
        affected_sections = self._identify_affected_sections(new_chunks)

        # Update hierarchy bottom-up
        async with self.store.transaction() as tr:
            # Add new raw nodes
            for doc in new_documents:
                await self._add_raw_nodes(tr, corpus_id, doc)

            # Add new chunks
            for chunk in new_chunks:
                await self._add_chunk(tr, corpus_id, chunk)

            # Regenerate affected sections
            for section_id in affected_sections:
                await self._regenerate_section(tr, corpus_id, section_id)

            # Regenerate master
            await self._regenerate_master(tr, corpus_id)

            # Audit
            await self._audit_update(tr, corpus_id, new_documents)

        return UpdateResult(
            documents_added=len(new_documents),
            chunks_added=len(new_chunks),
            sections_updated=len(affected_sections)
        )

    async def remove_documents(
        self,
        corpus_id: UUID,
        document_ids: List[UUID]
    ) -> UpdateResult:
        """Remove documents and update the hierarchy"""
        # Find all affected nodes
        affected = await self._find_affected_nodes(corpus_id, document_ids)

        async with self.store.transaction() as tr:
            # Remove raw nodes
            for raw_id in affected.raw_ids:
                await self._remove_raw(tr, raw_id)

            # Remove chunks (or regenerate if only partially affected)
            for chunk_id in affected.chunk_ids:
                if await self._chunk_fully_removed(chunk_id, document_ids):
                    await self._remove_chunk(tr, chunk_id)
                else:
                    await self._regenerate_chunk(tr, chunk_id)

            # Regenerate sections
            for section_id in affected.section_ids:
                await self._regenerate_section(tr, corpus_id, section_id)

            # Regenerate master
            await self._regenerate_master(tr, corpus_id)

        return UpdateResult(documents_removed=len(document_ids))
Compliance Integration
@dataclass
class KnowledgeAuditRecord:
    """21 CFR Part 11 compliant audit record"""
    record_id: UUID
    corpus_id: UUID
    action: Literal["CREATE", "UPDATE", "DELETE", "ACCESS", "EXPORT"]

    # What changed
    affected_levels: List[HierarchyLevel]
    affected_node_ids: List[UUID]

    # Who/when
    operator_id: str
    timestamp: datetime

    # Electronic signature
    signature_hash: str
    signature_meaning: str

    # Before/after for updates
    before_snapshot: Optional[Dict]
    after_snapshot: Optional[Dict]

    # Access context
    access_reason: Optional[str]
    query_text: Optional[str]


class ComplianceAwareStore(HierarchicalKnowledgeStore):
    """Knowledge store with compliance features"""

    async def query_with_audit(
        self,
        corpus_id: UUID,
        level: HierarchyLevel,
        query: str,
        user_context: UserContext,
        access_reason: str
    ) -> Tuple[List[KnowledgeNode], UUID]:
        """Query with automatic audit trail"""
        # Check access permissions
        await self._check_access(user_context, corpus_id, level)

        # Execute query
        results = await self.query_at_level(corpus_id, level, query)

        # Create audit record
        audit_id = await self._create_audit(
            corpus_id=corpus_id,
            action="ACCESS",
            affected_levels=[level],
            affected_node_ids=[r.id for r in results],
            operator_id=user_context.user_id,
            access_reason=access_reason,
            query_text=query
        )
        return results, audit_id

    async def export_with_signature(
        self,
        corpus_id: UUID,
        user_context: UserContext,
        export_format: str
    ) -> SignedExport:
        """Export with electronic signature"""
        # Get full hierarchy
        hierarchy = await self._get_full_hierarchy(corpus_id)

        # Format export
        export_data = self._format_export(hierarchy, export_format)

        # Sign
        signature = await self._sign_export(
            data=export_data,
            signer_id=user_context.user_id,
            meaning="APPROVED_FOR_EXPORT"
        )

        # Audit
        await self._create_audit(
            corpus_id=corpus_id,
            action="EXPORT",
            operator_id=user_context.user_id,
            signature_hash=signature.hash
        )

        return SignedExport(
            data=export_data,
            signature=signature,
            audit_trail=await self._get_audit_trail(corpus_id)
        )
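One way the `signature_hash` in `KnowledgeAuditRecord` could be derived is a SHA-256 digest binding the export payload to the signer and the signature meaning. The field layout and delimiter here are assumptions for illustration, not the ADR's actual signing scheme:

```python
# Hypothetical sketch of deriving a signature_hash: hash the payload together
# with signer identity and meaning so any change to either invalidates it.
import hashlib

def sign_export(data: bytes, signer_id: str, meaning: str) -> str:
    h = hashlib.sha256()
    for part in (data, signer_id.encode(), meaning.encode()):
        h.update(part + b'\x00')  # delimit fields to avoid ambiguous concatenation
    return h.hexdigest()

sig = sign_export(b'{"corpus": "..."}', 'user-42', 'APPROVED_FOR_EXPORT')
assert len(sig) == 64  # hex-encoded SHA-256 digest
```

A production scheme for Part 11 would pair this digest with an actual cryptographic signature tied to the operator's credentials; the digest alone only proves integrity, not identity.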
Query Interface
hierarchy_commands:
  - command: "@knowledge:query"
    description: Query knowledge hierarchy
    parameters:
      - corpus_id: string (required)
      - level: enum[master, section, chunk, raw] (default: chunk)
      - query: string (optional, enables semantic search)
      - filters: object (optional)
      - access_reason: string (required for compliance)

  - command: "@knowledge:drill"
    description: Drill down from a node to a lower level
    parameters:
      - node_id: string (required)
      - target_level: enum[section, chunk, raw]

  - command: "@knowledge:rollup"
    description: Roll up from nodes to a higher level
    parameters:
      - node_ids: List[string] (required)
      - target_level: enum[master, section]

  - command: "@knowledge:update"
    description: Add or remove documents from a corpus
    parameters:
      - corpus_id: string (required)
      - add_documents: List[string] (optional)
      - remove_documents: List[string] (optional)

  - command: "@knowledge:export"
    description: Export hierarchy with signature
    parameters:
      - corpus_id: string (required)
      - format: enum[json, markdown, pdf]
      - include_audit: bool (default: true)
Consequences
Positive
- Multi-granularity access: Query at any level, drill down/roll up as needed
- Efficient retrieval: O(1) key lookups for level access; semantic search stays sub-linear (roughly O(log n)) with an approximate-nearest-neighbor index
- Incremental updates: Add documents without full reprocessing
- Full compliance: Audit trail, signatures, access control
- Cross-reference: Navigate between levels seamlessly
Negative
- Storage overhead: 3-5x raw storage for hierarchy + indexes
- Consistency complexity: Must maintain parent-child relationships
- Regeneration cost: Updates require partial hierarchy regeneration
Storage Projections
| Corpus Size | Raw Storage | Hierarchy Storage | Index Storage | Total |
|---|---|---|---|---|
| 100 docs | 10 MB | 15 MB | 5 MB | 30 MB |
| 1,000 docs | 100 MB | 150 MB | 50 MB | 300 MB |
| 10,000 docs | 1 GB | 1.5 GB | 500 MB | 3 GB |
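The table follows a simple linear model: hierarchy is about 1.5x raw and indexes about 0.5x raw, giving the 3x total. The per-document raw size (roughly 100 KB) is inferred from the first row; both it and the factors below are the table's own numbers, not measured values:

```python
# Reproduce the storage projections from the table above.
RAW_PER_DOC_MB = 0.1   # inferred: 100 docs -> 10 MB raw
HIERARCHY_FACTOR = 1.5  # hierarchy storage relative to raw
INDEX_FACTOR = 0.5      # index storage relative to raw

def projected_storage_mb(doc_count: int) -> dict:
    raw = doc_count * RAW_PER_DOC_MB
    return {
        'raw': raw,
        'hierarchy': raw * HIERARCHY_FACTOR,
        'indexes': raw * INDEX_FACTOR,
        'total': raw * (1 + HIERARCHY_FACTOR + INDEX_FACTOR),
    }
```

For 1,000 documents this yields 100 MB raw, 150 MB hierarchy, 50 MB indexes, and 300 MB total, matching the middle row.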
References
- ADR-027: Corpus Processing Subsystem Architecture
- ADR-001: FoundationDB as Core Database
- ADR-022: Compliance Framework (21 CFR Part 11)
- Hierarchical Summarization - Anthropic
- Pieces.app Hierarchical Memory
Approval
| Role | Name | Date | Decision |
|---|---|---|---|
| CTO | Hal Casteel | | |
| Data Lead | | | |