#!/usr/bin/env python3
"""
OPT-5a: Knowledge Graph Inference Engine (ADR-151)

Derives new edges from patterns in existing kg_nodes and kg_edges. Each
inference rule scans the graph for a specific pattern and creates edges
that weren't explicitly observed but can be logically inferred.

Inference Rules:
  1. DEPENDS_ON: file A depends on file B (derived from cross-file CALLS edges)
  2. RELATED_TO: components frequently co-invoked in same sessions
  3. LEARNS_FROM: skill_learning linked to originating session
  4. IMPLEMENTS: component linked to ADR it references
  5. SUPERSEDES: newer decision supersedes older same-type decision in session

Usage:
    python3 scripts/knowledge_graph/inference_engine.py
    python3 scripts/knowledge_graph/inference_engine.py --dry-run
    python3 scripts/knowledge_graph/inference_engine.py --rules depends_on,related_to
    python3 scripts/knowledge_graph/inference_engine.py --stats

Created: 2026-02-05
Author: Claude (Opus 4.6)
Track: J (Memory Intelligence)
Task: J.3.7.2
"""

import argparse
import json
import logging
import re
import sqlite3
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

# Handle imports for both module and direct execution
try:
    from scripts.core.paths import get_org_db_path, get_sessions_db_path
except ModuleNotFoundError:
    # When run as a script, add the repo root (two levels up) to sys.path.
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import get_org_db_path, get_sessions_db_path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Edge tuple: (from_node_id, to_node_id, edge_type, properties)
InferredEdge = Tuple[str, str, str, Dict[str, Any]]

class InferenceEngine:
    """
    Applies inference rules to the knowledge graph to derive new edges.

    Each rule is a method that yields InferredEdge tuples. The engine
    validates that both endpoints exist, avoids duplicates, and batch
    upserts the results.
    """

    # Edges are committed in chunks of this size (see run()).
    BATCH_SIZE = 1000

    # Available rules and their descriptions
    RULES = {
        "depends_on": "File dependencies from cross-file function calls",
        "related_to": "Component co-occurrence in sessions",
        "learns_from": "Skill learnings linked to originating sessions",
        "implements": "Components linked to ADRs they reference",
        "supersedes": "Newer decisions superseding older same-type decisions",
    }

def __init__(
self,
org_db_path: Path,
dry_run: bool = False,
min_co_occurrence: int = 3,
):
self.org_db_path = org_db_path
self.dry_run = dry_run
self.min_co_occurrence = min_co_occurrence
self._conn: Optional[sqlite3.Connection] = None
self._existing_nodes: Optional[Set[str]] = None
self._existing_edges: Optional[Set[str]] = None

self.stats: Dict[str, Dict[str, int]] = {}

def connect(self) -> sqlite3.Connection:
"""Get or create database connection."""
if self._conn is None:
self._conn = sqlite3.connect(str(self.org_db_path))
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA foreign_keys = ON;")
return self._conn

def close(self):
"""Close database connection."""
if self._conn:
self._conn.close()
self._conn = None

def load_existing_nodes(self) -> Set[str]:
"""Load all node IDs for validation."""
if self._existing_nodes is not None:
return self._existing_nodes
conn = self.connect()
cursor = conn.execute("SELECT id FROM kg_nodes")
self._existing_nodes = {row[0] for row in cursor}
logger.info(f"Loaded {len(self._existing_nodes)} existing nodes")
return self._existing_nodes

def load_existing_edges(self) -> Set[str]:
"""Load all edge keys (type:from:to) to avoid duplicates."""
if self._existing_edges is not None:
return self._existing_edges
conn = self.connect()
cursor = conn.execute(
"SELECT edge_type, from_node, to_node FROM kg_edges"
)
self._existing_edges = {
f"{row[0]}:{row[1]}:{row[2]}" for row in cursor
}
logger.info(f"Loaded {len(self._existing_edges)} existing edges")
return self._existing_edges

def edge_exists(self, edge_type: str, from_node: str, to_node: str) -> bool:
"""Check if an edge already exists."""
edges = self.load_existing_edges()
return f"{edge_type}:{from_node}:{to_node}" in edges

def upsert_edges(
self,
conn: sqlite3.Connection,
edges: List[InferredEdge],
) -> Tuple[int, int]:
"""Batch upsert inferred edges."""
now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
success = 0
errors = 0

for from_node, to_node, edge_type, properties in edges:
edge_id = f"{edge_type}:{from_node}:{to_node}"
properties["inferred"] = True
properties["inferred_at"] = now
props_json = json.dumps(properties, ensure_ascii=False)

try:
conn.execute("""
INSERT INTO kg_edges (
id, edge_type, from_node, to_node, properties,
created_at
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(from_node, to_node, edge_type) DO UPDATE SET
properties = excluded.properties
""", (edge_id, edge_type, from_node, to_node, props_json, now))
success += 1
except sqlite3.Error as e:
logger.debug(f"Error upserting inferred edge {edge_id}: {e}")
errors += 1

return success, errors

    def run(self, rules: Optional[List[str]] = None) -> Dict[str, Dict[str, int]]:
        """
        Run inference rules.

        Args:
            rules: List of rule names (None = all, in RULES order)

        Returns:
            Stats per rule: dict of counters (candidates, new_edges,
            skipped_exists, skipped_missing, errors) keyed by rule name.
        """
        if rules is None:
            rules = list(self.RULES.keys())

        conn = self.connect()
        nodes = self.load_existing_nodes()
        # Warm the edge cache up front so edge_exists() is a set lookup
        # and the in-run dedup below mutates the same set.
        self.load_existing_edges()

        logger.info("=" * 60)
        logger.info("ADR-151 Knowledge Graph Inference Engine")
        logger.info(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")
        logger.info(f"Rules: {', '.join(rules)}")
        logger.info(f"Nodes: {len(nodes)}")
        logger.info("=" * 60)

        # Dispatch table: rule name -> generator method yielding edges.
        rule_methods = {
            "depends_on": self._infer_depends_on,
            "related_to": self._infer_related_to,
            "learns_from": self._infer_learns_from,
            "implements": self._infer_implements,
            "supersedes": self._infer_supersedes,
        }

        for rule_name in rules:
            if rule_name not in rule_methods:
                logger.warning(f"Unknown rule: {rule_name}")
                continue

            logger.info(f"\n[{rule_name.upper()}] {self.RULES[rule_name]}")
            logger.info("-" * 40)

            rule_stats = {
                "candidates": 0,       # edges yielded by the rule
                "new_edges": 0,        # edges written (or would-write in dry run)
                "skipped_exists": 0,   # edge already present in kg_edges
                "skipped_missing": 0,  # one or both endpoints absent from kg_nodes
                "errors": 0,           # upsert failures / rule exceptions
            }

            batch: List[InferredEdge] = []

            try:
                for edge in rule_methods[rule_name]():
                    from_node, to_node, edge_type, props = edge
                    rule_stats["candidates"] += 1

                    # Validate endpoints
                    if from_node not in nodes or to_node not in nodes:
                        rule_stats["skipped_missing"] += 1
                        continue

                    # Skip if already exists
                    if self.edge_exists(edge_type, from_node, to_node):
                        rule_stats["skipped_exists"] += 1
                        continue

                    if self.dry_run:
                        rule_stats["new_edges"] += 1
                        # Log only the first few candidates to keep output short.
                        if rule_stats["new_edges"] <= 3:
                            logger.info(f" [DRY] {edge_type}: {from_node} -> {to_node}")
                        continue

                    batch.append(edge)
                    # Track in existing_edges to avoid duplicates within same run
                    self._existing_edges.add(f"{edge_type}:{from_node}:{to_node}")

                    if len(batch) >= self.BATCH_SIZE:
                        # Commit in chunks to bound transaction size.
                        ok, err = self.upsert_edges(conn, batch)
                        conn.commit()
                        rule_stats["new_edges"] += ok
                        rule_stats["errors"] += err
                        batch = []

                # Flush remaining
                if batch and not self.dry_run:
                    ok, err = self.upsert_edges(conn, batch)
                    conn.commit()
                    rule_stats["new_edges"] += ok
                    rule_stats["errors"] += err

            except Exception as e:
                # Best-effort: one failing rule must not abort the others.
                logger.error(f"Rule {rule_name} failed: {e}")
                rule_stats["errors"] += 1

            self.stats[rule_name] = rule_stats
            logger.info(
                f" Result: {rule_stats['new_edges']} new, "
                f"{rule_stats['skipped_exists']} existing, "
                f"{rule_stats['skipped_missing']} missing nodes, "
                f"{rule_stats['errors']} errors"
            )

        # Summary
        total_new = sum(s["new_edges"] for s in self.stats.values())
        logger.info(f"\nTotal inferred edges: {total_new}")

        self.close()
        return self.stats

# ---- Inference Rules ----

def _infer_depends_on(self) -> Generator[InferredEdge, None, None]:
"""
Rule: DEPENDS_ON (file → file)

If function X in file A calls function Y in file B (A ≠ B),
then file A depends on file B.

Aggregates cross-file calls into file-level dependency edges.
"""
conn = self.connect()

# Get all CALLS edges with their source/target file mappings
# function nodes have format "function:{file_path}:{func_name}"
# file nodes have format "file:{file_path}"
cursor = conn.execute("""
SELECT
e.from_node,
e.to_node,
fn_from.properties as from_props,
fn_to.properties as to_props
FROM kg_edges e
JOIN kg_nodes fn_from ON fn_from.id = e.from_node AND fn_from.node_type = 'function'
JOIN kg_nodes fn_to ON fn_to.id = e.to_node AND fn_to.node_type = 'function'
WHERE e.edge_type = 'CALLS'
""")

# Aggregate by file pair
file_deps: Dict[Tuple[str, str], int] = defaultdict(int)

for row in cursor:
from_func = row['from_node'] # function:path:name
to_func = row['to_node']

# Extract file paths from function node IDs
# Format: function:{path}:{name}
from_parts = from_func.split(":", 2)
to_parts = to_func.split(":", 2)

if len(from_parts) < 3 or len(to_parts) < 3:
continue

from_file_path = from_parts[1]
to_file_path = to_parts[1]

# Skip self-dependencies
if from_file_path == to_file_path:
continue

from_file_node = f"file:{from_file_path}"
to_file_node = f"file:{to_file_path}"

file_deps[(from_file_node, to_file_node)] += 1

# Yield dependency edges
for (from_file, to_file), call_count in file_deps.items():
yield (
from_file,
to_file,
"DEPENDS_ON",
{
"rule": "depends_on",
"call_count": call_count,
"description": f"File depends on {to_file.split(':',1)[1]} via {call_count} function call(s)",
},
)

def _infer_related_to(self) -> Generator[InferredEdge, None, None]:
"""
Rule: RELATED_TO (component → component)

If two components are invoked in the same session at least
min_co_occurrence times, they are related.
"""
conn = self.connect()

# Build session → components map from INVOKES edges
cursor = conn.execute("""
SELECT from_node, to_node
FROM kg_edges
WHERE edge_type = 'INVOKES'
""")

session_components: Dict[str, List[str]] = defaultdict(list)
for row in cursor:
session_id = row['from_node']
component_id = row['to_node']
session_components[session_id].append(component_id)

# Count co-occurrences
co_occur: Dict[Tuple[str, str], int] = defaultdict(int)
for session_id, components in session_components.items():
unique = sorted(set(components))
# Pairwise combinations (order-independent)
for i in range(len(unique)):
for j in range(i + 1, len(unique)):
pair = (unique[i], unique[j])
co_occur[pair] += 1

# Yield edges above threshold
for (comp_a, comp_b), count in co_occur.items():
if count >= self.min_co_occurrence:
yield (
comp_a,
comp_b,
"RELATED_TO",
{
"rule": "related_to",
"co_occurrence_count": count,
"description": f"Co-invoked in {count} session(s)",
},
)

def _infer_learns_from(self) -> Generator[InferredEdge, None, None]:
"""
Rule: LEARNS_FROM (skill_learning → session)

If a skill_learning node has a session_id in its properties,
link it to the originating session node.
"""
conn = self.connect()

cursor = conn.execute("""
SELECT id, properties
FROM kg_nodes
WHERE node_type = 'skill_learning'
AND properties IS NOT NULL
""")

for row in cursor:
learning_id = row['id']
try:
props = json.loads(row['properties'])
except (json.JSONDecodeError, TypeError):
continue

# Look for session reference in properties
session_id = props.get('session_id') or props.get('session')
if not session_id:
continue

# Session nodes have format "session:{session_id}"
session_node = f"session:{session_id}"

yield (
learning_id,
session_node,
"LEARNS_FROM",
{
"rule": "learns_from",
"description": f"Learning derived from session",
},
)

def _infer_implements(self) -> Generator[InferredEdge, None, None]:
"""
Rule: IMPLEMENTS (component → adr)

If a component's properties or name reference an ADR (ADR-XXX pattern),
it implements that ADR.
"""
conn = self.connect()

# Load ADR nodes for matching
adr_cursor = conn.execute("""
SELECT id, name, properties
FROM kg_nodes
WHERE node_type = 'adr'
""")

adr_map: Dict[str, str] = {} # ADR number → node_id
for row in adr_cursor:
adr_id = row['id']
name = row['name'] or ""
# Extract ADR number from name or id
match = re.search(r'ADR[- ]?(\d+)', name, re.IGNORECASE)
if match:
adr_map[match.group(1)] = adr_id
# Also try from id (format: adr:ADR-XXX-...)
id_match = re.search(r'ADR[- ]?(\d+)', adr_id, re.IGNORECASE)
if id_match:
adr_map[id_match.group(1)] = adr_id

if not adr_map:
return

# Scan components for ADR references
comp_cursor = conn.execute("""
SELECT id, name, properties
FROM kg_nodes
WHERE node_type = 'component'
""")

for row in comp_cursor:
comp_id = row['id']
searchable = (row['name'] or "") + " " + (row['properties'] or "")

# Find all ADR-XXX references
adr_refs = set(re.findall(r'ADR[- ]?(\d+)', searchable, re.IGNORECASE))

for adr_num in adr_refs:
if adr_num in adr_map:
yield (
comp_id,
adr_map[adr_num],
"IMPLEMENTS",
{
"rule": "implements",
"adr_reference": f"ADR-{adr_num}",
"description": f"Component references ADR-{adr_num}",
},
)

def _infer_supersedes(self) -> Generator[InferredEdge, None, None]:
"""
Rule: SUPERSEDES (decision → decision)

If two decisions share the same decision_type and the newer one's
text references concepts from the older one, the newer supersedes it.

Uses property matching: same decision_type + close temporal proximity
within the same project_path.
"""
conn = self.connect()

# Load decisions grouped by (decision_type, project_path)
cursor = conn.execute("""
SELECT id, properties
FROM kg_nodes
WHERE node_type = 'decision'
AND properties IS NOT NULL
ORDER BY id
""")

# Group by (decision_type, project_path)
decision_groups: Dict[Tuple[str, str], List[Tuple[str, Dict]]] = defaultdict(list)

for row in cursor:
try:
props = json.loads(row['properties'])
except (json.JSONDecodeError, TypeError):
continue

dtype = props.get('decision_type', 'general')
project = props.get('project_path', 'unknown')
decision_groups[(dtype, project)].append((row['id'], props))

# Within each group, newer decisions supersede older ones
# Only create edges for groups with 2+ decisions
for key, decisions in decision_groups.items():
if len(decisions) < 2:
continue

# Sort by created_at if available, else by node_id
decisions.sort(
key=lambda d: d[1].get('created_at', '') or ''
)

# Each decision supersedes the one before it in the same group
for i in range(1, len(decisions)):
newer_id = decisions[i][0]
older_id = decisions[i - 1][0]

yield (
newer_id,
older_id,
"SUPERSEDES",
{
"rule": "supersedes",
"decision_type": key[0],
"project_path": key[1],
"description": f"Later {key[0]} decision in {key[1]}",
},
)

def get_inference_stats(org_db_path: Path) -> Dict[str, int]:
    """
    Get count of inferred edges by type.

    Args:
        org_db_path: Path to the org knowledge-graph SQLite database.

    Returns:
        Mapping of edge_type -> count, descending by count. Empty dict
        when the kg_edges table does not exist (graph never built).
    """
    conn = sqlite3.connect(str(org_db_path))
    try:
        cursor = conn.execute("""
            SELECT edge_type, COUNT(*) as cnt
            FROM kg_edges
            WHERE properties LIKE '%"inferred": true%'
               OR properties LIKE '%"inferred":true%'
            GROUP BY edge_type
            ORDER BY cnt DESC
        """)
        return {row[0]: row[1] for row in cursor}
    except sqlite3.OperationalError:
        # Missing table -> treat as "no inferred edges" rather than crash.
        return {}
    finally:
        conn.close()

def print_stats(org_db_path: Path):
    """Print a human-readable table of inferred-edge counts by type."""
    stats = get_inference_stats(org_db_path)

    print("\nInferred Edge Statistics")
    print("=" * 40)

    if not stats:
        print("No inferred edges found")
        return

    total = 0
    for edge_type, count in stats.items():
        print(f" {edge_type:20} {count:>8,}")
        total += count

    print("-" * 40)
    print(f" {'TOTAL':20} {total:>8,}")

def main():
    """CLI entry point; returns a process exit code (0 ok, 1 bad rule name)."""
    parser = argparse.ArgumentParser(
        description="ADR-151 Knowledge Graph Inference Engine",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Rules:
  depends_on    File dependencies from cross-file function calls
  related_to    Component co-occurrence in sessions (default threshold: 3)
  learns_from   Skill learnings linked to originating sessions
  implements    Components linked to ADRs they reference
  supersedes    Newer decisions superseding older same-type decisions

Examples:
  # Run all rules
  python3 scripts/knowledge_graph/inference_engine.py

  # Dry run
  python3 scripts/knowledge_graph/inference_engine.py --dry-run

  # Run specific rules
  python3 scripts/knowledge_graph/inference_engine.py --rules depends_on,implements

  # Adjust co-occurrence threshold for RELATED_TO
  python3 scripts/knowledge_graph/inference_engine.py --min-co-occurrence 5

  # Show inference statistics
  python3 scripts/knowledge_graph/inference_engine.py --stats
"""
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without writing"
    )
    parser.add_argument(
        "--rules", "-r",
        type=str,
        help="Comma-separated list of rules to run (default: all)"
    )
    parser.add_argument(
        "--min-co-occurrence",
        type=int,
        default=3,
        help="Minimum co-occurrence count for RELATED_TO (default: 3)"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show inference statistics and exit"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    org_db = get_org_db_path()

    # --stats is a read-only query; short-circuit before building the engine.
    if args.stats:
        print_stats(org_db)
        return 0

    # Validate requested rule names against the known rule set.
    rules = None
    if args.rules:
        rules = [r.strip() for r in args.rules.split(",")]
        for r in rules:
            if r not in InferenceEngine.RULES:
                print(f"Error: Unknown rule '{r}'")
                print(f"Available: {', '.join(InferenceEngine.RULES.keys())}")
                return 1

    engine = InferenceEngine(
        org_db_path=org_db,
        dry_run=args.dry_run,
        min_co_occurrence=args.min_co_occurrence,
    )

    results = engine.run(rules=rules)

    if args.json:
        print(json.dumps(results, indent=2, default=str))
    else:
        print_stats(org_db)

    return 0

if __name__ == "__main__":
    sys.exit(main())