#!/usr/bin/env python3
"""Knowledge Graph Index Updater.

Indexes tool, model, and session data from sessions.db into org.db's
knowledge graph (kg_nodes + kg_edges). Designed to run regularly to keep
the KG current with the latest session analytics.

Usage:
    python3 scripts/trajectory/kg_index_updater.py
    python3 scripts/trajectory/kg_index_updater.py --dry-run
    python3 scripts/trajectory/kg_index_updater.py --freshness
    python3 scripts/trajectory/kg_index_updater.py --full-rebuild

Node types created:
    - tool: Claude Code tools (Read, Write, Bash, Edit, etc.)
    - llm_model: LLM models (claude-opus-4-5, etc.)

Edge types created:
    - USES: session -> tool, session -> llm_model
    - INVOKES: tool -> tool (co-occurrence in same session)
"""

import argparse
import json
import sqlite3
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path

# Default database locations under the shared context-storage directory.
CONTEXT_STORAGE = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
SESSIONS_DB = CONTEXT_STORAGE / "sessions.db"
ORG_DB = CONTEXT_STORAGE / "org.db"

# One timestamp for the whole run, so every node/edge written by a single
# invocation carries the same created_at/updated_at value.
NOW_ISO = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def check_freshness(sessions_db: Path, org_db: Path) -> dict:
    """Check data freshness across both databases.

    Opens each database read-only and reports, for every table of
    interest, the row count and the most recent timestamp value.

    Args:
        sessions_db: Path to sessions.db.
        org_db: Path to org.db.

    Returns:
        Mapping of "<db>.<table>" -> {"count": int, "latest": str | None}.
        A database that does not exist contributes no entries; a table
        that does not exist reports count 0 / latest None.
    """

    def scan(db_path: Path, prefix: str, tables: list[tuple[str, str]]) -> None:
        """Record count/latest for each (table, timestamp column) pair."""
        if not db_path.exists():
            return
        conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row
        try:
            for table, col in tables:
                try:
                    row = conn.execute(
                        f"SELECT COUNT(*) as cnt, MAX({col}) as latest FROM {table}"
                    ).fetchone()
                    report[f"{prefix}.{table}"] = {
                        "count": row["cnt"],
                        "latest": row["latest"],
                    }
                except sqlite3.OperationalError:
                    # Table doesn't exist (yet) in this database.
                    report[f"{prefix}.{table}"] = {"count": 0, "latest": None}
        finally:
            # Close even if an unexpected error escapes the loop.
            conn.close()

    report: dict = {}
    scan(
        sessions_db,
        "sessions",
        [
            ("messages", "extracted_at"),
            ("tool_analytics", "created_at"),
            ("token_usage", "timestamp"),
        ],
    )
    scan(
        org_db,
        "org",
        [
            ("decisions", "created_at"),
            ("skill_learnings", "analyzed_at"),
            ("error_solutions", "created_at"),
            ("kg_nodes", "updated_at"),
            ("kg_edges", "created_at"),
        ],
    )
    return report

def get_tool_stats(sessions_conn: sqlite3.Connection) -> list[dict]:
    """Get tool usage stats from tool_analytics for KG indexing.

    Returns one dict per tool with total call count, distinct session
    count, success/failure tallies, and first/last seen timestamps,
    ordered by total_calls descending.
    """
    rows = sessions_conn.execute(
        """
        SELECT
            tool_name,
            COUNT(*) as total_calls,
            COUNT(DISTINCT session_id) as session_count,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
            MIN(created_at) as first_seen,
            MAX(created_at) as last_seen
        FROM tool_analytics
        WHERE tool_name IS NOT NULL AND tool_name != ''
        GROUP BY tool_name
        ORDER BY total_calls DESC
        """
    ).fetchall()
    return [dict(r) for r in rows]

def get_model_stats(sessions_conn: sqlite3.Connection) -> list[dict]:
    """Get model usage stats from token_usage for KG indexing.

    Returns one dict per model with call/session counts, token totals
    per category, summed cost (NULL cost rows count as 0), and first/last
    seen timestamps, ordered by call_count descending.
    """
    rows = sessions_conn.execute(
        """
        SELECT
            model,
            COUNT(*) as call_count,
            COUNT(DISTINCT session_id) as session_count,
            SUM(input_tokens) as total_input,
            SUM(output_tokens) as total_output,
            SUM(cache_read_input_tokens) as total_cache_read,
            SUM(cache_creation_input_tokens) as total_cache_create,
            SUM(COALESCE(cost_usd, 0)) as total_cost_usd,
            MIN(timestamp) as first_seen,
            MAX(timestamp) as last_seen
        FROM token_usage
        WHERE model IS NOT NULL AND model != ''
        GROUP BY model
        ORDER BY call_count DESC
        """
    ).fetchall()
    return [dict(r) for r in rows]

def get_session_tool_pairs(sessions_conn: sqlite3.Connection) -> list[dict]:
    """Get session-to-tool usage pairs for USES edges.

    Only pairs with at least 5 calls are kept (and at most 5000 rows) so
    the KG records meaningful usage rather than one-off invocations.

    BUG FIX: the original used COUNT() with no argument, which SQLite
    rejects ("wrong number of arguments to function count()"); COUNT(*)
    is the correct form.
    """
    rows = sessions_conn.execute(
        """
        SELECT
            session_id,
            tool_name,
            COUNT(*) as call_count
        FROM tool_analytics
        WHERE session_id IS NOT NULL AND tool_name IS NOT NULL
          AND session_id != '' AND tool_name != ''
        GROUP BY session_id, tool_name
        HAVING COUNT(*) >= 5
        ORDER BY call_count DESC
        LIMIT 5000
        """
    ).fetchall()
    return [dict(r) for r in rows]

def get_tool_cooccurrence(sessions_conn: sqlite3.Connection) -> list[dict]:
    """Get tool pairs that co-occur in sessions (for INVOKES edges).

    Self-joins the distinct (session, tool) pairs; the tool_a < tool_b
    join condition emits each unordered pair exactly once. Pairs must
    share at least 10 sessions; at most the top 200 pairs are returned.
    """
    rows = sessions_conn.execute(
        """
        SELECT
            a.tool_name as tool_a,
            b.tool_name as tool_b,
            COUNT(DISTINCT a.session_id) as shared_sessions
        FROM (SELECT DISTINCT session_id, tool_name FROM tool_analytics
              WHERE tool_name IS NOT NULL AND tool_name != '') a
        JOIN (SELECT DISTINCT session_id, tool_name FROM tool_analytics
              WHERE tool_name IS NOT NULL AND tool_name != '') b
          ON a.session_id = b.session_id AND a.tool_name < b.tool_name
        GROUP BY a.tool_name, b.tool_name
        HAVING COUNT(DISTINCT a.session_id) >= 10
        ORDER BY shared_sessions DESC
        LIMIT 200
        """
    ).fetchall()
    return [dict(r) for r in rows]

def upsert_node(
    org_conn: sqlite3.Connection,
    node_id: str,
    node_type: str,
    name: str,
    subtype: str | None,
    properties: dict,
    source_table: str = "tool_analytics",
) -> bool:
    """Insert or update a KG node.

    Args:
        org_conn: Writable connection to org.db (row_factory must be Row
            for callers that inspect results; this function only writes).
        node_id: Stable node id (e.g. "tool:Read"); also stored as source_id.
        node_type: KG node type ("tool", "llm_model", ...).
        name: Human-readable node name.
        subtype: Optional subtype label (e.g. "claude_tool").
        properties: JSON-serializable property dict.
        source_table: Originating sessions.db table, recorded for provenance.

    Returns:
        True if a new node was inserted, False if an existing one was updated.
    """
    existing = org_conn.execute(
        "SELECT id FROM kg_nodes WHERE id = ?", (node_id,)
    ).fetchone()

    # default=str keeps non-JSON-native values (e.g. datetimes) serializable.
    props_json = json.dumps(properties, default=str)

    if existing:
        org_conn.execute(
            """UPDATE kg_nodes
               SET properties = ?, updated_at = ?, source_table = ?
               WHERE id = ?""",
            (props_json, NOW_ISO, source_table, node_id),
        )
        return False

    org_conn.execute(
        """INSERT INTO kg_nodes (id, node_type, subtype, name, properties,
                                 source_table, source_id, created_at, updated_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            node_id,
            node_type,
            subtype,
            name,
            props_json,
            source_table,
            node_id,  # source_id mirrors the node id for these derived nodes
            NOW_ISO,
            NOW_ISO,
        ),
    )
    return True

def upsert_edge(
    org_conn: sqlite3.Connection,
    from_node: str,
    to_node: str,
    edge_type: str,
    properties: dict,
) -> bool:
    """Insert or update a KG edge.

    Edges are identified by the (from_node, to_node, edge_type) triple;
    an existing edge gets its properties refreshed, a new one gets a
    fresh UUID id.

    Args:
        org_conn: Writable connection to org.db with row_factory = Row
            (the update path reads the existing row's "id" column by name).
        from_node: Source node id.
        to_node: Target node id.
        edge_type: Edge type label (e.g. "USES", "INVOKES").
        properties: JSON-serializable property dict.

    Returns:
        True if a new edge was inserted, False if an existing one was updated.
    """
    existing = org_conn.execute(
        "SELECT id FROM kg_edges WHERE from_node = ? AND to_node = ? AND edge_type = ?",
        (from_node, to_node, edge_type),
    ).fetchone()

    # default=str keeps non-JSON-native values (e.g. datetimes) serializable.
    props_json = json.dumps(properties, default=str)

    if existing:
        org_conn.execute(
            "UPDATE kg_edges SET properties = ? WHERE id = ?",
            (props_json, existing["id"]),
        )
        return False

    edge_id = str(uuid.uuid4())
    org_conn.execute(
        """INSERT INTO kg_edges (id, edge_type, from_node, to_node, properties, created_at)
           VALUES (?, ?, ?, ?, ?, ?)""",
        (edge_id, edge_type, from_node, to_node, props_json, NOW_ISO),
    )
    return True

def run_index(
    sessions_db: Path,
    org_db: Path,
    dry_run: bool = False,
    full_rebuild: bool = False,
) -> dict:
    """Run the KG index update pipeline.

    Reads analytics from sessions.db (read-only) and upserts tool and
    llm_model nodes, tool co-occurrence INVOKES edges, and session->tool
    USES edges into org.db.

    Args:
        sessions_db: Path to sessions.db (opened read-only).
        org_db: Path to org.db (opened read-write).
        dry_run: If True, print planned changes without writing anything.
        full_rebuild: If True, first delete the nodes/edges this indexer
            owns so they are recreated from scratch. (FIX: the original
            accepted this flag but silently ignored it.)

    Returns:
        Counters: nodes_created, nodes_updated, edges_created, edges_updated.
    """
    stats = {
        "nodes_created": 0,
        "nodes_updated": 0,
        "edges_created": 0,
        "edges_updated": 0,
    }

    # Connect to sessions.db (read-only)
    sessions_conn = sqlite3.connect(f"file:{sessions_db}?mode=ro", uri=True)
    sessions_conn.row_factory = sqlite3.Row

    # Connect to org.db (read-write for indexing)
    org_conn = sqlite3.connect(str(org_db))
    org_conn.row_factory = sqlite3.Row

    try:
        if full_rebuild and not dry_run:
            # Remove only what this indexer creates: tool/llm_model nodes,
            # INVOKES tool-pair edges, and USES edges targeting tool nodes.
            # NOTE(review): scoped deliberately narrow so session nodes and
            # other KG content built elsewhere are untouched.
            print(" Full rebuild: clearing previously indexed nodes/edges...")
            org_conn.execute(
                "DELETE FROM kg_edges WHERE edge_type = 'INVOKES' "
                "OR (edge_type = 'USES' AND to_node LIKE 'tool:%')"
            )
            org_conn.execute(
                "DELETE FROM kg_nodes WHERE node_type IN ('tool', 'llm_model')"
            )

        # 1. Index tool nodes
        print(" Indexing tool nodes...")
        tool_stats = get_tool_stats(sessions_conn)
        for t in tool_stats:
            node_id = f"tool:{t['tool_name']}"
            # SUM() returns NULL (-> None) on empty groups; treat as 0.
            attempts = (t["successes"] or 0) + (t["failures"] or 0)
            props = {
                "total_calls": t["total_calls"],
                "session_count": t["session_count"],
                "successes": t["successes"],
                "failures": t["failures"],
                # No recorded outcomes counts as a perfect success rate.
                "success_rate": round(100.0 * (t["successes"] or 0) / attempts, 1)
                if attempts > 0
                else 100.0,
                "first_seen": t["first_seen"],
                "last_seen": t["last_seen"],
                "indexed_at": NOW_ISO,
            }
            if not dry_run:
                is_new = upsert_node(org_conn, node_id, "tool", t["tool_name"], "claude_tool", props)
                stats["nodes_created" if is_new else "nodes_updated"] += 1
            else:
                print(f"  [dry-run] Would upsert node: {node_id} ({t['total_calls']:,} calls)")

        # 2. Index LLM model nodes
        print(" Indexing model nodes...")
        model_stats = get_model_stats(sessions_conn)
        for m in model_stats:
            node_id = f"llm_model:{m['model']}"
            total_tokens = (
                (m["total_input"] or 0)
                + (m["total_output"] or 0)
                + (m["total_cache_read"] or 0)
                + (m["total_cache_create"] or 0)
            )
            props = {
                "call_count": m["call_count"],
                "session_count": m["session_count"],
                "total_input_tokens": m["total_input"] or 0,
                "total_output_tokens": m["total_output"] or 0,
                "total_cache_read_tokens": m["total_cache_read"] or 0,
                "total_cache_create_tokens": m["total_cache_create"] or 0,
                "total_tokens": total_tokens,
                "total_cost_usd": round(m["total_cost_usd"] or 0, 4),
                "first_seen": m["first_seen"],
                "last_seen": m["last_seen"],
                "indexed_at": NOW_ISO,
            }
            if not dry_run:
                is_new = upsert_node(
                    org_conn, node_id, "llm_model", m["model"], "anthropic", props, "token_usage"
                )
                stats["nodes_created" if is_new else "nodes_updated"] += 1
            else:
                print(f"  [dry-run] Would upsert node: {node_id} ({m['call_count']} calls)")

        # 3. Create tool co-occurrence edges (INVOKES)
        print(" Indexing tool co-occurrence edges...")
        cooccurrences = get_tool_cooccurrence(sessions_conn)
        for co in cooccurrences:
            from_id = f"tool:{co['tool_a']}"
            to_id = f"tool:{co['tool_b']}"
            props = {
                "shared_sessions": co["shared_sessions"],
                # Edge weight saturates at 100 shared sessions.
                "weight": min(1.0, co["shared_sessions"] / 100.0),
                "indexed_at": NOW_ISO,
            }
            if not dry_run:
                is_new = upsert_edge(org_conn, from_id, to_id, "INVOKES", props)
                stats["edges_created" if is_new else "edges_updated"] += 1
            else:
                print(
                    f"  [dry-run] Would upsert edge: {co['tool_a']} -> {co['tool_b']} ({co['shared_sessions']} sessions)"
                )

        # 4. Create session -> tool USES edges (top sessions only)
        print(" Indexing session-tool edges...")
        session_tools = get_session_tool_pairs(sessions_conn)
        for st in session_tools:
            session_node = f"session:{st['session_id']}"
            tool_node = f"tool:{st['tool_name']}"
            # Only create edge if session node exists in KG (session nodes
            # are created by a different pipeline).
            exists = org_conn.execute(
                "SELECT 1 FROM kg_nodes WHERE id = ?", (session_node,)
            ).fetchone()
            if not exists:
                continue
            props = {
                "call_count": st["call_count"],
                # Edge weight saturates at 50 calls.
                "weight": min(1.0, st["call_count"] / 50.0),
                "indexed_at": NOW_ISO,
            }
            if not dry_run:
                is_new = upsert_edge(org_conn, session_node, tool_node, "USES", props)
                stats["edges_created" if is_new else "edges_updated"] += 1

        if not dry_run:
            org_conn.commit()
    finally:
        # Always release both connections, even if indexing fails mid-way.
        sessions_conn.close()
        org_conn.close()

    return stats

def main():
    """CLI entry point: parse arguments, then either report freshness or index.

    Exits 0 after a --freshness report, 1 if either database is missing.
    """
    parser = argparse.ArgumentParser(
        description="Knowledge Graph Index Updater - indexes session data into org.db KG"
    )
    parser.add_argument(
        "--sessions-db",
        type=Path,
        default=SESSIONS_DB,
        help="Path to sessions.db",
    )
    parser.add_argument(
        "--org-db",
        type=Path,
        default=ORG_DB,
        help="Path to org.db",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be indexed without writing",
    )
    parser.add_argument(
        "--full-rebuild",
        action="store_true",
        help="Rebuild all indexed nodes/edges from scratch",
    )
    parser.add_argument(
        "--freshness",
        action="store_true",
        help="Check data freshness and exit",
    )

    args = parser.parse_args()

    if args.freshness:
        print("Data Freshness Report")
        print("=" * 60)
        report = check_freshness(args.sessions_db, args.org_db)
        for key, val in sorted(report.items()):
            age = ""
            if val["latest"]:
                try:
                    # Normalize both ISO "...Z" and SQLite "YYYY-MM-DD HH:MM:SS"
                    # timestamp styles into a form fromisoformat() accepts.
                    ts = val["latest"].replace("Z", "+00:00")
                    if "T" not in ts:
                        ts = ts.replace(" ", "T") + "+00:00"
                    dt = datetime.fromisoformat(ts)
                    delta = datetime.now(timezone.utc) - dt
                    if delta.days > 0:
                        age = f" ({delta.days}d ago)"
                    else:
                        hours = delta.seconds // 3600
                        age = f" ({hours}h ago)"
                except (ValueError, TypeError):
                    # Unparseable timestamp: print it without an age suffix.
                    pass
            print(f" {key:30s} {val['count']:>10,} rows latest: {val['latest'] or 'N/A'}{age}")
        sys.exit(0)

    for db_path, label in [(args.sessions_db, "sessions.db"), (args.org_db, "org.db")]:
        if not db_path.exists():
            print(f"Error: {label} not found at {db_path}", file=sys.stderr)
            sys.exit(1)

    mode = "DRY RUN" if args.dry_run else "LIVE"
    print(f"KG Index Updater [{mode}]")
    print(f" sessions.db: {args.sessions_db}")
    print(f" org.db: {args.org_db}")

    stats = run_index(args.sessions_db, args.org_db, args.dry_run, args.full_rebuild)

    print("\nResults:")
    print(f" Nodes created: {stats['nodes_created']}")
    print(f" Nodes updated: {stats['nodes_updated']}")
    print(f" Edges created: {stats['edges_created']}")
    print(f" Edges updated: {stats['edges_updated']}")
    print(f" Total changes: {sum(stats.values())}")

# FIX: the pasted source had `if name == "main":` — the dunder underscores
# were stripped by the rendering it was copied from; as written it raised
# NameError on import.
if __name__ == "__main__":
    main()