#!/usr/bin/env python3
"""
OPT-5a: Knowledge Graph Inference Engine (ADR-151)

Derives new edges from patterns in existing kg_nodes and kg_edges. Each
inference rule scans the graph for a specific pattern and creates edges
that weren't explicitly observed but can be logically inferred.

Inference Rules:
- DEPENDS_ON: file A depends on file B (derived from cross-file CALLS edges)
- RELATED_TO: components frequently co-invoked in same sessions
- LEARNS_FROM: skill_learning linked to originating session
- IMPLEMENTS: component linked to ADR it references
- SUPERSEDES: newer decision supersedes older same-type decision in session

Usage:
    python3 scripts/knowledge_graph/inference_engine.py
    python3 scripts/knowledge_graph/inference_engine.py --dry-run
    python3 scripts/knowledge_graph/inference_engine.py --rules depends_on,related_to
    python3 scripts/knowledge_graph/inference_engine.py --stats

Created: 2026-02-05
Author: Claude (Opus 4.6)
Track: J (Memory Intelligence)
Task: J.3.7.2
"""
import argparse
import json
import logging
import re
import sqlite3
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

# Handle imports for both module and direct execution
try:
    from scripts.core.paths import get_org_db_path, get_sessions_db_path
except ModuleNotFoundError:
    # Was Path(file): must be __file__ to locate this script on disk.
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import get_org_db_path, get_sessions_db_path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Was getLogger(name): must be __name__ for the module-scoped logger.
logger = logging.getLogger(__name__)

# Edge tuple: (from_node_id, to_node_id, edge_type, properties)
InferredEdge = Tuple[str, str, str, Dict[str, Any]]
class InferenceEngine:
    """
    Applies inference rules to the knowledge graph to derive new edges.

    Each rule is a method that yields InferredEdge tuples. The engine
    validates that both endpoints exist, avoids duplicates, and batch
    upserts the results.
    """

    # Edges buffered per rule before an intermediate commit.
    BATCH_SIZE = 1000

    # Available rules and their descriptions
    RULES = {
        "depends_on": "File dependencies from cross-file function calls",
        "related_to": "Component co-occurrence in sessions",
        "learns_from": "Skill learnings linked to originating sessions",
        "implements": "Components linked to ADRs they reference",
        "supersedes": "Newer decisions superseding older same-type decisions",
    }

    def __init__(
        self,
        org_db_path: Path,
        dry_run: bool = False,
        min_co_occurrence: int = 3,
    ):
        """
        Args:
            org_db_path: Path to the SQLite database holding kg_nodes/kg_edges.
            dry_run: If True, preview inferred edges without writing.
            min_co_occurrence: Session co-occurrence threshold for RELATED_TO.
        """
        self.org_db_path = org_db_path
        self.dry_run = dry_run
        self.min_co_occurrence = min_co_occurrence
        self._conn: Optional[sqlite3.Connection] = None
        # Lazy caches used for endpoint validation and duplicate detection.
        self._existing_nodes: Optional[Set[str]] = None
        self._existing_edges: Optional[Set[str]] = None
        # Per-rule counters, populated by run().
        self.stats: Dict[str, Dict[str, int]] = {}

    def connect(self) -> sqlite3.Connection:
        """Get or create database connection."""
        if self._conn is None:
            self._conn = sqlite3.connect(str(self.org_db_path))
            self._conn.row_factory = sqlite3.Row
            self._conn.execute("PRAGMA foreign_keys = ON;")
        return self._conn

    def close(self):
        """Close database connection."""
        if self._conn:
            self._conn.close()
            self._conn = None

    def load_existing_nodes(self) -> Set[str]:
        """Load all node IDs for validation (cached after first call)."""
        if self._existing_nodes is not None:
            return self._existing_nodes
        conn = self.connect()
        cursor = conn.execute("SELECT id FROM kg_nodes")
        self._existing_nodes = {row[0] for row in cursor}
        logger.info(f"Loaded {len(self._existing_nodes)} existing nodes")
        return self._existing_nodes

    def load_existing_edges(self) -> Set[str]:
        """Load all edge keys (type:from:to) to avoid duplicates (cached)."""
        if self._existing_edges is not None:
            return self._existing_edges
        conn = self.connect()
        cursor = conn.execute(
            "SELECT edge_type, from_node, to_node FROM kg_edges"
        )
        self._existing_edges = {
            f"{row[0]}:{row[1]}:{row[2]}" for row in cursor
        }
        logger.info(f"Loaded {len(self._existing_edges)} existing edges")
        return self._existing_edges

    def edge_exists(self, edge_type: str, from_node: str, to_node: str) -> bool:
        """Check if an edge already exists."""
        edges = self.load_existing_edges()
        return f"{edge_type}:{from_node}:{to_node}" in edges

    def upsert_edges(
        self,
        conn: sqlite3.Connection,
        edges: List[InferredEdge],
    ) -> Tuple[int, int]:
        """
        Batch upsert inferred edges.

        Each edge's properties dict is tagged with inferred=True and a
        timestamp before writing, so inferred edges can later be counted
        or cleaned up separately from observed ones.

        Returns:
            (success_count, error_count)
        """
        now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        success = 0
        errors = 0
        for from_node, to_node, edge_type, properties in edges:
            edge_id = f"{edge_type}:{from_node}:{to_node}"
            properties["inferred"] = True
            properties["inferred_at"] = now
            props_json = json.dumps(properties, ensure_ascii=False)
            try:
                # Re-running a rule refreshes properties on the existing row
                # instead of failing on the (from, to, type) uniqueness.
                conn.execute("""
                    INSERT INTO kg_edges (
                        id, edge_type, from_node, to_node, properties,
                        created_at
                    ) VALUES (?, ?, ?, ?, ?, ?)
                    ON CONFLICT(from_node, to_node, edge_type) DO UPDATE SET
                        properties = excluded.properties
                """, (edge_id, edge_type, from_node, to_node, props_json, now))
                success += 1
            except sqlite3.Error as e:
                logger.debug(f"Error upserting inferred edge {edge_id}: {e}")
                errors += 1
        return success, errors

    def run(self, rules: Optional[List[str]] = None) -> Dict[str, Dict[str, int]]:
        """
        Run inference rules.

        Args:
            rules: List of rule names (None = all)

        Returns:
            Stats per rule
        """
        if rules is None:
            rules = list(self.RULES.keys())

        conn = self.connect()
        nodes = self.load_existing_nodes()
        existing_edges = self.load_existing_edges()

        logger.info("=" * 60)
        logger.info("ADR-151 Knowledge Graph Inference Engine")
        logger.info(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")
        logger.info(f"Rules: {', '.join(rules)}")
        logger.info(f"Nodes: {len(nodes)}")
        logger.info("=" * 60)

        rule_methods = {
            "depends_on": self._infer_depends_on,
            "related_to": self._infer_related_to,
            "learns_from": self._infer_learns_from,
            "implements": self._infer_implements,
            "supersedes": self._infer_supersedes,
        }

        for rule_name in rules:
            if rule_name not in rule_methods:
                logger.warning(f"Unknown rule: {rule_name}")
                continue

            logger.info(f"\n[{rule_name.upper()}] {self.RULES[rule_name]}")
            logger.info("-" * 40)

            rule_stats = {
                "candidates": 0,
                "new_edges": 0,
                "skipped_exists": 0,
                "skipped_missing": 0,
                "errors": 0,
            }
            batch: List[InferredEdge] = []

            try:
                for edge in rule_methods[rule_name]():
                    from_node, to_node, edge_type, props = edge
                    rule_stats["candidates"] += 1

                    # Validate endpoints
                    if from_node not in nodes or to_node not in nodes:
                        rule_stats["skipped_missing"] += 1
                        continue

                    # Skip if already exists
                    if self.edge_exists(edge_type, from_node, to_node):
                        rule_stats["skipped_exists"] += 1
                        continue

                    edge_key = f"{edge_type}:{from_node}:{to_node}"

                    if self.dry_run:
                        # FIX: track keys in dry-run too, so duplicate
                        # candidates within the same run are not double-counted
                        # and the preview matches what execute mode would write.
                        existing_edges.add(edge_key)
                        rule_stats["new_edges"] += 1
                        if rule_stats["new_edges"] <= 3:
                            logger.info(f"  [DRY] {edge_type}: {from_node} -> {to_node}")
                        continue

                    batch.append(edge)
                    # Track in existing_edges to avoid duplicates within same run
                    existing_edges.add(edge_key)

                    if len(batch) >= self.BATCH_SIZE:
                        ok, err = self.upsert_edges(conn, batch)
                        conn.commit()
                        rule_stats["new_edges"] += ok
                        rule_stats["errors"] += err
                        batch = []

                # Flush remaining
                if batch and not self.dry_run:
                    ok, err = self.upsert_edges(conn, batch)
                    conn.commit()
                    rule_stats["new_edges"] += ok
                    rule_stats["errors"] += err
            except Exception as e:
                # A failing rule should not abort the remaining rules.
                logger.error(f"Rule {rule_name} failed: {e}")
                rule_stats["errors"] += 1

            self.stats[rule_name] = rule_stats
            logger.info(
                f"  Result: {rule_stats['new_edges']} new, "
                f"{rule_stats['skipped_exists']} existing, "
                f"{rule_stats['skipped_missing']} missing nodes, "
                f"{rule_stats['errors']} errors"
            )

        # Summary
        total_new = sum(s["new_edges"] for s in self.stats.values())
        logger.info(f"\nTotal inferred edges: {total_new}")

        self.close()
        return self.stats

    # ---- Inference Rules ----

    def _infer_depends_on(self) -> Generator[InferredEdge, None, None]:
        """
        Rule: DEPENDS_ON (file → file)

        If function X in file A calls function Y in file B (A ≠ B),
        then file A depends on file B.

        Aggregates cross-file calls into file-level dependency edges.
        """
        conn = self.connect()
        # Get all CALLS edges with their source/target file mappings
        # function nodes have format "function:{file_path}:{func_name}"
        # file nodes have format "file:{file_path}"
        cursor = conn.execute("""
            SELECT
                e.from_node,
                e.to_node,
                fn_from.properties as from_props,
                fn_to.properties as to_props
            FROM kg_edges e
            JOIN kg_nodes fn_from ON fn_from.id = e.from_node AND fn_from.node_type = 'function'
            JOIN kg_nodes fn_to ON fn_to.id = e.to_node AND fn_to.node_type = 'function'
            WHERE e.edge_type = 'CALLS'
        """)

        # Aggregate by file pair
        file_deps: Dict[Tuple[str, str], int] = defaultdict(int)
        for row in cursor:
            from_func = row['from_node']  # function:path:name
            to_func = row['to_node']
            # Extract file paths from function node IDs
            # Format: function:{path}:{name}
            from_parts = from_func.split(":", 2)
            to_parts = to_func.split(":", 2)
            if len(from_parts) < 3 or len(to_parts) < 3:
                continue
            from_file_path = from_parts[1]
            to_file_path = to_parts[1]
            # Skip self-dependencies
            if from_file_path == to_file_path:
                continue
            from_file_node = f"file:{from_file_path}"
            to_file_node = f"file:{to_file_path}"
            file_deps[(from_file_node, to_file_node)] += 1

        # Yield dependency edges
        for (from_file, to_file), call_count in file_deps.items():
            yield (
                from_file,
                to_file,
                "DEPENDS_ON",
                {
                    "rule": "depends_on",
                    "call_count": call_count,
                    "description": f"File depends on {to_file.split(':',1)[1]} via {call_count} function call(s)",
                },
            )

    def _infer_related_to(self) -> Generator[InferredEdge, None, None]:
        """
        Rule: RELATED_TO (component → component)

        If two components are invoked in the same session at least
        min_co_occurrence times, they are related.
        """
        conn = self.connect()
        # Build session → components map from INVOKES edges
        cursor = conn.execute("""
            SELECT from_node, to_node
            FROM kg_edges
            WHERE edge_type = 'INVOKES'
        """)
        session_components: Dict[str, List[str]] = defaultdict(list)
        for row in cursor:
            session_id = row['from_node']
            component_id = row['to_node']
            session_components[session_id].append(component_id)

        # Count co-occurrences
        co_occur: Dict[Tuple[str, str], int] = defaultdict(int)
        for session_id, components in session_components.items():
            # Sorted uniques make each pair order-independent, so (A, B)
            # and (B, A) accumulate into the same counter.
            unique = sorted(set(components))
            # Pairwise combinations (order-independent)
            for i in range(len(unique)):
                for j in range(i + 1, len(unique)):
                    pair = (unique[i], unique[j])
                    co_occur[pair] += 1

        # Yield edges above threshold
        for (comp_a, comp_b), count in co_occur.items():
            if count >= self.min_co_occurrence:
                yield (
                    comp_a,
                    comp_b,
                    "RELATED_TO",
                    {
                        "rule": "related_to",
                        "co_occurrence_count": count,
                        "description": f"Co-invoked in {count} session(s)",
                    },
                )

    def _infer_learns_from(self) -> Generator[InferredEdge, None, None]:
        """
        Rule: LEARNS_FROM (skill_learning → session)

        If a skill_learning node has a session_id in its properties,
        link it to the originating session node.
        """
        conn = self.connect()
        cursor = conn.execute("""
            SELECT id, properties
            FROM kg_nodes
            WHERE node_type = 'skill_learning'
              AND properties IS NOT NULL
        """)
        for row in cursor:
            learning_id = row['id']
            try:
                props = json.loads(row['properties'])
            except (json.JSONDecodeError, TypeError):
                continue
            # Look for session reference in properties
            session_id = props.get('session_id') or props.get('session')
            if not session_id:
                continue
            # Session nodes have format "session:{session_id}"
            session_node = f"session:{session_id}"
            yield (
                learning_id,
                session_node,
                "LEARNS_FROM",
                {
                    "rule": "learns_from",
                    # FIX: was a placeholder-less f-string; plain literal.
                    "description": "Learning derived from session",
                },
            )

    def _infer_implements(self) -> Generator[InferredEdge, None, None]:
        """
        Rule: IMPLEMENTS (component → adr)

        If a component's properties or name reference an ADR (ADR-XXX pattern),
        it implements that ADR.
        """
        conn = self.connect()
        # Load ADR nodes for matching
        adr_cursor = conn.execute("""
            SELECT id, name, properties
            FROM kg_nodes
            WHERE node_type = 'adr'
        """)
        adr_map: Dict[str, str] = {}  # ADR number → node_id
        for row in adr_cursor:
            adr_id = row['id']
            name = row['name'] or ""
            # Extract ADR number from name or id
            match = re.search(r'ADR[- ]?(\d+)', name, re.IGNORECASE)
            if match:
                adr_map[match.group(1)] = adr_id
            # Also try from id (format: adr:ADR-XXX-...)
            id_match = re.search(r'ADR[- ]?(\d+)', adr_id, re.IGNORECASE)
            if id_match:
                adr_map[id_match.group(1)] = adr_id

        if not adr_map:
            return

        # Scan components for ADR references
        comp_cursor = conn.execute("""
            SELECT id, name, properties
            FROM kg_nodes
            WHERE node_type = 'component'
        """)
        for row in comp_cursor:
            comp_id = row['id']
            searchable = (row['name'] or "") + " " + (row['properties'] or "")
            # Find all ADR-XXX references
            adr_refs = set(re.findall(r'ADR[- ]?(\d+)', searchable, re.IGNORECASE))
            for adr_num in adr_refs:
                if adr_num in adr_map:
                    yield (
                        comp_id,
                        adr_map[adr_num],
                        "IMPLEMENTS",
                        {
                            "rule": "implements",
                            "adr_reference": f"ADR-{adr_num}",
                            "description": f"Component references ADR-{adr_num}",
                        },
                    )

    def _infer_supersedes(self) -> Generator[InferredEdge, None, None]:
        """
        Rule: SUPERSEDES (decision → decision)

        If two decisions share the same decision_type and the newer one's
        text references concepts from the older one, the newer supersedes it.

        Uses property matching: same decision_type + close temporal proximity
        within the same project_path.
        """
        conn = self.connect()
        # Load decisions grouped by (decision_type, project_path)
        cursor = conn.execute("""
            SELECT id, properties
            FROM kg_nodes
            WHERE node_type = 'decision'
              AND properties IS NOT NULL
            ORDER BY id
        """)

        # Group by (decision_type, project_path)
        decision_groups: Dict[Tuple[str, str], List[Tuple[str, Dict]]] = defaultdict(list)
        for row in cursor:
            try:
                props = json.loads(row['properties'])
            except (json.JSONDecodeError, TypeError):
                continue
            dtype = props.get('decision_type', 'general')
            project = props.get('project_path', 'unknown')
            decision_groups[(dtype, project)].append((row['id'], props))

        # Within each group, newer decisions supersede older ones
        # Only create edges for groups with 2+ decisions
        for key, decisions in decision_groups.items():
            if len(decisions) < 2:
                continue
            # Sort by created_at if available, else by node_id
            # (SQL ORDER BY id gives a stable tiebreak for missing timestamps).
            decisions.sort(
                key=lambda d: d[1].get('created_at', '') or ''
            )
            # Each decision supersedes the one before it in the same group
            for i in range(1, len(decisions)):
                newer_id = decisions[i][0]
                older_id = decisions[i - 1][0]
                yield (
                    newer_id,
                    older_id,
                    "SUPERSEDES",
                    {
                        "rule": "supersedes",
                        "decision_type": key[0],
                        "project_path": key[1],
                        "description": f"Later {key[0]} decision in {key[1]}",
                    },
                )
def get_inference_stats(org_db_path: Path) -> Dict[str, int]:
    """
    Get count of inferred edges by type.

    Inferred edges are recognized by the inferred=True flag that
    upsert_edges writes into the properties JSON; both spacing variants
    of the serialized key are matched.

    Args:
        org_db_path: Path to the SQLite database holding kg_edges.

    Returns:
        Mapping of edge_type -> count, largest first. Empty dict when the
        kg_edges table does not exist.
    """
    conn = sqlite3.connect(str(org_db_path))
    try:
        cursor = conn.execute("""
            SELECT edge_type, COUNT(*) as cnt
            FROM kg_edges
            WHERE properties LIKE '%"inferred": true%'
               OR properties LIKE '%"inferred":true%'
            GROUP BY edge_type
            ORDER BY cnt DESC
        """)
        return {row[0]: row[1] for row in cursor}
    except sqlite3.OperationalError:
        # Missing table (fresh database) — treat as "no inferred edges".
        return {}
    finally:
        conn.close()
def print_stats(org_db_path: Path):
    """
    Print inference statistics.

    Reads inferred-edge counts via get_inference_stats and writes a
    formatted per-type table with a total to stdout.

    Args:
        org_db_path: Path to the SQLite database holding kg_edges.
    """
    stats = get_inference_stats(org_db_path)
    print("\nInferred Edge Statistics")
    print("=" * 40)
    if not stats:
        print("No inferred edges found")
        return
    total = 0
    for edge_type, count in stats.items():
        print(f" {edge_type:20} {count:>8,}")
        total += count
    print("-" * 40)
    print(f" {'TOTAL':20} {total:>8,}")
def main():
    """
    CLI entry point.

    Returns:
        0 on success (including --stats mode), 1 for an unknown rule name.
    """
    parser = argparse.ArgumentParser(
        description="ADR-151 Knowledge Graph Inference Engine",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Rules:
  depends_on    File dependencies from cross-file function calls
  related_to    Component co-occurrence in sessions (default threshold: 3)
  learns_from   Skill learnings linked to originating sessions
  implements    Components linked to ADRs they reference
  supersedes    Newer decisions superseding older same-type decisions

Examples:
  # Run all rules
  python3 scripts/knowledge_graph/inference_engine.py

  # Dry run
  python3 scripts/knowledge_graph/inference_engine.py --dry-run

  # Run specific rules
  python3 scripts/knowledge_graph/inference_engine.py --rules depends_on,implements

  # Adjust co-occurrence threshold for RELATED_TO
  python3 scripts/knowledge_graph/inference_engine.py --min-co-occurrence 5

  # Show inference statistics
  python3 scripts/knowledge_graph/inference_engine.py --stats
"""
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without writing"
    )
    parser.add_argument(
        "--rules", "-r",
        type=str,
        help="Comma-separated list of rules to run (default: all)"
    )
    parser.add_argument(
        "--min-co-occurrence",
        type=int,
        default=3,
        help="Minimum co-occurrence count for RELATED_TO (default: 3)"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show inference statistics and exit"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    org_db = get_org_db_path()

    # --stats is read-only: report and exit without running any rules.
    if args.stats:
        print_stats(org_db)
        return 0

    rules = None
    if args.rules:
        rules = [r.strip() for r in args.rules.split(",")]
        # Validate rule names up front so a typo fails fast.
        for r in rules:
            if r not in InferenceEngine.RULES:
                print(f"Error: Unknown rule '{r}'")
                print(f"Available: {', '.join(InferenceEngine.RULES.keys())}")
                return 1

    engine = InferenceEngine(
        org_db_path=org_db,
        dry_run=args.dry_run,
        min_co_occurrence=args.min_co_occurrence,
    )
    results = engine.run(rules=rules)

    if args.json:
        print(json.dumps(results, indent=2, default=str))
    else:
        print_stats(org_db)
    return 0
# FIX: was `if name == "main"` — the mangled form of the standard guard,
# which would raise NameError instead of running main().
if __name__ == "__main__":
    sys.exit(main())