#!/usr/bin/env python3
"""Trajectory Dashboard Data Adapter

Extracts tool_analytics data from sessions.db and knowledge base data from
org.db, transforms into chart-ready JSON for the trajectory dashboard.

Usage:
    python3 scripts/trajectory/dashboard_data_adapter.py
    python3 scripts/trajectory/dashboard_data_adapter.py --timeframe month
    python3 scripts/trajectory/dashboard_data_adapter.py --output path/to/data.json
    python3 scripts/trajectory/dashboard_data_adapter.py --test
"""

import argparse
import glob as globmod
import json
import os
import re
import sqlite3
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path

# Default paths
CONTEXT_STORAGE = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
SESSIONS_DB = CONTEXT_STORAGE / "sessions.db"
PROJECTS_DB = CONTEXT_STORAGE / "projects.db"
ORG_DB = CONTEXT_STORAGE / "org.db"
# Fixed: the paste had `Path(file)` — must be the module's __file__ so the
# output path resolves relative to this script, not the CWD.
DEFAULT_OUTPUT = (
    Path(__file__).resolve().parent.parent.parent
    / "tools"
    / "trajectory-dashboard"
    / "src"
    / "generated"
    / "data.json"
)

# Dashboard timeframe selector -> lookback window in days ("all" ~= 10 years)
TIMEFRAME_DAYS = {
    "today": 1,
    "week": 7,
    "sprint": 14,
    "month": 30,
    "all": 3650,
}

def write_json_atomic(data: dict, output_path: Path, indent: int | None = None) -> None: """Write JSON to file atomically with post-write validation.

Writes to a temp file in the same directory, validates the JSON is well-formed,
then renames (atomic on POSIX). This prevents corrupt data.json from partial writes
or concurrent invocations.
"""
output_path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(
suffix=".json.tmp", dir=str(output_path.parent)
)
try:
with os.fdopen(fd, "w") as f:
json.dump(data, f, indent=indent)
# Validate: re-read and parse to confirm well-formed JSON
with open(tmp_path, "r") as f:
json.load(f)
os.replace(tmp_path, str(output_path))
except Exception:
# Clean up temp file on failure
try:
os.unlink(tmp_path)
except OSError:
pass
raise

def ensure_indexes(db_path: Path) -> None:
    """Create performance indexes if they don't already exist.

    Opens a short-lived read-write connection; `IF NOT EXISTS` makes this
    idempotent. try/finally guarantees the connection is closed even if an
    index statement fails (e.g. a table is missing).
    """
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_tool_analytics_created_at ON tool_analytics(created_at)"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_tool_analytics_status_created ON tool_analytics(status, created_at)"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_messages_extracted_at ON messages(extracted_at)"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_token_economics_created_at ON token_economics(created_at)"
        )
        conn.commit()
    finally:
        conn.close()


def get_connection(db_path: Path) -> sqlite3.Connection:
    """Open a connection to sessions.db, creating indexes first if needed.

    Exits the process with status 1 if the database file does not exist.
    The returned connection is read-only (URI mode=ro) with sqlite3.Row rows.
    """
    if not db_path.exists():
        print(f"Error: Database not found at {db_path}", file=sys.stderr)
        sys.exit(1)
    ensure_indexes(db_path)
    conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    return conn

def get_summary(conn: sqlite3.Connection, days: int) -> dict:
    """Compute aggregate session summary metrics from tool_analytics + token_economics + messages.

    Args:
        conn: Read connection to sessions.db with row_factory = sqlite3.Row.
        days: Lookback window in days.

    Returns:
        Dict of chart-ready camelCase metrics; successRate is over completed
        events only (success + failed, excluding still-'invoked' ones).
    """
    cutoff = f"-{days} days"

    # Tool analytics summary with full status breakdown and I/O sizes.
    # COALESCE over both *_size_bytes and legacy *_size columns handles
    # schema drift between instrumentation versions.
    ta_row = conn.execute(
        """
        SELECT
            COUNT(*) as total_events,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
            SUM(CASE WHEN status = 'invoked' THEN 1 ELSE 0 END) as invoked,
            COUNT(DISTINCT tool_name) as active_tools,
            COUNT(DISTINCT agent_name) as active_agents,
            COALESCE(SUM(COALESCE(input_size_bytes, input_size, 0)), 0) as total_input_bytes,
            COALESCE(SUM(COALESCE(output_size_bytes, output_size, 0)), 0) as total_output_bytes
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
        """,
        (cutoff,),
    ).fetchone()

    # Session count from messages (most complete source; uses extracted_at column)
    session_row = conn.execute(
        """
        SELECT COUNT(DISTINCT session_id) as total_sessions
        FROM messages
        WHERE extracted_at > datetime('now', ?)
        """,
        (cutoff,),
    ).fetchone()

    # Token totals and cost from token_economics table
    token_row = conn.execute(
        """
        SELECT
            COALESCE(SUM(token_input + token_cache_read + token_cache_write + token_output), 0) as total_tokens,
            COALESCE(SUM(cost_total_usd), 0) as total_cost_usd
        FROM token_economics
        WHERE created_at > datetime('now', ?)
        """,
        (cutoff,),
    ).fetchone()

    total_events = ta_row["total_events"] or 0
    successes = ta_row["successes"] or 0
    failures = ta_row["failures"] or 0
    invoked = ta_row["invoked"] or 0
    completed = successes + failures
    total_sessions = session_row["total_sessions"] or 0

    return {
        "totalSessions": total_sessions,
        "totalEvents": total_events,
        "totalTokens": token_row["total_tokens"] or 0,
        "totalCostUsd": round(float(token_row["total_cost_usd"] or 0), 2),
        "totalInputBytes": ta_row["total_input_bytes"] or 0,
        "totalOutputBytes": ta_row["total_output_bytes"] or 0,
        "successCount": successes,
        "failedCount": failures,
        "invokedCount": invoked,
        "successRate": round(100.0 * successes / completed, 1) if completed else 0,
        "activeTools": ta_row["active_tools"] or 0,
        "activeAgents": ta_row["active_agents"] or 0,
    }

def get_timeline(conn: sqlite3.Connection, days: int) -> list:
    """Get time-series data points grouped by date, merging tool_analytics and token_economics.

    Dates come from tool_analytics; token/cost figures are joined in per date.
    A missing token_economics table (older schema) degrades gracefully to
    zero tokens/cost rather than failing.
    """
    cutoff = f"-{days} days"

    # Tool calls per day
    ta_rows = conn.execute(
        """
        SELECT
            DATE(created_at) as date,
            COUNT(DISTINCT session_id) as sessions,
            COUNT(*) as tool_calls,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as errors
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
        GROUP BY DATE(created_at)
        ORDER BY date
        """,
        (cutoff,),
    ).fetchall()

    # Token usage and cost per day
    token_by_date = {}
    cost_by_date = {}
    try:
        tk_rows = conn.execute(
            """
            SELECT
                DATE(created_at) as date,
                SUM(token_input + token_cache_read + token_cache_write + token_output) as tokens,
                COALESCE(SUM(cost_total_usd), 0) as cost_usd
            FROM token_economics
            WHERE created_at > datetime('now', ?)
            GROUP BY DATE(created_at)
            """,
            (cutoff,),
        ).fetchall()
        for row in tk_rows:
            token_by_date[row["date"]] = row["tokens"] or 0
            cost_by_date[row["date"]] = round(float(row["cost_usd"] or 0), 2)
    except sqlite3.OperationalError:
        # token_economics table absent — leave tokens/cost at 0
        pass

    return [
        {
            "date": row["date"],
            "sessions": row["sessions"],
            "tokens": token_by_date.get(row["date"], 0),
            "costUsd": cost_by_date.get(row["date"], 0),
            "toolCalls": row["tool_calls"],
            "errors": row["errors"],
        }
        for row in ta_rows
    ]

def get_agent_activity(conn: sqlite3.Connection, days: int) -> list:
    """Get agent dispatch counts, I/O throughput, and token/cost data.

    Only agents with at least 5 dispatches in the window are returned
    (top 15 by dispatch count). Token/cost figures come from
    token_economics when that table exists.

    Note: the pasted original used COUNT() with no argument, which SQLite
    rejects ("wrong number of arguments") — corrected to COUNT(*).
    """
    cutoff = f"-{days} days"
    rows = conn.execute(
        """
        SELECT
            COALESCE(agent_name, 'unknown') as agent_name,
            COUNT(*) as dispatches,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
            SUM(CASE WHEN status = 'invoked' THEN 1 ELSE 0 END) as invoked,
            COALESCE(SUM(COALESCE(input_size_bytes, input_size, 0)), 0) as total_input_bytes,
            COALESCE(SUM(COALESCE(output_size_bytes, output_size, 0)), 0) as total_output_bytes
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND agent_name IS NOT NULL AND agent_name != ''
        GROUP BY agent_name
        HAVING COUNT(*) >= 5
        ORDER BY dispatches DESC
        LIMIT 15
        """,
        (cutoff,),
    ).fetchall()

    # Batch-fetch token/cost data per agent from token_economics
    agent_tokens: dict[str, dict] = {}
    try:
        tk_rows = conn.execute(
            """
            SELECT
                agent_name,
                SUM(token_input + token_cache_read + token_cache_write + token_output) as total_tokens,
                COALESCE(SUM(cost_total_usd), 0) as total_cost_usd
            FROM token_economics
            WHERE created_at > datetime('now', ?)
              AND agent_name IS NOT NULL AND agent_name != ''
            GROUP BY agent_name
            """,
            (cutoff,),
        ).fetchall()
        for r in tk_rows:
            agent_tokens[r["agent_name"]] = {
                "totalTokens": r["total_tokens"] or 0,
                "totalCostUsd": round(float(r["total_cost_usd"] or 0), 4),
            }
    except sqlite3.OperationalError:
        # token_economics absent — token/cost fields default to 0 below
        pass

    agents = []
    for row in rows:
        agent_name = row["agent_name"]
        # Get top tools for this agent
        top_tools_rows = conn.execute(
            """
            SELECT tool_name, COUNT(*) as cnt
            FROM tool_analytics
            WHERE agent_name = ? AND created_at > datetime('now', ?)
              AND tool_name IS NOT NULL AND tool_name != ''
            GROUP BY tool_name
            ORDER BY cnt DESC
            LIMIT 3
            """,
            (agent_name, cutoff),
        ).fetchall()
        top_tools = [r["tool_name"] for r in top_tools_rows]

        # Get top task_ids for this agent
        top_task_rows = conn.execute(
            """
            SELECT task_id, COUNT(*) as cnt
            FROM tool_analytics
            WHERE agent_name = ? AND created_at > datetime('now', ?)
              AND task_id IS NOT NULL AND task_id != ''
            GROUP BY task_id
            ORDER BY cnt DESC
            LIMIT 3
            """,
            (agent_name, cutoff),
        ).fetchall()
        top_task_ids = [r["task_id"] for r in top_task_rows]

        # Infer role from tool usage
        inferred_role = _infer_agent_role(top_tools)

        # Token/cost from token_economics
        tk = agent_tokens.get(agent_name, {})
        successes = row["successes"] or 0
        failures = row["failures"] or 0
        completed = successes + failures

        agents.append({
            "agentName": agent_name,
            "dispatches": row["dispatches"],
            "successCount": successes,
            "failedCount": failures,
            "invokedCount": row["invoked"] or 0,
            "successRate": round(100.0 * successes / completed, 1) if completed else 0,
            "totalTokens": tk.get("totalTokens", 0),
            "totalCostUsd": tk.get("totalCostUsd", 0),
            "totalInputBytes": row["total_input_bytes"] or 0,
            "totalOutputBytes": row["total_output_bytes"] or 0,
            "topTools": top_tools,
            "topTaskIds": top_task_ids,
            "inferredRole": inferred_role,
        })
    return agents


def _infer_agent_role(top_tools: list[str]) -> str:
    """Infer agent role from its most-used tools (heuristic on primary tool name)."""
    if not top_tools:
        return "General Purpose"
    primary = (top_tools[0] or "").lower()
    if primary in ("bash",):
        return "DevOps/Build"
    if primary in ("edit",):
        return "Code Author"
    if primary in ("read", "grep", "glob"):
        return "Explorer/Researcher"
    if primary in ("websearch", "webfetch"):
        return "Web Researcher"
    if primary in ("write",):
        return "Content Creator"
    if primary in ("task",):
        return "Orchestrator"
    return "General Purpose"

def get_token_usage(conn: sqlite3.Connection, days: int) -> list:
    """Get token usage and cost by model from the token_economics table.

    Models with zero total tokens are filtered out. Returns [] when the
    token_economics table does not exist (older schema).
    """
    try:
        rows = conn.execute(
            """
            SELECT
                COALESCE(model_name, 'unknown') as model,
                COUNT(*) as call_count,
                SUM(token_input + token_cache_read + token_cache_write) as prompt_tokens,
                SUM(token_output) as completion_tokens,
                COALESCE(SUM(cost_input_usd), 0) as cost_input_usd,
                COALESCE(SUM(cost_output_usd), 0) as cost_output_usd,
                COALESCE(SUM(cost_cache_usd), 0) as cost_cache_usd,
                COALESCE(SUM(cost_total_usd), 0) as cost_total_usd
            FROM token_economics
            WHERE created_at > datetime('now', ?)
            GROUP BY model_name
            ORDER BY call_count DESC
            """,
            (f"-{days} days",),
        ).fetchall()

        return [
            {
                "model": row["model"],
                "promptTokens": row["prompt_tokens"] or 0,
                "completionTokens": row["completion_tokens"] or 0,
                "totalTokens": (row["prompt_tokens"] or 0) + (row["completion_tokens"] or 0),
                "callCount": row["call_count"],
                "costInputUsd": round(float(row["cost_input_usd"] or 0), 2),
                "costOutputUsd": round(float(row["cost_output_usd"] or 0), 2),
                "costCacheUsd": round(float(row["cost_cache_usd"] or 0), 2),
                "costTotalUsd": round(float(row["cost_total_usd"] or 0), 2),
            }
            for row in rows
            if (row["prompt_tokens"] or 0) + (row["completion_tokens"] or 0) > 0
        ]
    except sqlite3.OperationalError:
        return []

def get_error_distribution(conn: sqlite3.Connection, days: int) -> list:
    """Get error type distribution with recovery information.

    Top 10 error types among failed tool_analytics events in the window.
    """
    rows = conn.execute(
        """
        SELECT
            COALESCE(error_type, 'unknown') as error_type,
            COUNT(*) as count,
            MAX(created_at) as last_seen
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND status = 'failed'
          AND error_type IS NOT NULL AND error_type != ''
        GROUP BY error_type
        ORDER BY count DESC
        LIMIT 10
        """,
        (f"-{days} days",),
    ).fetchall()

    return [
        {
            "errorType": row["error_type"],
            "count": row["count"],
            "recoverableCount": None,  # Recovery tracking not yet instrumented
            "lastSeen": row["last_seen"],
        }
        for row in rows
    ]

def get_tool_performance(conn: sqlite3.Connection, days: int) -> list:
    """Get tool performance metrics: call counts, I/O throughput, status breakdown, category.

    Only tools with at least 10 calls in the window are included
    (top 15 by call count).
    """
    cutoff = f"-{days} days"

    rows = conn.execute(
        """
        SELECT
            tool_name,
            COALESCE(tool_category, 'unknown') as tool_category,
            COUNT(*) as call_count,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
            SUM(CASE WHEN status = 'invoked' THEN 1 ELSE 0 END) as invoked,
            ROUND(AVG(COALESCE(input_size_bytes, input_size, 0)), 0) as avg_input_bytes,
            ROUND(AVG(COALESCE(output_size_bytes, output_size, 0)), 0) as avg_output_bytes,
            COALESCE(SUM(COALESCE(input_size_bytes, input_size, 0)), 0) as total_input_bytes,
            COALESCE(SUM(COALESCE(output_size_bytes, output_size, 0)), 0) as total_output_bytes
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
        GROUP BY tool_name
        HAVING COUNT(*) >= 10
        ORDER BY call_count DESC
        LIMIT 15
        """,
        (cutoff,),
    ).fetchall()

    result = []
    for row in rows:
        successes = row["successes"] or 0
        failures = row["failures"] or 0
        completed = successes + failures
        # Rate over completed calls only; 'invoked' events are still pending
        success_rate = round(100.0 * successes / completed, 1) if completed else 0

        result.append(
            {
                "toolName": row["tool_name"],
                "toolCategory": row["tool_category"],
                "callCount": row["call_count"],
                "successCount": successes,
                "failedCount": failures,
                "invokedCount": row["invoked"] or 0,
                "successRate": success_rate,
                "avgInputBytes": int(row["avg_input_bytes"] or 0),
                "avgOutputBytes": int(row["avg_output_bytes"] or 0),
                "totalInputBytes": row["total_input_bytes"] or 0,
                "totalOutputBytes": row["total_output_bytes"] or 0,
            }
        )

    return result

# ─── org.db Knowledge Base Integration ───────────────────────────────────────

def get_org_connection(db_path: Path) -> sqlite3.Connection | None: """Open a read-only connection to org.db. Returns None if unavailable.""" if not db_path.exists(): return None try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) conn.row_factory = sqlite3.Row return conn except sqlite3.Error: return None

def get_knowledge_summary(org_conn: sqlite3.Connection) -> dict:
    """Get aggregate counts from all knowledge base tables in org.db.

    Each table is queried independently; a missing table contributes
    count 0 / latest None instead of failing the whole summary.
    """
    counts = {}
    # (table, timestamp column) pairs — skill_learnings uses analyzed_at
    for table, col in [
        ("decisions", "created_at"),
        ("skill_learnings", "analyzed_at"),
        ("error_solutions", "created_at"),
        ("kg_nodes", "created_at"),
        ("kg_edges", "created_at"),
    ]:
        try:
            row = org_conn.execute(
                f"SELECT COUNT(*) as cnt, MAX({col}) as latest FROM {table}"
            ).fetchone()
            counts[table] = {"count": row["cnt"] or 0, "latest": row["latest"]}
        except sqlite3.OperationalError:
            counts[table] = {"count": 0, "latest": None}

    return {
        "totalDecisions": counts["decisions"]["count"],
        "totalSkillLearnings": counts["skill_learnings"]["count"],
        "totalErrorSolutions": counts["error_solutions"]["count"],
        "totalKgNodes": counts["kg_nodes"]["count"],
        "totalKgEdges": counts["kg_edges"]["count"],
        "latestDecision": counts["decisions"]["latest"],
        "latestKgUpdate": counts["kg_nodes"]["latest"],
    }

def get_knowledge_graph_distribution(org_conn: sqlite3.Connection) -> dict:
    """Get KG node and edge type distributions for visualization.

    Missing kg_nodes/kg_edges tables yield empty lists rather than errors.
    """
    node_types = []
    try:
        rows = org_conn.execute(
            "SELECT node_type, COUNT(*) as count FROM kg_nodes GROUP BY node_type ORDER BY count DESC"
        ).fetchall()
        node_types = [{"type": r["node_type"], "count": r["count"]} for r in rows]
    except sqlite3.OperationalError:
        pass

    edge_types = []
    try:
        rows = org_conn.execute(
            "SELECT edge_type, COUNT(*) as count FROM kg_edges GROUP BY edge_type ORDER BY count DESC"
        ).fetchall()
        edge_types = [{"type": r["edge_type"], "count": r["count"]} for r in rows]
    except sqlite3.OperationalError:
        pass

    return {"nodeTypes": node_types, "edgeTypes": edge_types}

def get_decision_activity(org_conn: sqlite3.Connection, days: int) -> list:
    """Get recent decisions grouped by type.

    Returns [] when the decisions table does not exist.
    """
    try:
        rows = org_conn.execute(
            """
            SELECT
                COALESCE(decision_type, 'general') as decision_type,
                COUNT(*) as count,
                ROUND(AVG(confidence), 2) as avg_confidence,
                MAX(created_at) as latest
            FROM decisions
            WHERE created_at > datetime('now', ?)
            GROUP BY decision_type
            ORDER BY count DESC
            """,
            (f"-{days} days",),
        ).fetchall()
        return [
            {
                "decisionType": r["decision_type"],
                "count": r["count"],
                "avgConfidence": float(r["avg_confidence"] or 0),
                "latest": r["latest"],
            }
            for r in rows
        ]
    except sqlite3.OperationalError:
        return []

def get_skill_effectiveness(org_conn: sqlite3.Connection, days: int) -> dict:
    """Get skill learning success vs needs_work rates.

    Returns an all-zero summary when the skill_learnings table is absent.
    """
    try:
        row = org_conn.execute(
            """
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes,
                SUM(CASE WHEN outcome = 'needs_work' THEN 1 ELSE 0 END) as needs_work,
                ROUND(AVG(COALESCE(effectiveness_score, 0)), 2) as avg_effectiveness
            FROM skill_learnings
            WHERE analyzed_at > datetime('now', ?)
            """,
            (f"-{days} days",),
        ).fetchone()
        total = row["total"] or 0
        successes = row["successes"] or 0
        return {
            "total": total,
            "successes": successes,
            "needsWork": row["needs_work"] or 0,
            "successRate": round(100.0 * successes / total, 1) if total else 0,
            "avgEffectiveness": float(row["avg_effectiveness"] or 0),
        }
    except sqlite3.OperationalError:
        return {"total": 0, "successes": 0, "needsWork": 0, "successRate": 0, "avgEffectiveness": 0}

def get_track_activity(conn: sqlite3.Connection, org_conn: sqlite3.Connection | None, days: int) -> list: """Get project track activity by parsing task_id prefixes from tool_analytics and enriching with track metadata from kg_nodes.""" cutoff = f"-{days} days"

# Get track metadata from kg_nodes (node_type='track')
track_meta: dict[str, dict] = {}
if org_conn:
try:
rows = org_conn.execute(
"SELECT id, name, properties FROM kg_nodes WHERE node_type = 'track'"
).fetchall()
for r in rows:
props = {}
try:
props = json.loads(r["properties"]) if r["properties"] else {}
except (json.JSONDecodeError, TypeError):
pass
letter = props.get("track_letter", "")
if letter:
track_meta[letter.upper()] = {
"trackName": props.get("track_name", r["name"] or ""),
"domain": props.get("domain", ""),
"status": props.get("status", "active"),
}
except sqlite3.OperationalError:
pass

# Parse task_ids from tool_analytics to extract track letters
# Task ID format: Track.Section.Task (e.g., A.9.1, J.29.5, H.8.1.6)
try:
rows = conn.execute(
"""
SELECT task_id, COUNT(*) as tool_calls,
COUNT(DISTINCT agent_name) as agents,
COUNT(DISTINCT session_id) as sessions
FROM tool_analytics
WHERE created_at > datetime('now', ?)
AND task_id IS NOT NULL AND task_id != ''
GROUP BY task_id
ORDER BY tool_calls DESC
""",
(cutoff,),
).fetchall()
except sqlite3.OperationalError:
return []

# Group by track letter
tracks: dict[str, dict] = {}
for row in rows:
task_id = row["task_id"]
# Extract track letter (first segment before '.')
parts = task_id.split(".")
if not parts:
continue
letter = parts[0].upper()
# Must be alphabetic (A-AK pattern)
if not letter or not letter[0].isalpha():
continue

if letter not in tracks:
meta = track_meta.get(letter, {})
tracks[letter] = {
"trackLetter": letter,
"trackName": meta.get("trackName", f"Track {letter}"),
"domain": meta.get("domain", ""),
"status": meta.get("status", "active"),
"toolCalls": 0,
"agents": set(),
"sessions": set(),
"tasks": [],
}
tracks[letter]["toolCalls"] += row["tool_calls"]
tracks[letter]["agents"].add(row["agents"])
tracks[letter]["sessions"].add(row["sessions"])
tracks[letter]["tasks"].append({
"taskId": task_id,
"toolCalls": row["tool_calls"],
"agents": row["agents"],
"sessions": row["sessions"],
})

# Convert sets to counts and sort
result = []
for letter in sorted(tracks.keys()):
t = tracks[letter]
result.append({
"trackLetter": t["trackLetter"],
"trackName": t["trackName"],
"domain": t["domain"],
"status": t["status"],
"toolCalls": t["toolCalls"],
"taskCount": len(t["tasks"]),
"tasks": sorted(t["tasks"], key=lambda x: x["toolCalls"], reverse=True)[:20],
})
return sorted(result, key=lambda x: x["toolCalls"], reverse=True)

def get_skill_breakdown(org_conn: sqlite3.Connection, days: int) -> list:
    """Get per-skill-name effectiveness breakdown from skill_learnings.

    Note: Skill learnings are cumulative — always query ALL time regardless
    of dashboard timeframe (the `days` parameter is intentionally unused but
    kept for signature consistency with sibling functions).

    Note: the pasted original used COUNT() with no argument, which SQLite
    rejects — corrected to COUNT(*).
    """
    try:
        rows = org_conn.execute(
            """
            SELECT
                COALESCE(skill_name, 'unknown') as skill_name,
                COUNT(*) as total,
                SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes,
                SUM(CASE WHEN outcome = 'needs_work' THEN 1 ELSE 0 END) as needs_work,
                ROUND(AVG(COALESCE(effectiveness_score, 0)), 2) as avg_effectiveness
            FROM skill_learnings
            GROUP BY skill_name
            HAVING COUNT(*) >= 2
            ORDER BY total DESC
            LIMIT 30
            """
        ).fetchall()
        return [
            {
                "skillName": r["skill_name"],
                "total": r["total"],
                "successes": r["successes"] or 0,
                "needsWork": r["needs_work"] or 0,
                "avgEffectiveness": float(r["avg_effectiveness"] or 0),
            }
            for r in rows
        ]
    except sqlite3.OperationalError:
        return []

def get_error_solutions_summary(org_conn: sqlite3.Connection, days: int) -> list:
    """Get top error solutions from org.db.

    Returns [] when the error_solutions table does not exist.
    """
    try:
        rows = org_conn.execute(
            """
            SELECT
                COALESCE(error_type, 'unknown') as error_type,
                COUNT(*) as count,
                SUM(success_count) as total_successes,
                MAX(created_at) as latest
            FROM error_solutions
            WHERE created_at > datetime('now', ?)
            GROUP BY error_type
            ORDER BY count DESC
            LIMIT 10
            """,
            (f"-{days} days",),
        ).fetchall()
        return [
            {
                "errorType": r["error_type"],
                "solutionCount": r["count"],
                "totalSuccesses": r["total_successes"] or 0,
                "latest": r["latest"],
            }
            for r in rows
        ]
    except sqlite3.OperationalError:
        return []

def parse_track_files(tracks_dirs: list[Path]) -> list[dict]:
    """Parse TRACK markdown files from pilot-tracks/ and tracks/ to extract
    task descriptions, completion status, and progress percentages.

    Track letter comes from `track:` frontmatter or the TRACK-X- filename
    prefix; files without a resolvable letter are skipped. Multiple files
    for the same track letter are merged.
    """
    tracks: dict[str, dict] = {}

    for tracks_dir in tracks_dirs:
        if not tracks_dir.exists():
            continue
        for fpath in sorted(tracks_dir.glob("TRACK-*.md")) + sorted(tracks_dir.glob("track-*.md")):
            content = fpath.read_text(encoding="utf-8", errors="replace")

            # Extract track letter from filename or frontmatter
            track_letter = ""
            # Try frontmatter: track: A
            m = re.search(r'^track:\s*([A-Za-z]+)', content, re.MULTILINE)
            if m:
                track_letter = m.group(1).upper()
            else:
                # Try filename: TRACK-A-... or track-a-...
                fname = fpath.stem.upper()
                m2 = re.match(r'TRACK-([A-Z]+)-', fname)
                if m2:
                    track_letter = m2.group(1)
            if not track_letter:
                continue

            # Extract track name from title or H1
            track_name = ""
            m = re.search(r'^#\s+(?:Track\s+[A-Za-z]+[:\s]*)?(.+)', content, re.MULTILINE)
            if m:
                track_name = m.group(1).strip().rstrip('#').strip()
            if not track_name:
                m = re.search(r'^track_name:\s*(.+)', content, re.MULTILINE)
                if m:
                    track_name = m.group(1).strip().strip("'\"")

            # Extract progress percentage
            percent = 0.0
            m = re.search(r'(?:progress|status)[:\s]*.*?(\d+(?:\.\d+)?)\s*%', content, re.IGNORECASE)
            if m:
                percent = float(m.group(1))

            # Extract primary agent
            primary_agent = ""
            m = re.search(r'primary(?:\s+agent)?[:\s]+`?([a-z][\w-]+)`?', content, re.IGNORECASE)
            if m:
                primary_agent = m.group(1)

            # Extract repository references (e.g., **Repository:** `submodules/products/...`)
            repositories: list[str] = []
            for rm in re.finditer(r'\*\*Repository:\*\*\s*`([^`]+)`', content):
                repo_path = rm.group(1).strip().rstrip('/')
                if repo_path:
                    repositories.append(repo_path)

            # Parse task checkboxes: - [x] A.1.1: Description or - [ ] A.1.2: Description
            tasks = []
            for tm in re.finditer(
                r'^-\s+\[([ xX])\]\s+([A-Za-z]+\.\d[\w.]*)[:\s]+(.+?)(?:\s*[✅⚠️🟡🔴].*)?$',
                content, re.MULTILINE
            ):
                completed = tm.group(1).lower() == 'x'
                tasks.append({
                    "taskId": tm.group(2),
                    "description": tm.group(3).strip(),
                    "status": "completed" if completed else "pending",
                    "toolCalls": 0,
                    "agents": 0,
                    "sessions": 0,
                })

            completed_count = sum(1 for t in tasks if t["status"] == "completed")
            total_count = len(tasks)

            # If no tasks parsed, try to get counts from the progress line
            if total_count == 0 and percent > 0:
                m = re.search(r'(\d+)/(\d+)\s*tasks', content, re.IGNORECASE)
                if m:
                    completed_count = int(m.group(1))
                    total_count = int(m.group(2))

            # Recalculate percent from tasks if we have them
            if total_count > 0 and not percent:
                percent = round(100.0 * completed_count / total_count, 1)

            # Merge or create track entry
            if track_letter in tracks:
                # Merge tasks from multiple files for same track
                existing = tracks[track_letter]
                existing["tasks"].extend(tasks)
                existing["completedTasks"] = sum(1 for t in existing["tasks"] if t["status"] == "completed")
                existing["totalTasks"] = len(existing["tasks"])
                if existing["totalTasks"] > 0:
                    existing["percentComplete"] = round(100.0 * existing["completedTasks"] / existing["totalTasks"], 1)
            else:
                tracks[track_letter] = {
                    "trackLetter": track_letter,
                    "trackName": track_name,
                    "percentComplete": percent,
                    "completedTasks": completed_count,
                    "totalTasks": total_count,
                    "primaryAgent": primary_agent,
                    "repositories": repositories,
                    "tasks": tasks,
                }

    return sorted(tracks.values(), key=lambda x: x["trackLetter"])

def parse_session_logs(logs_dir: Path) -> list[dict]:
    """Parse session log files and group by month for digest views.

    Recursively finds SESSION-LOG-YYYY-MM-DD.md files; entries for the same
    date found in multiple directories (one per machine UUID) are merged.
    Returns a list of month dicts, newest month first.
    """
    months: dict[str, dict] = {}

    if not logs_dir.exists():
        return []

    # Find all SESSION-LOG-*.md files recursively
    for fpath in sorted(logs_dir.rglob("SESSION-LOG-*.md")):
        # Extract date from filename
        m = re.search(r'SESSION-LOG-(\d{4}-\d{2}-\d{2})\.md$', fpath.name)
        if not m:
            continue
        date_str = m.group(1)
        month_key = date_str[:7]  # "2026-01"

        content = fpath.read_text(encoding="utf-8", errors="replace")
        file_size = fpath.stat().st_size

        # Count entries (## or ### headers after frontmatter)
        entry_count = len(re.findall(r'^#{2,3}\s+\d{4}-\d{2}-\d{2}', content, re.MULTILINE))
        if entry_count == 0:
            entry_count = max(1, len(re.findall(r'^#{2,3}\s+', content, re.MULTILINE)))

        # Extract task IDs mentioned
        task_ids = list(set(re.findall(r'\[([A-Z]+\.\d[\w.]*)\]', content)))

        # Extract key topics from headers
        topics = []
        for hm in re.finditer(r'^#{2,3}\s+\d{4}-\d{2}-\d{2}[T\s\w:.-]*\s*-?\s*(?:\[[^\]]*\]\s*)?(.+)', content, re.MULTILINE):
            topic = hm.group(1).strip()
            if topic and len(topic) > 5:
                topics.append(topic[:120])

        # Strip YAML frontmatter for content — keep the markdown body
        body = content
        if body.startswith("---"):
            end = body.find("---", 3)
            if end != -1:
                body = body[end + 3:].lstrip("\n")

        log_entry = {
            "date": date_str,
            "fileName": fpath.name,
            "entryCount": entry_count,
            "tasksCompleted": task_ids[:10],
            "keyTopics": topics[:5],
            "sizeKb": round(file_size / 1024, 1),
            "content": body,
            "filePath": str(fpath),
        }

        if month_key not in months:
            months[month_key] = {
                "month": month_key,
                "logs": {},  # keyed by date for dedup
                "totalEntries": 0,
            }
        # Deduplicate by date — merge entries from multiple machine UUIDs
        if date_str in months[month_key]["logs"]:
            existing = months[month_key]["logs"][date_str]
            existing["entryCount"] += entry_count
            existing["sizeKb"] = round(existing["sizeKb"] + log_entry["sizeKb"], 1)
            existing["tasksCompleted"] = list(set(existing["tasksCompleted"] + task_ids[:10]))[:10]
            existing["keyTopics"] = list(dict.fromkeys(existing["keyTopics"] + topics[:5]))[:5]
            # Merge content from multiple UUID dirs
            if body.strip():
                existing["content"] = (existing.get("content") or "") + "\n\n---\n\n" + body
        else:
            months[month_key]["logs"][date_str] = log_entry
        months[month_key]["totalEntries"] += entry_count

    # Convert logs dicts to sorted lists and sort months descending
    for m in months.values():
        m["logs"] = sorted(m["logs"].values(), key=lambda x: x["date"], reverse=True)

    return sorted(months.values(), key=lambda x: x["month"], reverse=True)

def generate_executive_summary(track_details: list[dict], session_logs: list[dict]) -> dict:
    """Generate executive summary from parsed track data.

    Args:
        track_details: Output of parse_track_files().
        session_logs: Output of parse_session_logs() (monthly digests).

    Returns:
        Rollup dict with track/task counts, overall completion, and a coarse
        healthScore bucket (>60% on-track, >30% attention, else at-risk).
    """
    total_tasks = 0
    completed_tasks = 0
    completed_tracks = 0
    in_progress_tracks = 0
    not_started_tracks = 0
    deferred_tracks = 0
    top_active: list[dict] = []

    for t in track_details:
        pct = t.get("percentComplete", 0)
        total = t.get("totalTasks", 0)
        done = t.get("completedTasks", 0)
        total_tasks += total
        completed_tasks += done

        if pct >= 95:
            completed_tracks += 1
        elif pct > 0:
            in_progress_tracks += 1
        else:
            # Check if deferred (G, K, L, M)
            if t["trackLetter"] in ("G", "K", "L", "M"):
                deferred_tracks += 1
            else:
                not_started_tracks += 1

        if total > 0:
            top_active.append({
                "trackLetter": t["trackLetter"],
                "trackName": t.get("trackName", ""),
                "percentComplete": pct,
                "taskCount": total,
            })

    # Sort by activity (% complete * task count gives weighted importance)
    top_active.sort(key=lambda x: x["taskCount"], reverse=True)

    overall_pct = round(100.0 * completed_tasks / total_tasks, 1) if total_tasks else 0

    # Session activity for health score
    total_log_entries = sum(m["totalEntries"] for m in session_logs)

    return {
        "overallPercentComplete": overall_pct,
        "totalTracks": len(track_details),
        "completedTracks": completed_tracks,
        "inProgressTracks": in_progress_tracks,
        "notStartedTracks": not_started_tracks,
        "deferredTracks": deferred_tracks,
        "totalTasks": total_tasks,
        "completedTasks": completed_tasks,
        "remainingTasks": total_tasks - completed_tasks,
        "topActiveTracks": top_active[:8],
        "totalSessionLogEntries": total_log_entries,
        "healthScore": "on-track" if overall_pct > 60 else ("attention" if overall_pct > 30 else "at-risk"),
    }

def get_project_registry(logs_dir: Path | None = None) -> list[dict]:
    """Build project registry from projects.db + session log directories.

    Discovers all known projects by merging:
      1. projects.db registered projects
      2. sessions.db projects table
      3. Session log directory structure (PILOT, CUST-avivatec-fpa, etc.)
    Per-project stats come from parsing session log files in each project dir.
    Database failures are tolerated (that source is simply skipped).
    """
    projects: dict[str, dict] = {}

    # Source 1: projects.db
    if PROJECTS_DB.exists():
        try:
            pconn = sqlite3.connect(str(PROJECTS_DB))
            pconn.row_factory = sqlite3.Row
            for row in pconn.execute(
                "SELECT project_uuid, name, path, project_type, status, description, "
                "primary_language, framework, created_at, updated_at FROM projects"
            ):
                pid = row["project_uuid"]
                projects[pid] = {
                    "projectId": pid,
                    "name": row["name"],
                    "path": row["path"] or "",
                    "projectType": row["project_type"] or "standalone",
                    "status": row["status"] or "active",
                    "description": row["description"] or "",
                    "language": row["primary_language"] or "",
                    "framework": row["framework"] or "",
                    "source": "projects.db",
                    "sessionLogEntries": 0,
                    "sessionLogDays": 0,
                    "firstActivity": row["created_at"] or "",
                    "lastActivity": row["updated_at"] or "",
                    "sessionLogMonths": [],
                }
            pconn.close()
        except Exception:
            # Best-effort source: skip on any DB/schema problem
            pass

    # Source 2: sessions.db projects table
    if SESSIONS_DB.exists():
        try:
            sconn = sqlite3.connect(str(SESSIONS_DB))
            sconn.row_factory = sqlite3.Row
            for row in sconn.execute("SELECT id, name, description, status, owner FROM projects"):
                pid = row["id"]
                if pid not in projects:
                    projects[pid] = {
                        "projectId": pid,
                        "name": row["name"] or pid,
                        "path": "",
                        "projectType": "standalone",
                        "status": row["status"] or "active",
                        "description": row["description"] or "",
                        "language": "",
                        "framework": "",
                        "source": "sessions.db",
                        "sessionLogEntries": 0,
                        "sessionLogDays": 0,
                        "firstActivity": "",
                        "lastActivity": "",
                        "sessionLogMonths": [],
                    }
            sconn.close()
        except Exception:
            pass

    # Known aliases: session log directory name -> projects.db project name
    # PILOT is the session log directory for the coditect-core project
    DIR_ALIASES: dict[str, str] = {
        "PILOT": "coditect-core",
    }

    # Source 3: Session log directories
    if logs_dir and logs_dir.exists():
        projects_root = logs_dir / "projects"
        if projects_root.exists():
            for proj_dir in sorted(projects_root.iterdir()):
                if not proj_dir.is_dir() or proj_dir.name.startswith("."):
                    continue
                dir_name = proj_dir.name  # e.g., "PILOT", "CUST-avivatec-fpa"

                # Parse session logs for this project specifically
                months_data = parse_session_logs(proj_dir)

                total_entries = 0
                total_days = 0
                first_date = ""
                last_date = ""
                for month in months_data:
                    total_entries += month["totalEntries"]
                    total_days += len(month["logs"])
                    for log in month["logs"]:
                        d = log["date"]
                        if not first_date or d < first_date:
                            first_date = d
                        if not last_date or d > last_date:
                            last_date = d

                # Try to find matching project: check alias first, then fuzzy name
                found = False
                alias_target = DIR_ALIASES.get(dir_name, "").lower()
                for p in projects.values():
                    pname = p["name"].lower().replace(" ", "-")
                    if alias_target and alias_target == pname:
                        found = True
                    elif dir_name.lower() in pname or pname in dir_name.lower():
                        found = True
                    if found:
                        p["sessionLogEntries"] += total_entries
                        p["sessionLogDays"] += total_days
                        p["sessionLogMonths"] = (p.get("sessionLogMonths") or []) + months_data
                        if first_date and (not p["firstActivity"] or first_date < p["firstActivity"]):
                            p["firstActivity"] = first_date
                        if last_date and (not p["lastActivity"] or last_date > p["lastActivity"]):
                            p["lastActivity"] = last_date
                        break

                if not found:
                    # Unmatched directory becomes its own registry entry
                    projects[dir_name] = {
                        "projectId": dir_name,
                        "name": dir_name,
                        "path": str(proj_dir),
                        "projectType": "standalone",
                        "status": "active",
                        "description": "",
                        "language": "",
                        "framework": "",
                        "source": "session-logs",
                        "sessionLogEntries": total_entries,
                        "sessionLogDays": total_days,
                        "firstActivity": first_date,
                        "lastActivity": last_date,
                        "sessionLogMonths": months_data,
                    }

    # Scan project directories for artifacts and README intros
    for p in projects.values():
        raw_path = p.get("path", "")
        proj_path = Path(raw_path) if raw_path else None
        if proj_path and proj_path.exists() and proj_path.is_dir():
            p["artifacts"] = scan_project_artifacts(proj_path)
            p["intro"] = read_project_readme_intro(proj_path)
        else:
            p["intro"] = ""

    return sorted(projects.values(), key=lambda x: x.get("lastActivity", ""), reverse=True)

def read_project_readme_intro(proj_path: Path) -> str:
    """Extract the first substantive paragraph from a project's README.md.

    Skips YAML frontmatter, headings, badge lines, blank lines, and copyright
    notices to find the first real prose paragraph that describes the project.
    Returns empty string if no README or no qualifying paragraph found.
    """
    readme_path = proj_path / "README.md"
    if not readme_path.exists():
        return ""
    try:
        all_lines = readme_path.read_text(encoding="utf-8", errors="replace").splitlines()
    except Exception:
        return ""

    # Advance past YAML frontmatter delimited by a "---" pair at the top.
    idx = 0
    if all_lines and all_lines[0].strip() == "---":
        idx = 1
        while idx < len(all_lines) and all_lines[idx].strip() != "---":
            idx += 1
        idx += 1  # step over the closing ---

    # Prefixes marking non-prose lines: headings, badges, horizontal rules,
    # copyright/attribution boilerplate, HTML fragments, table rows.
    noise_prefixes = ("#", "![", "---", "**Copyright", "**Developed by", "<", "|")

    paragraph: list[str] = []
    for raw in all_lines[idx:]:
        text = raw.strip()
        if not text or text.startswith(noise_prefixes):
            if paragraph:
                break  # a gathered paragraph ends at the first non-prose line
            continue  # still searching for the paragraph's first line
        paragraph.append(text)

    return " ".join(paragraph).strip()

def scan_project_artifacts(proj_path: Path) -> dict: """Scan a project directory and inventory source files, docs, ADRs, tests, commits.""" artifacts: dict = { "sourceFiles": {}, # extension -> count "totalSourceFiles": 0, "totalSourceLines": 0, "docs": [], # list of {name, path, sizeKb} "adrs": [], # list of {name, title} "testCount": 0, "crates": [], # Rust crates/packages "commits": [], # last 10 git commits "migrations": [], # DB migration files "infrastructure": [], # tf, yaml, Dockerfile etc. }

skip_dirs = {".git", "node_modules", "target", ".next", "dist", "build",
"__pycache__", ".venv", "venv", "coverage-report", ".coditect"}

source_exts = {".rs", ".ts", ".tsx", ".js", ".jsx", ".py", ".go", ".java", ".sql"}
infra_exts = {".tf", ".yaml", ".yml", ".toml"}

for fpath in proj_path.rglob("*"):
# Skip excluded directories
parts = fpath.parts
if any(p in skip_dirs for p in parts):
continue
if not fpath.is_file():
continue

ext = fpath.suffix.lower()
rel = str(fpath.relative_to(proj_path))

# Source files
if ext in source_exts:
artifacts["sourceFiles"][ext] = artifacts["sourceFiles"].get(ext, 0) + 1
artifacts["totalSourceFiles"] += 1
try:
artifacts["totalSourceLines"] += sum(1 for _ in fpath.open(encoding="utf-8", errors="replace"))
except Exception:
pass

# Markdown docs
if ext == ".md" and not fpath.name.startswith("."):
artifacts["docs"].append({
"name": fpath.name,
"path": rel,
"sizeKb": round(fpath.stat().st_size / 1024, 1),
})

# ADRs (in any adrs/ directory)
if "adrs" in rel.lower() and ext == ".md":
title = fpath.stem.replace("-", " ").replace("_", " ")
artifacts["adrs"].append({"name": fpath.name, "title": title})

# Infrastructure files
if ext in infra_exts or fpath.name in ("Dockerfile", "docker-compose.yml", "Makefile"):
artifacts["infrastructure"].append({"name": fpath.name, "path": rel})

# Migration files
if "migration" in rel.lower() and ext == ".sql":
artifacts["migrations"].append(fpath.name)

# Count tests: Rust #[test], Python def test_, JS it(/test(
test_count = 0
for fpath in proj_path.rglob("*"):
parts = fpath.parts
if any(p in skip_dirs for p in parts):
continue
if not fpath.is_file():
continue
ext = fpath.suffix.lower()
if ext not in source_exts:
continue
try:
content = fpath.read_text(encoding="utf-8", errors="replace")
if ext == ".rs":
test_count += content.count("#[test]")
elif ext == ".py":
test_count += len(re.findall(r'\bdef test_', content))
elif ext in (".ts", ".tsx", ".js", ".jsx"):
test_count += len(re.findall(r'\b(?:it|test)\s*\(', content))
except Exception:
pass
artifacts["testCount"] = test_count

# Rust crates
for cargo in proj_path.rglob("Cargo.toml"):
parts = cargo.parts
if any(p in skip_dirs for p in parts):
continue
try:
content = cargo.read_text(encoding="utf-8", errors="replace")
name_match = re.search(r'name\s*=\s*"([^"]+)"', content)
if name_match:
artifacts["crates"].append(name_match.group(1))
except Exception:
pass

# Git commits (last 10)
import subprocess
try:
result = subprocess.run(
["git", "log", "--oneline", "-10"],
cwd=str(proj_path), capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
if line.strip():
parts_line = line.strip().split(" ", 1)
if len(parts_line) == 2:
artifacts["commits"].append({
"hash": parts_line[0],
"message": parts_line[1][:120],
})
except Exception:
pass

# Deduplicate ADRs (may appear both in docs and adrs)
seen_adrs = set()
unique_adrs = []
for a in artifacts["adrs"]:
if a["name"] not in seen_adrs:
seen_adrs.add(a["name"])
unique_adrs.append(a)
artifacts["adrs"] = unique_adrs

return artifacts

# ─── Project Visibility Config (J.29.19) ──────────────────────────────────────

def load_project_visibility_config(config_path: Path | None = None) -> dict: """Load project visibility configuration.

Config file (dashboard-projects.json) format:
{
"visibleProjects": ["coditect-core", "PILOT", ...],
"hiddenProjects": ["BUILDER-OS", ".claude", ...],
"showDiscovered": false
}

If visibleProjects is set, only those projects are shown (allowlist).
If hiddenProjects is set, those projects are excluded (denylist).
visibleProjects takes precedence over hiddenProjects.
showDiscovered controls whether auto-discovered CWD projects appear.
"""
if not config_path:
# Auto-detect: look next to data.json in the dashboard public dir
script_dir = Path(__file__).resolve().parent
config_path = script_dir.parent.parent / "tools" / "trajectory-dashboard" / "public" / "dashboard-projects.json"

if config_path and config_path.exists():
try:
with open(config_path) as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}

def filter_projects(projects: list[dict], include_discovered: bool = False, config: dict | None = None) -> list[dict]: """Filter project list based on visibility config and discovery flag.

Rules (in priority order):
1. If config has 'visibleProjects' list, only show those (by name, case-insensitive)
2. If config has 'hiddenProjects' list, exclude those
3. If config has 'showDiscovered', use that; otherwise use include_discovered flag
4. Default: exclude projectType="discovered" unless flagged
"""
if not config:
config = {}

visible = config.get("visibleProjects")
hidden = config.get("hiddenProjects")
show_discovered = config.get("showDiscovered", include_discovered)

result = []
for proj in projects:
name = proj.get("name", "")
name_lower = name.lower()
proj_type = proj.get("projectType", "")

# Rule 1: allowlist takes precedence
if visible is not None:
if name_lower in [v.lower() for v in visible]:
result.append(proj)
continue

# Rule 2: denylist
if hidden is not None:
if name_lower in [h.lower() for h in hidden]:
continue

# Rule 3/4: filter discovered projects
if proj_type == "discovered" and not show_discovered:
continue

result.append(proj)

return result

# ─── Project-Scoped BI (J.29.18) ──────────────────────────────────────────────

def _normalize_cwd_to_project(cwd: str) -> str: """Normalize a working directory path to a project name.

Strategy:
- ~/PROJECTS/coditect-rollout-master/submodules/{cat}/{repo}/... → {repo}
- ~/PROJECTS/coditect-rollout-master/... (not submodules) → coditect-rollout-master
- ~/PROJECTS/{project}/... → {project}
- ~/Downloads/{folder}/... → {folder}
- Anything else → last meaningful directory component
"""
# Normalize to remove trailing slashes
cwd = cwd.rstrip("/")
home = str(Path.home())

# Pattern: ~/PROJECTS/coditect-rollout-master/submodules/{category}/{repo}[/...]
rollout_sub = f"{home}/PROJECTS/coditect-rollout-master/submodules/"
if cwd.startswith(rollout_sub):
remainder = cwd[len(rollout_sub):]
parts = remainder.split("/")
if len(parts) >= 2:
return parts[1] # e.g., "coditect-core", "coditect-cloud-infra"
elif len(parts) == 1:
return parts[0] # category-level cwd

# Pattern: ~/PROJECTS/coditect-rollout-master[/...]
rollout = f"{home}/PROJECTS/coditect-rollout-master"
if cwd.startswith(rollout):
return "coditect-rollout-master"

# Pattern: ~/PROJECTS/{project}[/...]
projects = f"{home}/PROJECTS/"
if cwd.startswith(projects):
remainder = cwd[len(projects):]
return remainder.split("/")[0]

# Pattern: ~/Downloads/{folder}[/...]
downloads = f"{home}/Downloads/"
if cwd.startswith(downloads):
remainder = cwd[len(downloads):]
return remainder.split("/")[0] if remainder else "Downloads"

# Fallback: last directory component
return Path(cwd).name or "unknown"

def build_session_project_map(conn: sqlite3.Connection) -> dict[str, str]:
    """Build session_id → project_name mapping from entries.cwd.

    For each session, picks the cwd with the most entries as the primary project.
    Returns a dict mapping session_id to normalized project name.
    """
    # Ordered so that each session's highest-count cwd arrives first.
    cursor = conn.execute(
        """
        SELECT session_id, cwd, COUNT(*) as cnt
        FROM entries
        WHERE cwd IS NOT NULL AND cwd <> ''
        GROUP BY session_id, cwd
        ORDER BY session_id, cnt DESC
        """
    )

    mapping: dict[str, str] = {}
    for row in cursor:
        sid = row["session_id"]
        if sid in mapping:
            continue  # already have this session's dominant cwd
        mapping[sid] = _normalize_cwd_to_project(row["cwd"])
    return mapping

def _build_project_session_sets(session_map: dict[str, str]) -> dict[str, set[str]]: """Invert session→project map to project→set[session_id].""" project_sessions: dict[str, set[str]] = {} for sid, proj in session_map.items(): project_sessions.setdefault(proj, set()).add(sid) return project_sessions

def get_project_scoped_analytics(conn: sqlite3.Connection, session_ids: set[str], days: int) -> dict:
    """Compute scoped analytics for a specific set of session_ids.

    Returns the same shape as the top-level summary/timeline/agents/tools/tokens/errors
    but filtered to only the given sessions.

    Args:
        conn: Open connection to sessions.db. Must be writable (a TEMP table
            is created on it) and must have row_factory set to sqlite3.Row,
            since rows below are accessed by column name.
        session_ids: Session IDs to scope every query to.
        days: Lookback window; rows with created_at older than now-days are excluded.

    Returns:
        Dict with keys: summary, timeline, agentActivity, tokenUsage,
        errorDistribution, toolPerformance, sessionCount, firstDate, lastDate.
        NOTE(review): the empty-input early return below omits sessionCount/
        firstDate/lastDate — confirm downstream consumers tolerate that.
    """
    if not session_ids:
        # Nothing to scope to — return an all-empty result without touching the DB.
        return {
            "summary": _empty_summary(),
            "timeline": [],
            "agentActivity": [],
            "tokenUsage": [],
            "errorDistribution": [],
            "toolPerformance": [],
        }

    # SQLite datetime() modifier string, e.g. "-7 days"
    cutoff = f"-{days} days"

    # Create a temp table with session IDs for efficient joins
    conn.execute("CREATE TEMP TABLE IF NOT EXISTS _proj_sessions (sid TEXT PRIMARY KEY)")
    conn.execute("DELETE FROM _proj_sessions")  # clear residue from a previous call
    conn.executemany("INSERT OR IGNORE INTO _proj_sessions VALUES (?)",
                     [(s,) for s in session_ids])

    # --- Summary ---
    # COALESCE over input_size_bytes/input_size supports both old and new
    # tool_analytics schemas.
    ta_row = conn.execute(
        """
        SELECT
            COUNT(*) as total_events,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
            SUM(CASE WHEN status = 'invoked' THEN 1 ELSE 0 END) as invoked,
            COUNT(DISTINCT tool_name) as active_tools,
            COUNT(DISTINCT agent_name) as active_agents,
            COALESCE(SUM(COALESCE(input_size_bytes, input_size, 0)), 0) as total_input_bytes,
            COALESCE(SUM(COALESCE(output_size_bytes, output_size, 0)), 0) as total_output_bytes,
            MIN(created_at) as first_event,
            MAX(created_at) as last_event
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
        """,
        (cutoff,),
    ).fetchone()

    session_row = conn.execute(
        """
        SELECT COUNT(DISTINCT session_id) as total_sessions
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
        """,
        (cutoff,),
    ).fetchone()

    # Token totals span all four token columns (input, cache read/write, output).
    token_row = conn.execute(
        """
        SELECT
            COALESCE(SUM(token_input + token_cache_read + token_cache_write + token_output), 0) as total_tokens,
            COALESCE(SUM(cost_total_usd), 0) as total_cost_usd
        FROM token_economics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
        """,
        (cutoff,),
    ).fetchone()

    # Aggregates come back as NULL for empty sets — normalize to 0.
    total_events = ta_row["total_events"] or 0
    successes = ta_row["successes"] or 0
    failures = ta_row["failures"] or 0
    invoked = ta_row["invoked"] or 0
    completed = successes + failures  # "invoked" rows are still pending, excluded from rate
    total_sessions = session_row["total_sessions"] or 0

    summary = {
        "totalSessions": total_sessions,
        "totalEvents": total_events,
        "totalTokens": token_row["total_tokens"] or 0,
        "totalCostUsd": round(float(token_row["total_cost_usd"] or 0), 2),
        "totalInputBytes": ta_row["total_input_bytes"] or 0,
        "totalOutputBytes": ta_row["total_output_bytes"] or 0,
        "successCount": successes,
        "failedCount": failures,
        "invokedCount": invoked,
        "successRate": round(100.0 * successes / completed, 1) if completed else 0,
        "activeTools": ta_row["active_tools"] or 0,
        "activeAgents": ta_row["active_agents"] or 0,
        "firstEvent": ta_row["first_event"] or "",
        "lastEvent": ta_row["last_event"] or "",
    }

    # --- Timeline ---
    # Per-day tool-call counts; token/cost figures are merged in from a
    # separate per-day query below, joined on the date string.
    ta_timeline = conn.execute(
        """
        SELECT
            DATE(created_at) as date,
            COUNT(DISTINCT session_id) as sessions,
            COUNT(*) as tool_calls,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as errors
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
        GROUP BY DATE(created_at)
        ORDER BY date
        """,
        (cutoff,),
    ).fetchall()

    token_by_date: dict[str, int] = {}
    cost_by_date: dict[str, float] = {}
    try:
        tk_rows = conn.execute(
            """
            SELECT
                DATE(created_at) as date,
                SUM(token_input + token_cache_read + token_cache_write + token_output) as tokens,
                COALESCE(SUM(cost_total_usd), 0) as cost_usd
            FROM token_economics
            WHERE created_at > datetime('now', ?)
              AND session_id IN (SELECT sid FROM _proj_sessions)
            GROUP BY DATE(created_at)
            """,
            (cutoff,),
        ).fetchall()
        for row in tk_rows:
            token_by_date[row["date"]] = row["tokens"] or 0
            cost_by_date[row["date"]] = round(float(row["cost_usd"] or 0), 2)
    except sqlite3.OperationalError:
        # token_economics table/columns may be absent in older DBs —
        # timeline then carries zero tokens/cost.
        pass

    timeline = [
        {
            "date": row["date"],
            "sessions": row["sessions"],
            "tokens": token_by_date.get(row["date"], 0),
            "costUsd": cost_by_date.get(row["date"], 0),
            "toolCalls": row["tool_calls"],
            "errors": row["errors"],
        }
        for row in ta_timeline
    ]

    # --- Top Agents (limit 10) ---
    # HAVING >= 3 filters out one-off agent dispatches as noise.
    agent_rows = conn.execute(
        """
        SELECT
            COALESCE(agent_name, 'unknown') as agent_name,
            COUNT(*) as dispatches,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
          AND agent_name IS NOT NULL AND agent_name <> ''
        GROUP BY agent_name
        HAVING COUNT(*) >= 3
        ORDER BY dispatches DESC
        LIMIT 10
        """,
        (cutoff,),
    ).fetchall()

    agents = []
    for row in agent_rows:
        s = row["successes"] or 0
        f = row["failures"] or 0
        c = s + f  # completed dispatches only
        agents.append({
            "agentName": row["agent_name"],
            "dispatches": row["dispatches"],
            "successCount": s,
            "failedCount": f,
            "successRate": round(100.0 * s / c, 1) if c else 0,
        })

    # --- Top Tools (limit 10) ---
    # HAVING >= 5 keeps the list to tools with meaningful volume.
    tool_rows = conn.execute(
        """
        SELECT
            tool_name,
            COALESCE(tool_category, 'unknown') as tool_category,
            COUNT(*) as call_count,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successes,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
        GROUP BY tool_name
        HAVING COUNT(*) >= 5
        ORDER BY call_count DESC
        LIMIT 10
        """,
        (cutoff,),
    ).fetchall()

    tools = []
    for row in tool_rows:
        s = row["successes"] or 0
        f = row["failures"] or 0
        c = s + f
        tools.append({
            "toolName": row["tool_name"],
            "toolCategory": row["tool_category"],
            "callCount": row["call_count"],
            "successCount": s,
            "failedCount": f,
            "successRate": round(100.0 * s / c, 1) if c else 0,
        })

    # --- Token Usage by Model ---
    token_usage = []
    try:
        tk_model_rows = conn.execute(
            """
            SELECT
                COALESCE(model_name, 'unknown') as model,
                COUNT(*) as call_count,
                SUM(token_input + token_cache_read + token_cache_write) as prompt_tokens,
                SUM(token_output) as completion_tokens,
                COALESCE(SUM(cost_total_usd), 0) as cost_total_usd
            FROM token_economics
            WHERE created_at > datetime('now', ?)
              AND session_id IN (SELECT sid FROM _proj_sessions)
            GROUP BY model_name
            ORDER BY call_count DESC
            """,
            (cutoff,),
        ).fetchall()
        for row in tk_model_rows:
            pt = row["prompt_tokens"] or 0
            ct = row["completion_tokens"] or 0
            if pt + ct > 0:  # drop models with zero recorded tokens
                token_usage.append({
                    "model": row["model"],
                    "promptTokens": pt,
                    "completionTokens": ct,
                    "totalTokens": pt + ct,
                    "callCount": row["call_count"],
                    "costTotalUsd": round(float(row["cost_total_usd"] or 0), 2),
                })
    except sqlite3.OperationalError:
        # token_economics may be missing — model breakdown stays empty.
        pass

    # --- Error Distribution ---
    err_rows = conn.execute(
        """
        SELECT
            COALESCE(error_type, 'unknown') as error_type,
            COUNT(*) as count,
            MAX(created_at) as last_seen
        FROM tool_analytics
        WHERE created_at > datetime('now', ?)
          AND session_id IN (SELECT sid FROM _proj_sessions)
          AND status = 'failed'
          AND error_type IS NOT NULL AND error_type <> ''
        GROUP BY error_type
        ORDER BY count DESC
        LIMIT 10
        """,
        (cutoff,),
    ).fetchall()

    errors = [
        {"errorType": row["error_type"], "count": row["count"], "lastSeen": row["last_seen"]}
        for row in err_rows
    ]

    # Cleanup temp table
    conn.execute("DROP TABLE IF EXISTS _proj_sessions")

    # Extract YYYY-MM-DD date range from first/last event timestamps
    first_date = (summary.get("firstEvent") or "")[:10] or None
    last_date = (summary.get("lastEvent") or "")[:10] or None

    return {
        "summary": summary,
        "timeline": timeline,
        "agentActivity": agents,
        "tokenUsage": token_usage,
        "errorDistribution": errors,
        "toolPerformance": tools,
        "sessionCount": len(session_ids),
        "firstDate": first_date,
        "lastDate": last_date,
    }

def _empty_summary() -> dict: """Return a zeroed-out summary for projects with no session data.""" return { "totalSessions": 0, "totalEvents": 0, "totalTokens": 0, "totalCostUsd": 0, "totalInputBytes": 0, "totalOutputBytes": 0, "successCount": 0, "failedCount": 0, "invokedCount": 0, "successRate": 0, "activeTools": 0, "activeAgents": 0, "firstEvent": "", "lastEvent": "", }

def build_dashboard_data(conn: sqlite3.Connection, timeframe: str, org_conn: sqlite3.Connection | None = None, tracks_dirs: list[Path] | None = None, logs_dir: Path | None = None, include_discovered: bool = False, project_config: Path | None = None) -> dict:
    """Build complete dashboard data structure including org.db knowledge base, TRACK file details, session logs, and executive summary.

    Args:
        conn: sessions.db connection used for the top-level analytics queries.
        timeframe: One of TIMEFRAME_DAYS keys; unknown values fall back to 7 days.
        org_conn: Optional org.db connection; when given, adds "knowledgeBase".
        tracks_dirs: Directories of TRACK-*.md files to parse for track details.
        logs_dir: Directory containing session-log files.
        include_discovered: Include auto-discovered CWD-derived projects.
        project_config: Optional path to dashboard-projects.json visibility config.

    Returns:
        The full chart-ready dict that gets serialized to data.json.
    """
    days = TIMEFRAME_DAYS.get(timeframe, 7)

    data = {
        "timeFrame": timeframe,
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "summary": get_summary(conn, days),
        "timeline": get_timeline(conn, days),
        "agentActivity": get_agent_activity(conn, days),
        "tokenUsage": get_token_usage(conn, days),
        "errorDistribution": get_error_distribution(conn, days),
        "toolPerformance": get_tool_performance(conn, days),
    }

    # Track activity from task_ids (works with or without org_conn)
    track_activity = get_track_activity(conn, org_conn, days)
    data["trackActivity"] = track_activity

    # Parse TRACK files for descriptions, % complete, task details
    track_details: list[dict] = []
    if tracks_dirs:
        track_details = parse_track_files(tracks_dirs)
        # Merge tool call data from track_activity into track_details,
        # keyed by the single-letter track identifier.
        activity_by_letter = {t["trackLetter"]: t for t in track_activity}
        for td in track_details:
            letter = td["trackLetter"]
            if letter in activity_by_letter:
                act = activity_by_letter[letter]
                td["toolCalls"] = act.get("toolCalls", 0)
                # Merge tool call counts into individual tasks
                task_calls = {t["taskId"]: t for t in act.get("tasks", [])}
                for task in td["tasks"]:
                    if task["taskId"] in task_calls:
                        tc = task_calls[task["taskId"]]
                        task["toolCalls"] = tc.get("toolCalls", 0)
                        task["agents"] = tc.get("agents", 0)
                        task["sessions"] = tc.get("sessions", 0)
    data["trackDetails"] = track_details

    # Parse session logs
    session_logs: list[dict] = []
    if logs_dir:
        session_logs = parse_session_logs(logs_dir)
    data["sessionLogs"] = session_logs

    # Executive summary
    data["executiveSummary"] = generate_executive_summary(track_details, session_logs)

    # Project registry (projects.db + session log dirs)
    data["projects"] = get_project_registry(logs_dir)

    # Wire track-to-project associations via repository path matching
    if track_details and data["projects"]:
        for proj in data["projects"]:
            associated: list[str] = []
            proj_path = proj.get("path", "")
            if not proj_path:
                continue
            for td in track_details:
                for repo in td.get("repositories", []):
                    # Match: project path ends with the track's repository reference
                    if proj_path.rstrip("/").endswith(repo.rstrip("/")):
                        associated.append(td["trackLetter"])
                        break
            if associated:
                proj["associatedTrackLetters"] = sorted(set(associated))

    # ─── Per-Project Scoped Analytics (J.29.18) ─────────────────────────────
    # Build session→project map and attach scoped analytics to each project
    session_map = build_session_project_map(conn)
    project_sessions = _build_project_session_sets(session_map)

    # Open a single writable connection for temp table operations.
    # NOTE(review): this always opens the module-level SESSIONS_DB path,
    # ignoring any alternate DB the caller opened `conn` against (e.g. a
    # --db override) — confirm this is intended.
    rw_conn = sqlite3.connect(str(SESSIONS_DB))
    rw_conn.row_factory = sqlite3.Row

    empty_analytics = {
        "summary": _empty_summary(),
        "timeline": [],
        "agentActivity": [],
        "tokenUsage": [],
        "errorDistribution": [],
        "toolPerformance": [],
    }

    for proj in data.get("projects", []):
        proj_name = proj.get("name", "")
        proj_path = proj.get("path", "")

        # Match project entry to session data via normalized path or name
        matched_sessions: set[str] = set()

        # Direct name match
        if proj_name in project_sessions:
            matched_sessions = project_sessions[proj_name]
        else:
            # Try matching via normalized path
            if proj_path:
                normalized = _normalize_cwd_to_project(proj_path)
                if normalized in project_sessions:
                    matched_sessions = project_sessions[normalized]

        # Fuzzy: check if any project_sessions key is contained in name or vice versa
        if not matched_sessions:
            pname_lower = proj_name.lower().replace(" ", "-")
            for key, sids in project_sessions.items():
                key_lower = key.lower()
                if key_lower in pname_lower or pname_lower in key_lower:
                    matched_sessions = sids
                    break

        if matched_sessions:
            proj["analytics"] = get_project_scoped_analytics(rw_conn, matched_sessions, days)
        else:
            # NOTE(review): dict() is a shallow copy — the nested summary dict
            # and list values are shared between all unmatched projects. Safe
            # only as long as consumers never mutate them in place.
            proj["analytics"] = dict(empty_analytics)

    # Create project entries for session groups not yet matched to any project
    matched_groups: set[str] = set()
    for proj in data.get("projects", []):
        a = proj.get("analytics", {})
        if a.get("sessionCount", 0) > 0:
            # This project was matched — track which session group keys were used
            proj_name = proj.get("name", "")
            proj_path = proj.get("path", "")
            matched_groups.add(proj_name.lower())
            if proj_path:
                matched_groups.add(_normalize_cwd_to_project(proj_path).lower())

    for group_name, sids in project_sessions.items():
        if group_name.lower() in matched_groups:
            continue
        if len(sids) < 2:
            continue  # Skip single-session groups
        analytics = get_project_scoped_analytics(rw_conn, sids, days)
        if analytics["summary"]["totalEvents"] == 0:
            continue  # no events inside the timeframe — not worth a card
        first_d = analytics.get("firstDate", "")
        last_d = analytics.get("lastDate", "")
        data.setdefault("projects", []).append({
            "projectId": f"session-{group_name}",
            "name": group_name,
            "path": "",
            "projectType": "discovered",
            "status": "active",
            "description": f"Discovered from {len(sids)} sessions via working directory analysis.",
            "language": "",
            "framework": "",
            "source": "sessions.db",
            "sessionLogEntries": 0,
            "sessionLogDays": 0,
            "firstActivity": first_d or "",
            "lastActivity": last_d or "",
            "sessionLogMonths": [],
            "analytics": analytics,
        })
        matched_groups.add(group_name.lower())

    rw_conn.close()

    # Integrate org.db knowledge base if available
    if org_conn:
        data["knowledgeBase"] = {
            "summary": get_knowledge_summary(org_conn),
            "graphDistribution": get_knowledge_graph_distribution(org_conn),
            "decisionActivity": get_decision_activity(org_conn, days),
            "skillEffectiveness": get_skill_effectiveness(org_conn, days),
            "errorSolutions": get_error_solutions_summary(org_conn, days),
            "skillBreakdown": get_skill_breakdown(org_conn, days),
        }

    # ─── Filter Projects by Visibility (J.29.19) ─────────────────────────────
    visibility_config = load_project_visibility_config(project_config)
    data["projects"] = filter_projects(
        data.get("projects", []),
        include_discovered=include_discovered,
        config=visibility_config,
    )

    return data

def run_test(db_path: Path, org_db_path: Path | None = None) -> bool:
    """Run a quick self-test against the database.

    Builds dashboard data for three timeframes and prints key counts so a
    human can eyeball the output. Returns True on completion (any query
    failure raises and aborts the run).
    """
    print(f"Testing data adapter against {db_path}...")
    conn = get_connection(db_path)
    # org.db is optional — only connect when the file exists.
    org_conn = get_org_connection(org_db_path) if org_db_path and org_db_path.exists() else None
    if org_conn:
        print(f" org.db connected: {org_db_path}")

    for tf in ["today", "week", "month"]:
        data = build_dashboard_data(conn, tf, org_conn)
        s = data['summary']
        print(f"\n [{tf}]")
        print(f" Sessions: {s['totalSessions']}")
        print(f" Events: {s['totalEvents']}")
        print(f" Tokens: {s['totalTokens']:,}")
        print(f" Cost: ${s['totalCostUsd']:,.2f}")
        print(f" I/O: {s['totalInputBytes']:,} in / {s['totalOutputBytes']:,} out bytes")
        print(f" Status: {s['successCount']:,} success, {s['failedCount']:,} failed, {s['invokedCount']:,} invoked")
        print(f" Success Rate: {s['successRate']}%")
        print(f" Timeline Points: {len(data['timeline'])}")
        print(f" Agents: {len(data['agentActivity'])}")
        print(f" Tools: {len(data['toolPerformance'])}")
        print(f" Errors: {len(data['errorDistribution'])}")
        print(f" Tracks: {len(data.get('trackActivity', []))}")
        # Knowledge-base section only present when org_conn was supplied.
        if "knowledgeBase" in data:
            kb = data["knowledgeBase"]
            print(f" KB Decisions: {kb['summary']['totalDecisions']}")
            print(f" KB Skill Learnings: {kb['summary']['totalSkillLearnings']:,}")
            print(f" KB Error Solutions: {kb['summary']['totalErrorSolutions']}")
            print(f" KG Nodes: {kb['summary']['totalKgNodes']:,}")
            print(f" KG Edges: {kb['summary']['totalKgEdges']:,}")
            print(f" KG Node Types: {len(kb['graphDistribution']['nodeTypes'])}")
            print(f" KG Edge Types: {len(kb['graphDistribution']['edgeTypes'])}")
            print(f" Decision Types: {len(kb['decisionActivity'])}")
            se = kb["skillEffectiveness"]
            print(f" Skill Success Rate: {se['successRate']}% ({se['successes']:,}/{se['total']:,})")
            print(f" Skill Breakdown: {len(kb.get('skillBreakdown', []))} skills")

    conn.close()
    if org_conn:
        org_conn.close()
    print("\nAll tests passed.")
    return True

def main():
    """CLI entry point: parse arguments, build dashboard data, write validated JSON.

    Modes:
        --test: run the self-test and exit.
        --patch-session-logs: fast path that only re-parses session logs into
            an existing data.json (no DB queries).
        default: full build via build_dashboard_data(), written atomically.
    """
    parser = argparse.ArgumentParser(description="Trajectory Dashboard Data Adapter")
    parser.add_argument(
        "--timeframe",
        choices=list(TIMEFRAME_DAYS.keys()),
        default="week",
        help="Time range for data extraction (default: week)",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=DEFAULT_OUTPUT,
        help="Output JSON file path",
    )
    parser.add_argument(
        "--db",
        type=Path,
        default=SESSIONS_DB,
        help="Path to sessions.db",
    )
    parser.add_argument(
        "--org-db",
        type=Path,
        default=ORG_DB,
        help="Path to org.db (knowledge base)",
    )
    parser.add_argument(
        "--test",
        action="store_true",
        help="Run self-test and exit",
    )
    parser.add_argument(
        "--pretty",
        action="store_true",
        help="Pretty-print JSON output",
    )

    # Auto-detect tracks directories relative to script location.
    # BUG FIX: was Path(file) — "file" is an undefined name; the module
    # path lives in __file__.
    script_dir = Path(__file__).resolve().parent
    core_root = script_dir.parent.parent
    default_pilot_tracks = core_root / "internal" / "project" / "plans" / "pilot-tracks"
    default_ext_tracks = core_root / "internal" / "project" / "plans" / "tracks"
    default_logs = Path.home() / "PROJECTS" / ".coditect-data" / "session-logs"

    parser.add_argument(
        "--tracks-dir",
        type=Path,
        nargs="*",
        default=[default_pilot_tracks, default_ext_tracks],
        help="Directories containing TRACK-*.md files",
    )
    parser.add_argument(
        "--logs-dir",
        type=Path,
        default=default_logs,
        help="Directory containing session log files",
    )
    parser.add_argument(
        "--patch-session-logs",
        action="store_true",
        help="Fast mode: only re-parse session logs and patch existing data.json (skips DB queries)",
    )
    parser.add_argument(
        "--include-discovered",
        action="store_true",
        help="Include auto-discovered projects (derived from CWD session groups). Default: exclude.",
    )
    parser.add_argument(
        "--project-config",
        type=Path,
        default=None,
        help="Path to dashboard-projects.json for project visibility overrides",
    )

    args = parser.parse_args()

    if args.test:
        success = run_test(args.db, args.org_db)
        sys.exit(0 if success else 1)

    # Fast-path: patch only session logs into existing data.json
    if args.patch_session_logs:
        if not args.output.exists():
            print(f"Cannot patch: {args.output} does not exist. Run full adapter first.", file=sys.stderr)
            sys.exit(1)
        try:
            with open(args.output, "r") as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Warning: {args.output} has corrupt JSON ({e}), cannot patch. Run full adapter.", file=sys.stderr)
            sys.exit(2)
        session_logs = parse_session_logs(args.logs_dir) if args.logs_dir and args.logs_dir.exists() else []
        data["sessionLogs"] = session_logs
        # Re-compute executive summary with existing track details
        track_details = data.get("trackDetails", [])
        data["executiveSummary"] = generate_executive_summary(track_details, session_logs)
        # Also update per-project session log counts in projectRegistry.
        # NOTE(review): build_dashboard_data() emits a "projects" key, not
        # "projectRegistry" — confirm which key real data.json files carry;
        # this branch may never fire on freshly built files.
        if "projectRegistry" in data and args.logs_dir and args.logs_dir.exists():
            projects_root = args.logs_dir / "projects"
            if projects_root.exists():
                # Reset counts before re-accumulating from disk.
                for proj in data["projectRegistry"]:
                    proj["sessionLogEntries"] = 0
                    proj["sessionLogDays"] = 0
                    proj.pop("sessionLogMonths", None)
                for proj_dir in sorted(projects_root.iterdir()):
                    if not proj_dir.is_dir() or proj_dir.name.startswith("."):
                        continue
                    months_data = parse_session_logs(proj_dir)
                    total_entries = sum(m["totalEntries"] for m in months_data)
                    total_days = sum(len(m["logs"]) for m in months_data)
                    dir_name = proj_dir.name
                    # Fuzzy substring match between directory and project name.
                    for p in data["projectRegistry"]:
                        pname = p["name"].lower().replace(" ", "-")
                        if dir_name.lower() in pname or pname in dir_name.lower():
                            p["sessionLogEntries"] += total_entries
                            p["sessionLogDays"] += total_days
                            p["sessionLogMonths"] = (p.get("sessionLogMonths") or []) + months_data
                            break
        indent = 2 if args.pretty else None
        write_json_atomic(data, args.output, indent=indent)
        print(f"Session logs patched into {args.output} (fast mode, validated)")
        sys.exit(0)

    # Full build
    conn = get_connection(args.db)
    org_conn = get_org_connection(args.org_db) if args.org_db.exists() else None
    data = build_dashboard_data(conn, args.timeframe, org_conn,
                                tracks_dirs=args.tracks_dir, logs_dir=args.logs_dir,
                                include_discovered=args.include_discovered,
                                project_config=args.project_config)
    conn.close()
    if org_conn:
        org_conn.close()

    # Write JSON (atomic with validation)
    indent = 2 if args.pretty else None
    write_json_atomic(data, args.output, indent=indent)

    proj_count = len(data.get("projects", []))
    print(f"Dashboard data written to {args.output} (validated)")
    print(f" Time frame: {args.timeframe}")
    print(f" Sessions: {data['summary']['totalSessions']}")
    print(f" Events: {data['summary']['totalEvents']}")
    print(f" Projects: {proj_count}" + ("" if args.include_discovered else " (discovered projects excluded)"))
# Script entry guard. BUG FIX: was `if name == "main"` — dunder underscores
# were lost; the correct module-name check is against __name__/"__main__".
if __name__ == "__main__":
    main()