#!/usr/bin/env python3 """ ADR-151 Context Graph Builder Service (Phase 5: CP-34 through CP-39)
Main builder class that orchestrates context graph construction from the knowledge graph backbone. Creates task-specific subgraphs for agent context windows.
Usage: from scripts.context_graph import ContextGraphBuilder
# Basic usage
builder = ContextGraphBuilder()
graph = builder.build(
task_description="Find decisions about database architecture",
token_budget=4000,
)
# With explicit seeds
graph = builder.build(
task_description="Analyze error solutions",
seed_nodes=["error_solution:42", "decision:15"],
seed_strategy="anchor",
)
# Serialize for LLM context
context_text = builder.serialize_for_context(graph)
Created: 2026-02-03 Author: Claude (Opus 4.5) Track: J (Memory Intelligence) Task: J.25.2 (CP-34, CP-35) """
import hashlib
import json
import logging
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Handle both module import and direct script execution.
try:
    from scripts.core.paths import get_org_db_path, get_sessions_db_path
except ImportError:
    import sys

    # Fix: use __file__ (bare `file` is undefined and raised NameError here).
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import get_org_db_path, get_sessions_db_path

from scripts.context_graph.algorithms import (
    ContextGraph,
    GraphNode,
    GraphEdge,
    select_seed_nodes,
    bfs_expand,
    compute_relevance_score,
)
from scripts.context_graph.pruning import prune_graph
from scripts.context_graph.checkpointer import (
    SQLiteContextGraphCheckpointer,
    ContextGraphCheckpointer,
)

# Module-level logger per logging convention (fix: __name__, not bare `name`).
logger = logging.getLogger(__name__)
# =============================================================================
# Builder Configuration
# =============================================================================

@dataclass
class BuilderConfig:
    """Configuration for ContextGraphBuilder.

    Bundles token/size limits, relevance tuning, the default seed strategy,
    pruning priorities, persistence flags, and multi-tenant scoping.
    """

    # Token and size limits
    token_budget: int = 4000   # max tokens for the serialized context
    max_nodes: int = 128       # hard cap on nodes in the final graph
    max_depth: int = 3         # BFS expansion depth limit
    max_seeds: int = 5         # max seed nodes per build

    # Relevance settings
    relevance_threshold: float = 0.3  # minimum score for node inclusion
    decay_factor: float = 0.8         # per-hop relevance decay during BFS

    # Seed strategy
    default_seed_strategy: str = "semantic"  # 'anchor', 'semantic', 'policy_first'

    # Node type priorities for pruning (higher = kept longer under budget pressure)
    type_priorities: Dict[str, int] = field(default_factory=lambda: {
        "decision": 100,
        "adr": 95,
        "policy": 90,
        "error_solution": 85,
        "skill_learning": 80,
        "component": 70,
        "function": 50,
        "file": 40,
        "session": 30,
        "track": 60,
    })

    # Persistence settings
    enable_checkpointing: bool = True
    checkpoint_ttl_hours: int = 24

    # Multi-tenant scoping (None = unscoped)
    tenant_id: Optional[str] = None
    project_id: Optional[str] = None
# =============================================================================
# Main Builder Class
# =============================================================================

class ContextGraphBuilder:
    """
    Builder service for creating task-specific context graphs.

    Workflow:
        1. Select seed nodes based on task description
        2. BFS expand from seeds through knowledge graph
        3. Apply relevance scoring and pruning
        4. Optionally persist to sessions.db
        5. Serialize for LLM context window

    ADR-118 Compliance:
        - Reads from org.db (kg_nodes, kg_edges)
        - Writes to sessions.db (context_graphs, context_graph_nodes)
    """

    # Builder version recorded in the builder_version column of persisted graphs.
    VERSION = "1.0.0"
    def __init__(
        self,
        org_db_path: Optional[Path] = None,
        sessions_db_path: Optional[Path] = None,
        config: Optional[BuilderConfig] = None,
    ):
        """
        Initialize the context graph builder.

        Args:
            org_db_path: Path to org.db (defaults to ADR-118 location)
            sessions_db_path: Path to sessions.db (defaults to ADR-118 location)
            config: BuilderConfig instance (defaults to sensible defaults)
        """
        self.org_db_path = org_db_path or get_org_db_path()
        self.sessions_db_path = sessions_db_path or get_sessions_db_path()
        self.config = config or BuilderConfig()

        # ADR-159: Auto-detect project scope from environment if not set in config
        if not self.config.project_id:
            import os
            self.config.project_id = os.environ.get('CODITECT_PROJECT')

        # Lazily-opened resources; created on first use, released via close().
        self._org_conn: Optional[sqlite3.Connection] = None
        self._sessions_conn: Optional[sqlite3.Connection] = None
        self._checkpointer: Optional[ContextGraphCheckpointer] = None

        # Statistics from the most recent build() call (empty before first build).
        self.last_build_stats: Dict[str, Any] = {}
def _get_org_conn(self) -> sqlite3.Connection:
"""Get or create connection to org.db."""
if self._org_conn is None:
self._org_conn = sqlite3.connect(str(self.org_db_path))
self._org_conn.row_factory = sqlite3.Row
return self._org_conn
def _get_sessions_conn(self) -> sqlite3.Connection:
"""Get or create connection to sessions.db."""
if self._sessions_conn is None:
self._sessions_conn = sqlite3.connect(str(self.sessions_db_path))
self._sessions_conn.row_factory = sqlite3.Row
return self._sessions_conn
def _get_checkpointer(self) -> ContextGraphCheckpointer:
"""Get or create checkpointer instance."""
if self._checkpointer is None:
self._checkpointer = SQLiteContextGraphCheckpointer(self.sessions_db_path)
return self._checkpointer
def close(self):
"""Close all database connections."""
if self._org_conn:
self._org_conn.close()
self._org_conn = None
if self._sessions_conn:
self._sessions_conn.close()
self._sessions_conn = None
if self._checkpointer:
self._checkpointer.close()
self._checkpointer = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
# =========================================================================
# Main Build Method
# =========================================================================
def build(
self,
task_description: str,
seed_nodes: Optional[List[str]] = None,
seed_strategy: Optional[str] = None,
token_budget: Optional[int] = None,
max_nodes: Optional[int] = None,
max_depth: Optional[int] = None,
relevance_threshold: Optional[float] = None,
node_types: Optional[List[str]] = None,
edge_types: Optional[List[str]] = None,
persist: bool = False,
session_id: Optional[str] = None,
) -> ContextGraph:
"""
Build a context graph for a task.
Args:
task_description: Natural language description of the task
seed_nodes: Explicit node IDs to use as seeds (for 'anchor' strategy)
seed_strategy: 'anchor', 'semantic', or 'policy_first'
token_budget: Maximum tokens for serialized context
max_nodes: Maximum nodes in graph
max_depth: BFS expansion depth limit
relevance_threshold: Minimum relevance score for inclusion
node_types: Filter seed nodes by type
edge_types: Filter edges by type during expansion
persist: If True, save to sessions.db
session_id: Session ID for persistence
Returns:
ContextGraph with nodes and edges
"""
start_time = time.time()
# Apply defaults from config
seed_strategy = seed_strategy or self.config.default_seed_strategy
token_budget = token_budget or self.config.token_budget
max_nodes = max_nodes or self.config.max_nodes
max_depth = max_depth or self.config.max_depth
relevance_threshold = relevance_threshold or self.config.relevance_threshold
logger.info(f"Building context graph for: {task_description[:100]}...")
logger.info(f"Strategy: {seed_strategy}, Budget: {token_budget}, Max nodes: {max_nodes}")
conn = self._get_org_conn()
# Step 1: Select seed nodes
if seed_strategy == "anchor" and seed_nodes:
seeds = [(node_id, 1.0) for node_id in seed_nodes[:self.config.max_seeds]]
else:
seeds = select_seed_nodes(
conn=conn,
task_description=task_description,
strategy=seed_strategy,
anchor_node_ids=seed_nodes,
node_types=node_types,
max_seeds=self.config.max_seeds,
)
if not seeds:
logger.warning("No seed nodes found, returning empty graph")
return self._create_empty_graph(task_description, seed_strategy, token_budget)
logger.info(f"Selected {len(seeds)} seed nodes")
# Step 2: BFS expansion
graph = bfs_expand(
conn=conn,
seed_nodes=seeds,
max_depth=max_depth,
max_nodes=max_nodes * 2, # Over-fetch for pruning
relevance_threshold=relevance_threshold * 0.5, # Lenient for expansion
edge_types=edge_types,
decay_factor=self.config.decay_factor,
)
# Update graph metadata
graph.task_description = task_description
graph.seed_strategy = seed_strategy
graph.token_budget = token_budget
graph.max_depth = max_depth
graph.max_nodes = max_nodes
# J.25.4.4: Record governance policies applied during graph construction
if seed_strategy == "policy_first":
graph.policies_applied = [
{"node_id": sid, "relevance": score, "role": "seed"}
for sid, score in seeds
]
logger.info(f"Expanded to {graph.node_count} nodes, {graph.edge_count} edges")
# J.25.4.1: Apply governance overlay (inject relevant policy nodes)
self._apply_governance_overlay(graph, task_description, conn)
# Step 3: Pruning
graph = prune_graph(
graph=graph,
token_budget=token_budget,
relevance_threshold=relevance_threshold,
max_depth=max_depth,
preserve_seeds=True,
remove_disconnected=True,
)
# Calculate build time
build_time_ms = int((time.time() - start_time) * 1000)
# J.25.4.3: PHI compliance scan
graph.phi_node_count = self._count_phi_nodes(graph)
if graph.phi_node_count > 0:
logger.info(f"PHI compliance: {graph.phi_node_count} nodes with potential PHI indicators")
# Step 4: Persist if requested
if persist:
self._persist_graph(graph, session_id)
# Update statistics with token breakdown (CP-42)
tokens_by_type = self._get_tokens_by_type(graph)
budget_utilization = (
(graph.tokens_estimated / token_budget * 100)
if token_budget > 0 else 0
)
self.last_build_stats = {
"task_description": task_description[:100],
"seed_strategy": seed_strategy,
"seed_count": len(seeds),
"node_count": graph.node_count,
"edge_count": graph.edge_count,
"tokens_estimated": graph.tokens_estimated,
"token_budget": token_budget,
"budget_utilization_pct": round(budget_utilization, 1),
"tokens_by_type": tokens_by_type,
"build_time_ms": build_time_ms,
"phi_node_count": graph.phi_node_count,
"policies_applied_count": len(graph.policies_applied),
}
logger.info(
f"Build complete: {graph.node_count} nodes, {graph.edge_count} edges, "
f"~{graph.tokens_estimated} tokens in {build_time_ms}ms"
)
return graph
def _create_empty_graph(
self,
task_description: str,
seed_strategy: str,
token_budget: int,
) -> ContextGraph:
"""Create an empty graph when no seeds found."""
graph_id = f"cg:{int(time.time())}:empty"
return ContextGraph(
id=graph_id,
task_description=task_description,
seed_strategy=seed_strategy,
token_budget=token_budget,
)
    # =========================================================================
    # Persistence Methods
    # =========================================================================

    def _persist_graph(
        self,
        graph: ContextGraph,
        session_id: Optional[str] = None,
    ):
        """Persist graph to sessions.db context_graphs tables.

        Writes one header row to context_graphs plus one row per node to
        context_graph_nodes, all stamped with the same UTC timestamp.
        Commits on success; rolls back and re-raises on any sqlite3 error.
        """
        conn = self._get_sessions_conn()
        # ISO-8601 UTC timestamp shared by the graph row and all node rows.
        now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        try:
            # Insert into context_graphs (OR REPLACE so rebuilds overwrite).
            conn.execute("""
                INSERT OR REPLACE INTO context_graphs (
                    id, name, task_description, seed_nodes, seed_strategy,
                    token_budget, max_depth, max_nodes, relevance_threshold,
                    node_count, edge_count, tokens_estimated,
                    tenant_id, project_id, session_id,
                    created_at, build_time_ms, builder_version,
                    policies_applied, phi_node_count
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                graph.id,
                None,  # name
                graph.task_description,
                json.dumps(graph.seed_nodes),
                graph.seed_strategy,
                graph.token_budget,
                graph.max_depth,
                graph.max_nodes,
                self.config.relevance_threshold,
                graph.node_count,
                graph.edge_count,
                graph.tokens_estimated,
                self.config.tenant_id,
                self.config.project_id,
                session_id,
                now,
                # NOTE(review): assumes last_build_stats belongs to this graph's
                # build — holds when called from build(); verify other callers.
                self.last_build_stats.get("build_time_ms", 0),
                self.VERSION,
                json.dumps(graph.policies_applied) if graph.policies_applied else None,
                graph.phi_node_count,
            ))
            # Insert nodes into context_graph_nodes
            for node_id, node in graph.nodes.items():
                conn.execute("""
                    INSERT OR REPLACE INTO context_graph_nodes (
                        context_graph_id, node_id, relevance_score, depth,
                        is_seed, include_properties, token_estimate, included_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    graph.id,
                    node_id,
                    node.relevance_score,
                    node.depth,
                    1 if node.is_seed else 0,  # booleans stored as 0/1 ints
                    1,  # include_properties
                    node.token_estimate,
                    now,
                ))
            conn.commit()
            logger.info(f"Persisted graph {graph.id} to sessions.db")
        except sqlite3.Error as e:
            # Roll back the partial write, then surface the failure to caller.
            logger.error(f"Failed to persist graph: {e}")
            conn.rollback()
            raise
    def load_graph(self, graph_id: str) -> Optional[ContextGraph]:
        """
        Load a previously persisted context graph.

        Re-hydrates the graph from both databases (ADR-118 split): membership
        metadata (scores, depth, seed flags) from sessions.db, node payloads
        and edges from org.db.

        Args:
            graph_id: The graph ID to load

        Returns:
            ContextGraph if found, None otherwise
        """
        sessions_conn = self._get_sessions_conn()
        org_conn = self._get_org_conn()

        # Load graph metadata
        cursor = sessions_conn.execute("""
            SELECT id, task_description, seed_nodes, seed_strategy,
                   token_budget, max_depth, max_nodes, policies_applied,
                   phi_node_count
            FROM context_graphs
            WHERE id = ?
        """, (graph_id,))
        row = cursor.fetchone()
        if not row:
            return None
        graph = ContextGraph(
            id=row[0],
            task_description=row[1],
            seed_nodes=json.loads(row[2]) if row[2] else [],  # stored as JSON text
            seed_strategy=row[3],
            token_budget=row[4],
            max_depth=row[5],
            max_nodes=row[6],
            policies_applied=json.loads(row[7]) if row[7] else [],
            phi_node_count=row[8] if row[8] is not None else 0,
        )

        # Load per-node membership metadata from sessions.db.
        cursor = sessions_conn.execute("""
            SELECT node_id, relevance_score, depth, is_seed, token_estimate
            FROM context_graph_nodes
            WHERE context_graph_id = ?
        """, (graph_id,))
        node_ids = []
        node_metadata = {}
        for row in cursor:
            node_ids.append(row[0])
            node_metadata[row[0]] = {
                "relevance_score": row[1],
                "depth": row[2],
                "is_seed": row[3] == 1,  # stored as 0/1 int
                "token_estimate": row[4],
            }

        # Fetch node data from org.db
        if node_ids:
            placeholders = ",".join("?" * len(node_ids))
            cursor = org_conn.execute(f"""
                SELECT id, node_type, name, subtype, properties
                FROM kg_nodes
                WHERE id IN ({placeholders})
            """, node_ids)
            for row in cursor:
                node_id = row[0]
                metadata = node_metadata.get(node_id, {})
                properties = None
                if row[4]:
                    try:
                        properties = json.loads(row[4])
                    except json.JSONDecodeError:
                        # Malformed JSON: keep the node, drop its properties.
                        pass
                graph.nodes[node_id] = GraphNode(
                    id=node_id,
                    node_type=row[1],
                    name=row[2],
                    subtype=row[3],
                    properties=properties,
                    relevance_score=metadata.get("relevance_score", 1.0),
                    depth=metadata.get("depth", 0),
                    is_seed=metadata.get("is_seed", False),
                    token_estimate=metadata.get("token_estimate", 0),
                )

        # Rebuild edges from kg_edges for nodes in graph
        if len(graph.nodes) > 1:
            node_id_list = list(graph.nodes.keys())
            placeholders = ",".join("?" * len(node_id_list))
            # Parameter list is doubled: once for from_node IN, once for to_node IN.
            cursor = org_conn.execute(f"""
                SELECT from_node, to_node, edge_type, properties
                FROM kg_edges
                WHERE from_node IN ({placeholders}) AND to_node IN ({placeholders})
            """, node_id_list + node_id_list)
            for row in cursor:
                graph.edges.append(GraphEdge(
                    from_node=row[0],
                    to_node=row[1],
                    edge_type=row[2],
                    properties=json.loads(row[3]) if row[3] else None,
                ))
        logger.info(f"Loaded graph {graph_id}: {graph.node_count} nodes, {graph.edge_count} edges")
        return graph
def record_usage(
self,
graph_id: str,
session_id: Optional[str] = None,
message_id: Optional[str] = None,
agent_id: Optional[str] = None,
tokens_used: Optional[int] = None,
retrieval_time_ms: Optional[int] = None,
):
"""Record usage of a context graph for analytics."""
conn = self._get_sessions_conn()
try:
conn.execute("""
INSERT INTO context_graph_usage (
context_graph_id, session_id, message_id, agent_id,
tokens_used, retrieval_time_ms
) VALUES (?, ?, ?, ?, ?, ?)
""", (
graph_id,
session_id,
message_id,
agent_id,
tokens_used,
retrieval_time_ms,
))
conn.commit()
except sqlite3.Error as e:
logger.error(f"Failed to record usage: {e}")
# =========================================================================
# Serialization Methods
# =========================================================================
def serialize_for_context(
self,
graph: ContextGraph,
format: str = "markdown",
include_edges: bool = True,
max_tokens: Optional[int] = None,
) -> str:
"""
Serialize context graph for LLM context window.
Args:
graph: ContextGraph to serialize
format: 'markdown', 'json', or 'text'
include_edges: Include relationship descriptions
max_tokens: Optional additional token limit
Returns:
Serialized string for LLM context
"""
if format == "json":
return json.dumps(graph.to_dict(), indent=2)
elif format == "markdown":
return self._serialize_markdown(graph, include_edges)
else: # text
return self._serialize_text(graph, include_edges)
    def _serialize_markdown(self, graph: ContextGraph, include_edges: bool) -> str:
        """Serialize graph to markdown format.

        Nodes are grouped by type (at most 10 per type, highest relevance
        first); relationships are grouped by edge type (at most 10 per type).
        """
        lines = [
            f"# Context Graph",
            f"",
            f"**Task:** {graph.task_description}",
            f"**Nodes:** {graph.node_count} | **Edges:** {graph.edge_count}",
            f"",
        ]
        # Group nodes by type
        nodes_by_type: Dict[str, List[GraphNode]] = {}
        for node in graph.nodes.values():
            if node.node_type not in nodes_by_type:
                nodes_by_type[node.node_type] = []
            nodes_by_type[node.node_type].append(node)
        # Output by type, sorted by relevance
        for node_type in sorted(nodes_by_type.keys()):
            nodes = sorted(nodes_by_type[node_type], key=lambda n: -n.relevance_score)
            lines.append(f"## {node_type.replace('_', ' ').title()}s")
            lines.append("")
            for node in nodes[:10]:  # Limit per type
                seed_marker = " [SEED]" if node.is_seed else ""
                lines.append(f"### {node.name}{seed_marker}")
                lines.append(f"*Relevance: {node.relevance_score:.2f}*")
                if node.properties:
                    # Show at most 5 properties, truncating long string values.
                    for key, value in list(node.properties.items())[:5]:
                        if isinstance(value, str) and len(value) > 200:
                            value = value[:200] + "..."
                        lines.append(f"- **{key}:** {value}")
                lines.append("")
        # Add edges if requested
        if include_edges and graph.edges:
            lines.append("## Relationships")
            lines.append("")
            # Group by edge type
            edges_by_type: Dict[str, List[GraphEdge]] = {}
            for edge in graph.edges:
                if edge.edge_type not in edges_by_type:
                    edges_by_type[edge.edge_type] = []
                edges_by_type[edge.edge_type].append(edge)
            for edge_type, edges in sorted(edges_by_type.items()):
                lines.append(f"### {edge_type}")
                for edge in edges[:10]:
                    # Fall back to the raw node ID when an endpoint isn't in the graph.
                    from_name = graph.nodes.get(edge.from_node, GraphNode(id="", node_type="", name=edge.from_node)).name
                    to_name = graph.nodes.get(edge.to_node, GraphNode(id="", node_type="", name=edge.to_node)).name
                    lines.append(f"- {from_name} -> {to_name}")
                lines.append("")
        return "\n".join(lines)
def _serialize_text(self, graph: ContextGraph, include_edges: bool) -> str:
"""Serialize graph to plain text format."""
lines = [
f"Context Graph: {graph.task_description}",
f"Nodes: {graph.node_count}, Edges: {graph.edge_count}",
"",
]
# Sort nodes by relevance
sorted_nodes = sorted(graph.nodes.values(), key=lambda n: -n.relevance_score)
for node in sorted_nodes:
seed_marker = " [SEED]" if node.is_seed else ""
lines.append(f"- [{node.node_type}] {node.name}{seed_marker} (rel: {node.relevance_score:.2f})")
if include_edges and graph.edges:
lines.append("")
lines.append("Relationships:")
for edge in graph.edges[:20]:
lines.append(f" {edge.from_node} --{edge.edge_type}--> {edge.to_node}")
return "\n".join(lines)
# =========================================================================
# J.25.4.3: PHI Compliance Methods
# =========================================================================
# PHI indicator patterns (HIPAA Safe Harbor categories)
PHI_PATTERNS = [
# Direct identifiers
"patient", "ssn", "social_security", "mrn", "medical_record",
"dob", "date_of_birth", "birthdate", "birth_date",
# Contact info
"phone_number", "fax_number", "email_address",
"street_address", "zip_code", "postal_code",
# Health data
"diagnosis", "treatment", "medication", "prescription",
"lab_result", "vital_sign", "allergy", "immunization",
"health_plan", "insurance_id", "beneficiary",
# Biometric
"fingerprint", "retinal", "voiceprint", "biometric",
# Device/account
"device_serial", "ip_address", "mac_address",
"account_number", "certificate_number", "license_number",
# Photos/images
"face_image", "photo_id", "full_face",
]
def _count_phi_nodes(self, graph: ContextGraph) -> int:
"""
J.25.4.3: Count nodes that may contain PHI indicators.
Scans node properties and names for HIPAA Safe Harbor identifiers.
This is a heuristic scan - not a definitive PHI classification.
Returns:
Count of nodes with potential PHI content
"""
phi_count = 0
phi_patterns_lower = self.PHI_PATTERNS
for node in graph.nodes.values():
if self._node_has_phi_indicators(node, phi_patterns_lower):
phi_count += 1
return phi_count
def _node_has_phi_indicators(self, node: GraphNode, patterns: List[str]) -> bool:
"""Check if a node's content contains PHI indicators."""
# Check node name
name_lower = (node.name or "").lower()
for pattern in patterns:
if pattern in name_lower:
return True
# Check node properties
if node.properties:
props_str = json.dumps(node.properties).lower()
for pattern in patterns:
if pattern in props_str:
return True
return False
# =========================================================================
# J.25.4.1: Governance Overlay
# =========================================================================
# Mapping of policy scopes to node types and task keywords that trigger them
POLICY_SCOPE_TRIGGERS = {
"all_agents": {
"always_apply": True,
"description": "Universal policies that apply to all context graphs",
},
"file_operations": {
"node_types": {"file", "component"},
"keywords": {"file", "write", "edit", "create", "delete", "move", "rename", "path"},
},
"slash_commands": {
"node_types": {"component"},
"keywords": {"command", "slash", "invoke", "execute"},
"subtypes": {"command"},
},
"tool_calls": {
"node_types": {"function", "component"},
"keywords": {"tool", "bash", "script", "execute", "run"},
},
"session_logs": {
"keywords": {"session", "log", "export", "cx", "cxq"},
},
"hooks": {
"node_types": {"component"},
"keywords": {"hook", "governance", "enforce", "validate"},
"subtypes": {"hook"},
},
}
    def _apply_governance_overlay(
        self,
        graph: ContextGraph,
        task_description: str,
        conn: sqlite3.Connection,
    ) -> None:
        """
        J.25.4.1: Apply governance policy overlay to a context graph.

        For ALL build strategies, this method:
          1. Fetches policy nodes from the knowledge graph
          2. Scores each policy's relevance to the task and graph contents
          3. Injects applicable policy nodes into the graph
          4. Creates GOVERNED_BY edges from affected nodes to policies
          5. Records applied policies in graph.policies_applied

        Policy matching uses scope-based triggers:
          - "all_agents" scope: always included (e.g., Safety Directive)
          - Scoped policies: matched by node types present in graph
            and keywords in the task description

        Args:
            graph: ContextGraph to augment (modified in place)
            task_description: Task description for keyword matching
            conn: Connection to org.db containing kg_nodes
        """
        # Fetch all policy nodes
        cursor = conn.execute("""
            SELECT id, name, subtype, properties
            FROM kg_nodes
            WHERE node_type = 'policy'
        """)
        policy_rows = cursor.fetchall()
        if not policy_rows:
            logger.debug("No policy nodes in knowledge graph, skipping governance overlay")
            return

        # Analyze graph contents for scope matching
        graph_node_types = {n.node_type for n in graph.nodes.values()}
        graph_subtypes = {n.subtype for n in graph.nodes.values() if n.subtype}
        # Word tokens of the task description, lowercased for keyword overlap.
        task_tokens = set(re.findall(r'\b\w+\b', task_description.lower()))

        # Score and select applicable policies
        applicable_policies = []
        for row in policy_rows:
            policy_id = row[0]
            policy_name = row[1]
            policy_subtype = row[2]
            policy_props_raw = row[3]
            # Skip if already in graph (e.g., from policy_first seeds)
            if policy_id in graph.nodes:
                continue
            policy_props = {}
            if policy_props_raw:
                try:
                    policy_props = json.loads(policy_props_raw)
                except (json.JSONDecodeError, TypeError):
                    # Unparseable properties: treat as scopeless policy.
                    pass
            scope = policy_props.get("scope", "")
            relevance = self._score_policy_relevance(
                scope=scope,
                policy_name=policy_name,
                policy_subtype=policy_subtype,
                graph_node_types=graph_node_types,
                graph_subtypes=graph_subtypes,
                task_tokens=task_tokens,
            )
            # Zero relevance means "not applicable" and is dropped outright.
            if relevance > 0:
                applicable_policies.append((policy_id, policy_name, policy_subtype, policy_props, relevance))
        if not applicable_policies:
            logger.debug("No applicable policies for this graph")
            return

        # Sort by relevance descending, limit to avoid bloating the graph
        applicable_policies.sort(key=lambda x: -x[4])
        max_policy_nodes = min(len(applicable_policies), 10)
        selected = applicable_policies[:max_policy_nodes]
        logger.info(f"J.25.4.1: Applying {len(selected)} governance policy nodes to context graph")

        # Inject policy nodes and create GOVERNED_BY edges
        for policy_id, policy_name, policy_subtype, policy_props, relevance in selected:
            # Fetch full node from KG
            node_cursor = conn.execute("""
                SELECT id, node_type, name, subtype, properties
                FROM kg_nodes WHERE id = ?
            """, (policy_id,))
            node_row = node_cursor.fetchone()
            if not node_row:
                continue
            properties = None
            if node_row[4]:
                try:
                    properties = json.loads(node_row[4])
                except (json.JSONDecodeError, TypeError):
                    pass
            # Add policy node to graph with governance flag
            # Note: NOT marked as seed so pruning can remove them under tight budgets
            policy_node = GraphNode(
                id=node_row[0],
                node_type=node_row[1],
                name=node_row[2],
                subtype=node_row[3],
                properties=properties,
                relevance_score=round(relevance, 4),
                depth=0,  # Governance nodes are top-level
                is_seed=False,  # Prunable under tight token budgets
                token_estimate=self._estimate_policy_tokens(policy_name, properties),
            )
            graph.nodes[policy_id] = policy_node
            # Create GOVERNED_BY edges from affected graph nodes to this policy
            scope = policy_props.get("scope", "")
            affected_node_ids = self._find_governed_nodes(graph, scope, policy_subtype)
            for affected_id in affected_node_ids[:5]:  # Limit edges per policy
                graph.edges.append(GraphEdge(
                    from_node=affected_id,
                    to_node=policy_id,
                    edge_type="GOVERNED_BY",
                    properties={"source": "governance_overlay"},
                ))
            # Record in policies_applied
            graph.policies_applied.append({
                "node_id": policy_id,
                "name": policy_name,
                "subtype": policy_subtype,
                "scope": scope,
                "relevance": round(relevance, 4),
                "role": "overlay",
                # Total governed nodes, even though edges are capped at 5 above.
                "governed_nodes": len(affected_node_ids),
            })
        logger.info(
            f"J.25.4.1: Governance overlay complete - "
            f"{len(selected)} policies, {len(graph.policies_applied)} total applied"
        )
def _score_policy_relevance(
self,
scope: str,
policy_name: str,
policy_subtype: Optional[str],
graph_node_types: set,
graph_subtypes: set,
task_tokens: set,
) -> float:
"""
Score a policy's relevance to the current context graph.
Returns:
Relevance score 0.0-1.0 (0.0 means not applicable)
"""
triggers = self.POLICY_SCOPE_TRIGGERS.get(scope, {})
# Universal policies always apply
if triggers.get("always_apply"):
return 0.9
score = 0.0
# Check node type overlap
trigger_types = triggers.get("node_types", set())
if trigger_types and graph_node_types & trigger_types:
score += 0.5
# Check subtype overlap
trigger_subtypes = triggers.get("subtypes", set())
if trigger_subtypes and graph_subtypes & trigger_subtypes:
score += 0.3
# Check keyword overlap with task description
trigger_keywords = triggers.get("keywords", set())
if trigger_keywords and task_tokens & trigger_keywords:
score += 0.4
# Bonus for directive policies (highest enforcement)
if policy_subtype == "directive":
score += 0.1
# Unknown scope - check name-based heuristic
if not triggers and scope:
# Check if any scope word appears in task tokens
scope_tokens = set(scope.lower().replace("_", " ").split())
if scope_tokens & task_tokens:
score += 0.3
return min(1.0, score)
def _find_governed_nodes(
self,
graph: ContextGraph,
scope: str,
policy_subtype: Optional[str],
) -> List[str]:
"""
Find nodes in the graph that should be governed by a policy.
Returns:
List of node IDs that the policy governs
"""
triggers = self.POLICY_SCOPE_TRIGGERS.get(scope, {})
governed = []
for node_id, node in graph.nodes.items():
if node.node_type == "policy":
continue # Don't govern other policies
# Universal policies govern all non-policy nodes
if triggers.get("always_apply"):
governed.append(node_id)
continue
# Check node type match
trigger_types = triggers.get("node_types", set())
if trigger_types and node.node_type in trigger_types:
governed.append(node_id)
continue
# Check subtype match
trigger_subtypes = triggers.get("subtypes", set())
if trigger_subtypes and node.subtype in trigger_subtypes:
governed.append(node_id)
continue
return governed
@staticmethod
def _estimate_policy_tokens(name: str, properties: Optional[dict]) -> int:
"""Estimate tokens for a policy node."""
text_len = len(name)
if properties:
# Only count key fields, not full properties (to save budget)
for key in ("rule_text", "summary", "principles"):
if key in properties:
val = properties[key]
if isinstance(val, str):
text_len += min(len(val), 500) # Cap per field
elif isinstance(val, list):
text_len += min(sum(len(str(v)) for v in val), 500)
return max(20, text_len // 4)
# =========================================================================
# Utility Methods
# =========================================================================
def _get_tokens_by_type(self, graph: ContextGraph) -> Dict[str, Dict[str, int]]:
"""
CP-42: Get token breakdown by node type.
Returns:
Dict mapping node_type to {count, tokens, avg_tokens}
"""
type_stats: Dict[str, Dict[str, int]] = {}
for node in graph.nodes.values():
node_type = node.node_type
if node_type not in type_stats:
type_stats[node_type] = {"count": 0, "tokens": 0}
type_stats[node_type]["count"] += 1
type_stats[node_type]["tokens"] += node.token_estimate
# Calculate averages
for node_type, stats in type_stats.items():
if stats["count"] > 0:
stats["avg_tokens"] = round(stats["tokens"] / stats["count"], 1)
else:
stats["avg_tokens"] = 0
# Sort by tokens descending
return dict(sorted(
type_stats.items(),
key=lambda x: x[1]["tokens"],
reverse=True
))
def get_token_report(self, graph: ContextGraph) -> str:
"""
CP-42: Generate human-readable token usage report.
Args:
graph: ContextGraph to analyze
Returns:
Formatted token report string
"""
tokens_by_type = self._get_tokens_by_type(graph)
total_tokens = graph.tokens_estimated
budget = graph.token_budget
lines = [
"TOKEN BUDGET REPORT",
"=" * 50,
f"Total Tokens: {total_tokens:,} / {budget:,} ({total_tokens/budget*100:.1f}%)",
"",
"Breakdown by Node Type:",
"-" * 50,
f"{'Type':<20} {'Count':>8} {'Tokens':>10} {'Avg':>8} {'Pct':>8}",
"-" * 50,
]
for node_type, stats in tokens_by_type.items():
pct = (stats["tokens"] / total_tokens * 100) if total_tokens > 0 else 0
lines.append(
f"{node_type:<20} {stats['count']:>8} {stats['tokens']:>10,} "
f"{stats['avg_tokens']:>8.1f} {pct:>7.1f}%"
)
lines.extend([
"-" * 50,
f"{'TOTAL':<20} {graph.node_count:>8} {total_tokens:>10,} "
f"{total_tokens/graph.node_count if graph.node_count > 0 else 0:>8.1f} {'100.0':>7}%",
])
return "\n".join(lines)
    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about the knowledge graph.

        Returns:
            Dict with per-type and total node/edge counts from org.db, plus
            context-graph counts from sessions.db (zeros when those tables
            do not exist yet).
        """
        conn = self._get_org_conn()
        stats = {}
        # Node counts by type
        cursor = conn.execute("""
            SELECT node_type, COUNT(*) as count
            FROM kg_nodes
            GROUP BY node_type
            ORDER BY count DESC
        """)
        stats["nodes_by_type"] = {row[0]: row[1] for row in cursor}
        # Edge counts by type
        cursor = conn.execute("""
            SELECT edge_type, COUNT(*) as count
            FROM kg_edges
            GROUP BY edge_type
            ORDER BY count DESC
        """)
        stats["edges_by_type"] = {row[0]: row[1] for row in cursor}
        # Totals
        cursor = conn.execute("SELECT COUNT(*) FROM kg_nodes")
        stats["total_nodes"] = cursor.fetchone()[0]
        cursor = conn.execute("SELECT COUNT(*) FROM kg_edges")
        stats["total_edges"] = cursor.fetchone()[0]
        # Context graph stats from sessions.db
        try:
            sessions_conn = self._get_sessions_conn()
            cursor = sessions_conn.execute("SELECT COUNT(*) FROM context_graphs")
            stats["total_context_graphs"] = cursor.fetchone()[0]
            cursor = sessions_conn.execute("SELECT COUNT(*) FROM context_graph_usage")
            stats["total_graph_usages"] = cursor.fetchone()[0]
        except sqlite3.OperationalError:
            # Tables missing (fresh sessions.db): report zeros instead of failing.
            stats["total_context_graphs"] = 0
            stats["total_graph_usages"] = 0
        return stats
# =============================================================================
# CLI for Testing
# =============================================================================

def main():
    """CLI for testing context graph builder."""
    import argparse
    parser = argparse.ArgumentParser(
        description="ADR-151 Context Graph Builder"
    )
    parser.add_argument(
        "task",
        nargs="?",
        default="Find all decisions related to database architecture",
        help="Task description for building context graph"
    )
    parser.add_argument(
        "--strategy", "-s",
        choices=["anchor", "semantic", "policy_first"],
        default="semantic",
        help="Seed selection strategy"
    )
    parser.add_argument(
        "--budget", "-b",
        type=int,
        default=4000,
        help="Token budget"
    )
    parser.add_argument(
        "--max-nodes", "-n",
        type=int,
        default=64,
        help="Maximum nodes"
    )
    parser.add_argument(
        "--max-depth", "-d",
        type=int,
        default=3,
        help="Maximum BFS depth"
    )
    parser.add_argument(
        "--persist", "-p",
        action="store_true",
        help="Persist graph to database"
    )
    parser.add_argument(
        "--format", "-f",
        choices=["markdown", "json", "text"],
        default="markdown",
        help="Output format"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show knowledge graph statistics"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output"
    )
    args = parser.parse_args()

    # Verbose flag controls the process-wide log level.
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    # Context manager guarantees DB connections close on every exit path.
    with ContextGraphBuilder() as builder:
        if args.stats:
            stats = builder.get_stats()
            print(json.dumps(stats, indent=2))
            return 0
        graph = builder.build(
            task_description=args.task,
            seed_strategy=args.strategy,
            token_budget=args.budget,
            max_nodes=args.max_nodes,
            max_depth=args.max_depth,
            persist=args.persist,
        )
        output = builder.serialize_for_context(graph, format=args.format)
        print(output)
        print("\n---")
        print(f"Build stats: {builder.last_build_stats}")
        return 0
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code.
    # Fix: restore the standard dunder guard (`name`/"main" were undefined).
    import sys
    sys.exit(main())