ADR-030: Hierarchical Knowledge Store

Status

PROPOSED

Date

2026-01-15

Context

After corpus processing, results must be stored in a way that supports:

  1. Multi-granularity retrieval: Query at summary level OR drill down to source
  2. Incremental updates: Add new documents without full reprocessing
  3. Compliance requirements: Audit trail, access control, versioning
  4. Cross-corpus queries: Search across multiple analysis runs

The "unlimited memory" video approach stores results in flat markdown files, which is insufficient for enterprise needs.

Requirements

| Requirement | Priority | Rationale |
| --- | --- | --- |
| Multi-level hierarchy (raw → chunk → section → master) | P0 | Query flexibility |
| Sub-100ms retrieval at any level | P0 | Interactive use |
| Full audit trail per level | P0 | Compliance |
| Atomic updates across levels | P1 | Consistency |
| Cross-reference between levels | P1 | Drill-down capability |

Decision

Implement a Hierarchical Knowledge Store backed by FoundationDB with the following structure:

Data Model

┌────────────────────────────────────────────────────┐
│            HIERARCHICAL KNOWLEDGE STORE            │
├────────────────────────────────────────────────────┤
│                                                    │
│  LEVEL 3: MASTER                                   │
│  ┌──────────────────────────────────────────────┐  │
│  │ corpus_id: UUID                              │  │
│  │ master_summary: Text (500-2000 tokens)       │  │
│  │ key_findings: List[Finding]                  │  │
│  │ metadata: CorpusMetadata                     │  │
│  │ created_at: Timestamp                        │  │
│  │ child_refs: List[SectionRef]                 │  │
│  └──────────────────────────────────────────────┘  │
│                          │                         │
│                          │ 1:N                     │
│                          ▼                         │
│  LEVEL 2: SECTIONS                                 │
│  ┌──────────────────────────────────────────────┐  │
│  │ section_id: UUID                             │  │
│  │ parent_ref: MasterRef                        │  │
│  │ section_summary: Text (1000-5000 tokens)     │  │
│  │ category: String                             │  │
│  │ entities: List[Entity]                       │  │
│  │ child_refs: List[ChunkRef]                   │  │
│  └──────────────────────────────────────────────┘  │
│                          │                         │
│                          │ 1:N                     │
│                          ▼                         │
│  LEVEL 1: CHUNKS                                   │
│  ┌──────────────────────────────────────────────┐  │
│  │ chunk_id: UUID                               │  │
│  │ parent_ref: SectionRef                       │  │
│  │ chunk_summary: Text (500-2000 tokens)        │  │
│  │ key_quotes: List[Quote]                      │  │
│  │ entities: List[Entity]                       │  │
│  │ embedding: Vector[1536]                      │  │
│  │ child_refs: List[RawRef]                     │  │
│  └──────────────────────────────────────────────┘  │
│                          │                         │
│                          │ 1:N                     │
│                          ▼                         │
│  LEVEL 0: RAW                                      │
│  ┌──────────────────────────────────────────────┐  │
│  │ raw_id: UUID                                 │  │
│  │ parent_ref: ChunkRef                         │  │
│  │ document_id: UUID                            │  │
│  │ content: Text (original)                     │  │
│  │ span: {start: int, end: int}                 │  │
│  │ preprocessing_applied: PreprocessingRecord   │  │
│  └──────────────────────────────────────────────┘  │
│                                                    │
└────────────────────────────────────────────────────┘

FoundationDB Directory Structure

```python
import fdb

fdb.api_version(710)
db = fdb.open()

# Directory layout in FoundationDB. Note that create_or_open takes a
# database (or transaction) as its first argument.
knowledge_store = fdb.directory.create_or_open(db, ('coditect', 'knowledge'))

directories = {
    'master':   knowledge_store.create_or_open(db, ('master',)),
    'sections': knowledge_store.create_or_open(db, ('sections',)),
    'chunks':   knowledge_store.create_or_open(db, ('chunks',)),
    'raw':      knowledge_store.create_or_open(db, ('raw',)),
    'indexes':  knowledge_store.create_or_open(db, ('indexes',)),
    'audit':    knowledge_store.create_or_open(db, ('audit',)),
}

# Key structure
#   master/{corpus_id}                          -> MasterRecord
#   sections/{corpus_id}/{section_id}           -> SectionRecord
#   chunks/{corpus_id}/{section_id}/{chunk_id}  -> ChunkRecord
#   raw/{corpus_id}/{document_id}/{span_start}  -> RawRecord
#   indexes/entity/{entity_type}/{entity_value} -> List[ChunkRef]
#   indexes/embedding/{chunk_id}                -> Vector
#   audit/{corpus_id}/{timestamp}               -> AuditRecord
```
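The hierarchical key scheme relies on the tuple layer's ordering guarantee: keys sharing a prefix sort contiguously, so all chunks of a corpus (or of one section) can be fetched with a single range read. A minimal sketch of that property, using plain Python tuples as stand-ins for packed FoundationDB keys (the real store would use `fdb.tuple.pack`):

```python
from uuid import uuid4

corpus = str(uuid4())
sections = [str(uuid4()) for _ in range(2)]

# Stand-in keys mirroring chunks/{corpus_id}/{section_id}/{chunk_id}
keys = sorted(
    ('chunks', corpus, section, str(uuid4()))
    for section in sections
    for _ in range(3)
)

# A range read for one section is just a prefix scan over sorted keys
prefix = ('chunks', corpus, sections[0])
section_chunks = [k for k in keys if k[:3] == prefix]
assert len(section_chunks) == 3

# Because tuples sort componentwise, those keys are contiguous
lo = keys.index(section_chunks[0])
assert keys[lo:lo + 3] == section_chunks
```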

Core Implementation

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional
from uuid import UUID


class HierarchyLevel(Enum):
    """Plain Enum (not a dataclass): members are fixed level constants."""
    MASTER = 3
    SECTION = 2
    CHUNK = 1
    RAW = 0


@dataclass(kw_only=True)
class KnowledgeNode:
    """Base class for all hierarchy levels.

    kw_only=True (Python 3.10+) lets subclasses declare a default for
    `level` before their own required fields without ordering errors.
    """
    id: UUID
    corpus_id: UUID
    level: HierarchyLevel
    parent_ref: Optional[UUID]
    child_refs: List[UUID]
    created_at: datetime
    created_by: str  # For audit
    version: int


@dataclass(kw_only=True)
class MasterNode(KnowledgeNode):
    """Top-level corpus summary"""
    level: HierarchyLevel = HierarchyLevel.MASTER

    summary: str
    key_findings: List[Finding]
    statistics: CorpusStatistics
    processing_metadata: ProcessingMetadata


@dataclass(kw_only=True)
class SectionNode(KnowledgeNode):
    """Section-level grouping (by category, topic, etc.)"""
    level: HierarchyLevel = HierarchyLevel.SECTION

    category: str
    summary: str
    entities: List[Entity]
    document_count: int


@dataclass(kw_only=True)
class ChunkNode(KnowledgeNode):
    """Chunk-level with embeddings for RAG"""
    level: HierarchyLevel = HierarchyLevel.CHUNK

    summary: str
    key_quotes: List[Quote]
    entities: List[Entity]
    embedding: List[float]  # 1536-dim for ada-002 / 3072 for text-embedding-3-large
    source_document: UUID


@dataclass(kw_only=True)
class RawNode(KnowledgeNode):
    """Original text spans"""
    level: HierarchyLevel = HierarchyLevel.RAW

    content: str
    document_id: UUID
    span_start: int
    span_end: int
    preprocessing_record: PreprocessingRecord
```


```python
class HierarchicalKnowledgeStore:
    """Main store interface"""

    def __init__(self, fdb_cluster: str):
        self.db = fdb.open(fdb_cluster)
        self._init_directories()
        self.vector_index = VectorIndex()  # For embedding search

    @fdb.transactional
    def store_hierarchy(
        self,
        tr: fdb.Transaction,
        corpus_id: UUID,
        master: MasterNode,
        sections: List[SectionNode],
        chunks: List[ChunkNode],
        raw_nodes: List[RawNode],
    ) -> None:
        """Atomically store the entire hierarchy in one transaction"""

        # Store master
        master_key = self.dirs['master'][str(corpus_id)]
        tr[master_key] = self._serialize(master)

        # Store sections
        for section in sections:
            section_key = self.dirs['sections'][str(corpus_id)][str(section.id)]
            tr[section_key] = self._serialize(section)

        # Store chunks with embeddings
        for chunk in chunks:
            chunk_key = self.dirs['chunks'][str(corpus_id)][str(chunk.parent_ref)][str(chunk.id)]
            tr[chunk_key] = self._serialize(chunk)

            # Index embedding
            self._index_embedding(tr, chunk.id, chunk.embedding)

            # Index entities
            for entity in chunk.entities:
                self._index_entity(tr, entity, chunk.id)

        # Store raw
        for raw in raw_nodes:
            raw_key = self.dirs['raw'][str(corpus_id)][str(raw.document_id)][str(raw.span_start)]
            tr[raw_key] = self._serialize(raw)

        # Create audit record
        self._create_audit_record(tr, corpus_id, "HIERARCHY_CREATED", {
            'sections': len(sections),
            'chunks': len(chunks),
            'raw_nodes': len(raw_nodes),
        })

    async def query_at_level(
        self,
        corpus_id: UUID,
        level: HierarchyLevel,
        query: Optional[str] = None,
        filters: Optional[QueryFilters] = None,
    ) -> List[KnowledgeNode]:
        """Query knowledge at a specific hierarchy level"""

        if level == HierarchyLevel.MASTER:
            return [await self._get_master(corpus_id)]

        elif level == HierarchyLevel.SECTION:
            sections = await self._get_sections(corpus_id)
            if filters and filters.category:
                sections = [s for s in sections if s.category == filters.category]
            return sections

        elif level == HierarchyLevel.CHUNK:
            if query:
                # Semantic search using embeddings
                return await self._semantic_search(corpus_id, query, filters)
            else:
                return await self._get_all_chunks(corpus_id, filters)

        else:  # RAW: requires explicit chunk refs to scope the scan
            return await self._get_raw_for_chunks(filters.chunk_ids)

    async def drill_down(
        self,
        node_id: UUID,
        target_level: HierarchyLevel,
    ) -> List[KnowledgeNode]:
        """Navigate from a higher level to a lower level"""

        node = await self._get_node(node_id)

        if node.level.value <= target_level.value:
            raise ValueError(f"Cannot drill down from {node.level} to {target_level}")

        if node.level.value - target_level.value == 1:
            # Direct children
            return await self._get_children(node)
        else:
            # Multi-level descent
            children = await self._get_children(node)
            results = []
            for child in children:
                results.extend(await self.drill_down(child.id, target_level))
            return results

    async def roll_up(
        self,
        node_ids: List[UUID],
        target_level: HierarchyLevel,
    ) -> KnowledgeNode:
        """Navigate from a lower level to a higher level"""

        # Find the common ancestor at the target level
        ancestors = set()
        corpus_id = None
        for node_id in node_ids:
            node = await self._get_node(node_id)
            corpus_id = node.corpus_id
            ancestor = await self._find_ancestor_at_level(node, target_level)
            ancestors.add(ancestor.id)

        if len(ancestors) == 1:
            return await self._get_node(ancestors.pop())
        else:
            # Multiple ancestors - fall back to the corpus master
            return await self._get_master(corpus_id)

    async def _semantic_search(
        self,
        corpus_id: UUID,
        query: str,
        filters: Optional[QueryFilters],
        top_k: int = 10,
    ) -> List[ChunkNode]:
        """Search chunks by semantic similarity"""

        # Embed the query
        query_embedding = await self._embed(query)

        # Search the vector index
        chunk_ids = await self.vector_index.search(
            query_embedding,
            corpus_filter=corpus_id,
            top_k=top_k,
        )

        # Load chunks
        chunks = [await self._get_chunk(cid) for cid in chunk_ids]

        # Apply additional filters
        if filters:
            if filters.entity_types:
                chunks = [c for c in chunks if self._has_entity_type(c, filters.entity_types)]
            if filters.categories:
                # Assumes category is denormalized from the parent section onto each chunk
                chunks = [c for c in chunks if c.category in filters.categories]

        return chunks
```
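The multi-level descent in `drill_down` is ordinary recursion over `child_refs`. The same shape can be exercised with an in-memory stand-in (the `TREE` map and integer levels below are hypothetical, not part of the store API):

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-in: node id -> (level, child ids)
TREE: Dict[str, Tuple[int, List[str]]] = {
    'master':  (3, ['sec-a', 'sec-b']),
    'sec-a':   (2, ['chunk-1', 'chunk-2']),
    'sec-b':   (2, ['chunk-3']),
    'chunk-1': (1, []), 'chunk-2': (1, []), 'chunk-3': (1, []),
}

def drill_down(node_id: str, target_level: int) -> List[str]:
    level, children = TREE[node_id]
    if level <= target_level:
        raise ValueError(f"Cannot drill down from {level} to {target_level}")
    if level - target_level == 1:
        return children                    # direct children
    results: List[str] = []
    for child in children:                 # multi-level descent
        results.extend(drill_down(child, target_level))
    return results

assert drill_down('master', 2) == ['sec-a', 'sec-b']
assert drill_down('master', 1) == ['chunk-1', 'chunk-2', 'chunk-3']
```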

Incremental Updates

```python
class HierarchyUpdater:
    """Handle incremental updates to the knowledge hierarchy"""

    async def add_documents(
        self,
        corpus_id: UUID,
        new_documents: List[Document],
    ) -> UpdateResult:
        """Add new documents to an existing corpus"""

        # Process new documents
        new_chunks = await self.processor.process_documents(new_documents)

        # Determine affected sections
        affected_sections = self._identify_affected_sections(new_chunks)

        # Update the hierarchy bottom-up
        async with self.store.transaction() as tr:
            # Add new raw nodes
            for doc in new_documents:
                await self._add_raw_nodes(tr, corpus_id, doc)

            # Add new chunks
            for chunk in new_chunks:
                await self._add_chunk(tr, corpus_id, chunk)

            # Regenerate affected sections
            for section_id in affected_sections:
                await self._regenerate_section(tr, corpus_id, section_id)

            # Regenerate master
            await self._regenerate_master(tr, corpus_id)

            # Audit
            await self._audit_update(tr, corpus_id, new_documents)

        return UpdateResult(
            documents_added=len(new_documents),
            chunks_added=len(new_chunks),
            sections_updated=len(affected_sections),
        )

    async def remove_documents(
        self,
        corpus_id: UUID,
        document_ids: List[UUID],
    ) -> UpdateResult:
        """Remove documents and update the hierarchy"""

        # Find all affected nodes
        affected = await self._find_affected_nodes(corpus_id, document_ids)

        async with self.store.transaction() as tr:
            # Remove raw nodes
            for raw_id in affected.raw_ids:
                await self._remove_raw(tr, raw_id)

            # Remove chunks (or regenerate if only partially affected)
            for chunk_id in affected.chunk_ids:
                if await self._chunk_fully_removed(chunk_id, document_ids):
                    await self._remove_chunk(tr, chunk_id)
                else:
                    await self._regenerate_chunk(tr, chunk_id)

            # Regenerate sections
            for section_id in affected.section_ids:
                await self._regenerate_section(tr, corpus_id, section_id)

            # Regenerate master
            await self._regenerate_master(tr, corpus_id)

        return UpdateResult(documents_removed=len(document_ids))
```
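`_identify_affected_sections` is left abstract above. Assuming new chunks are routed to sections by category (the routing rule this ADR implies but does not pin down), it reduces to a lookup from category to section id. A sketch with a hypothetical helper; the names and the category-routing assumption are illustrative:

```python
from typing import Dict, List, Set

def identify_affected_sections(
    new_chunk_categories: List[str],
    section_by_category: Dict[str, str],
) -> Set[str]:
    """Map each new chunk's category to its existing section id.

    A category with no existing section would instead trigger creation
    of a new SectionNode (not shown here).
    """
    return {
        section_by_category[cat]
        for cat in new_chunk_categories
        if cat in section_by_category
    }

affected = identify_affected_sections(
    ['finance', 'legal', 'finance'],
    {'finance': 'sec-1', 'legal': 'sec-2', 'ops': 'sec-3'},
)
assert affected == {'sec-1', 'sec-2'}
```

Only these sections (and the master) are regenerated, which is what keeps incremental updates cheaper than full reprocessing.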

Compliance Integration

```python
@dataclass
class KnowledgeAuditRecord:
    """21 CFR Part 11 compliant audit record"""

    record_id: UUID
    corpus_id: UUID
    action: Literal["CREATE", "UPDATE", "DELETE", "ACCESS", "EXPORT"]

    # What changed
    affected_levels: List[HierarchyLevel]
    affected_node_ids: List[UUID]

    # Who/when
    operator_id: str
    timestamp: datetime

    # Electronic signature
    signature_hash: str
    signature_meaning: str

    # Before/after for updates
    before_snapshot: Optional[Dict]
    after_snapshot: Optional[Dict]

    # Access context
    access_reason: Optional[str]
    query_text: Optional[str]
```


```python
class ComplianceAwareStore(HierarchicalKnowledgeStore):
    """Knowledge store with compliance features"""

    async def query_with_audit(
        self,
        corpus_id: UUID,
        level: HierarchyLevel,
        query: str,
        user_context: UserContext,
        access_reason: str,
    ) -> Tuple[List[KnowledgeNode], UUID]:
        """Query with an automatic audit trail"""

        # Check access permissions
        await self._check_access(user_context, corpus_id, level)

        # Execute the query
        results = await self.query_at_level(corpus_id, level, query)

        # Create the audit record
        audit_id = await self._create_audit(
            corpus_id=corpus_id,
            action="ACCESS",
            affected_levels=[level],
            affected_node_ids=[r.id for r in results],
            operator_id=user_context.user_id,
            access_reason=access_reason,
            query_text=query,
        )

        return results, audit_id

    async def export_with_signature(
        self,
        corpus_id: UUID,
        user_context: UserContext,
        export_format: str,
    ) -> SignedExport:
        """Export with an electronic signature"""

        # Get the full hierarchy
        hierarchy = await self._get_full_hierarchy(corpus_id)

        # Format the export
        export_data = self._format_export(hierarchy, export_format)

        # Sign
        signature = await self._sign_export(
            data=export_data,
            signer_id=user_context.user_id,
            meaning="APPROVED_FOR_EXPORT",
        )

        # Audit
        await self._create_audit(
            corpus_id=corpus_id,
            action="EXPORT",
            operator_id=user_context.user_id,
            signature_hash=signature.hash,
        )

        return SignedExport(
            data=export_data,
            signature=signature,
            audit_trail=await self._get_audit_trail(corpus_id),
        )
```
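`_sign_export` is left abstract. One plausible shape for the signature hash, assuming SHA-256 over the export bytes together with signer identity, meaning, and timestamp (a sketch only, not a complete Part 11 electronic-signature implementation, which would also bind the signature to authenticated credentials):

```python
import hashlib
import json
from datetime import datetime, timezone

def sign_export(data: bytes, signer_id: str, meaning: str) -> dict:
    """Hash the export bytes with signer identity and meaning, so any
    change to the exported data invalidates the recorded signature."""
    signed_at = datetime.now(timezone.utc).isoformat()
    payload = data + json.dumps(
        {'signer': signer_id, 'meaning': meaning, 'at': signed_at},
        sort_keys=True,
    ).encode()
    return {
        'hash': hashlib.sha256(payload).hexdigest(),
        'signer_id': signer_id,
        'meaning': meaning,
        'signed_at': signed_at,
    }

sig = sign_export(b'{"corpus": "demo"}', 'user-42', 'APPROVED_FOR_EXPORT')
assert len(sig['hash']) == 64  # hex-encoded SHA-256
```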

Query Interface

```yaml
hierarchy_commands:
  - command: "@knowledge:query"
    description: Query knowledge hierarchy
    parameters:
      - corpus_id: string (required)
      - level: enum[master, section, chunk, raw] (default: chunk)
      - query: string (optional, enables semantic search)
      - filters: object (optional)
      - access_reason: string (required for compliance)

  - command: "@knowledge:drill"
    description: Drill down from a node to a lower level
    parameters:
      - node_id: string (required)
      - target_level: enum[section, chunk, raw]

  - command: "@knowledge:rollup"
    description: Roll up from nodes to a higher level
    parameters:
      - node_ids: List[string] (required)
      - target_level: enum[master, section]

  - command: "@knowledge:update"
    description: Add or remove documents from a corpus
    parameters:
      - corpus_id: string (required)
      - add_documents: List[string] (optional)
      - remove_documents: List[string] (optional)

  - command: "@knowledge:export"
    description: Export hierarchy with signature
    parameters:
      - corpus_id: string (required)
      - format: enum[json, markdown, pdf]
      - include_audit: bool (default: true)
```
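An example invocation of the query command under the parameter spec above (the corpus id, query text, and access reason are illustrative values, not real identifiers):

```yaml
# Hypothetical invocation
command: "@knowledge:query"
arguments:
  corpus_id: "3f2b9c1e-8a4d-4f6e-9b2a-1c5d7e9f0a3b"
  level: chunk
  query: "adverse event reporting timelines"
  filters:
    entity_types: [REGULATION]
  access_reason: "Quarterly compliance review"
```

Note that `access_reason` is mandatory so every read can be written to the audit trail with its justification.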

Consequences

Positive

  • Multi-granularity access: Query at any level, drill down/roll up as needed
  • Efficient retrieval: O(1) keyed access at any level; semantic search is sublinear (approximately O(log n)) via the vector index
  • Incremental updates: Add documents without full reprocessing
  • Full compliance: Audit trail, signatures, access control
  • Cross-reference: Navigate between levels seamlessly

Negative

  • Storage overhead: 3-5x raw storage for hierarchy + indexes
  • Consistency complexity: Must maintain parent-child relationships
  • Regeneration cost: Updates require partial hierarchy regeneration

Storage Projections

| Corpus Size | Raw Storage | Hierarchy Storage | Index Storage | Total |
| --- | --- | --- | --- | --- |
| 100 docs | 10 MB | 15 MB | 5 MB | 30 MB |
| 1,000 docs | 100 MB | 150 MB | 50 MB | 300 MB |
| 10,000 docs | 1 GB | 1.5 GB | 500 MB | 3 GB |
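The projections follow a simple linear model: hierarchy storage at 1.5× raw and index storage at 0.5× raw, for a 3× total (the lower bound of the 3-5x overhead noted above). A quick check of the table's arithmetic:

```python
def storage_projection(raw_mb: float) -> dict:
    """Linear model from the table: hierarchy 1.5x raw, indexes 0.5x raw."""
    hierarchy = 1.5 * raw_mb
    index = 0.5 * raw_mb
    return {'raw': raw_mb, 'hierarchy': hierarchy,
            'index': index, 'total': raw_mb + hierarchy + index}

assert storage_projection(10)['total'] == 30       # 100 docs -> 30 MB
assert storage_projection(100)['total'] == 300     # 1,000 docs -> 300 MB
assert storage_projection(1000)['total'] == 3000   # 10,000 docs -> 3 GB
```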

References

Approval

| Role | Name | Date | Decision |
| --- | --- | --- | --- |
| CTO | Hal Casteel | | |
| Data Lead | | | |