#!/usr/bin/env python3
"""

title: "Activity-Project Associator" component_type: script version: "1.1.0" audience: contributor status: stable summary: "Associate LLM session activities with projects and tracks via task IDs" keywords: ['association', 'activity', 'project', 'track', 'task-id', 'session'] tokens: ~800 created: 2026-01-28 updated: 2026-01-28 script_name: "activity_project_associator.py" language: python executable: true usage: "python3 scripts/activity_project_associator.py [options]" python_version: "3.10+" dependencies: [] modifies_files: true network_access: false requires_auth: false​

Activity-Project Associator for CODITECT J.14.

Associates LLM session activities with projects and tracks based on:

  1. Explicit task IDs in tool_description (99% confidence)
  2. File paths in tool calls (85% confidence)
  3. Semantic matching (70% confidence - future)
  4. Unassociated fallback (0% confidence) - no match found
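
For example, a tool_description beginning "J.14.1.1: Wire up extractor"
(hypothetical) resolves via tier 1 to track J (Memory) at 0.99 confidence,
while a Bash tool call whose input touches /submodules/core/coditect-core/
resolves via tier 2 to project coditect-core at 0.85.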

Track: J.14 (Memory - Activity-Project Association Pipeline)
Task: J.14.1.1
ADR: ADR-118 (Four-Tier Database Architecture)

Database: sessions.db (TIER 3 - Regenerable) NOT context.db (deprecated)

Features:

  • Multi-LLM session discovery (Claude, Codex, Gemini)
  • Overlapping window extraction for large sessions
  • Four-tier confidence scoring with fallback defaults
  • TRACK file parsing for task definition index
  • SQLite storage in sessions.db with proper indexes

Usage:
    python3 scripts/activity_project_associator.py --batch
    python3 scripts/activity_project_associator.py --session <session_id>
    python3 scripts/activity_project_associator.py --report
    python3 scripts/activity_project_associator.py --dry-run
"""

import argparse
import hashlib
import json
import os
import re
import sqlite3
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple

# =============================================================================
# Path Discovery (ADR-114)
# =============================================================================

SCRIPT_DIR = Path(__file__).resolve().parent
CODITECT_ROOT = SCRIPT_DIR.parent
HOME = Path.home()

# Add core scripts to path

sys.path.insert(0, str(SCRIPT_DIR / "core"))

try:
    from paths import get_context_storage_dir, get_sessions_db_path
    CONTEXT_STORAGE = get_context_storage_dir()
    SESSIONS_DB = get_sessions_db_path()
except ImportError:
    # Fallback
    _user_data = HOME / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        CONTEXT_STORAGE = _user_data
        SESSIONS_DB = _user_data / "sessions.db"  # ADR-118 Tier 3
    else:
        CONTEXT_STORAGE = CODITECT_ROOT / "context-storage"
        SESSIONS_DB = CONTEXT_STORAGE / "sessions.db"  # ADR-118 Tier 3

# Track files location

TRACKS_DIR = CODITECT_ROOT / "internal" / "project" / "plans" / "tracks"

# LLM session paths

CLAUDE_SESSIONS = HOME / ".claude" / "projects"
CODEX_SESSIONS = HOME / ".codex" / "sessions"
GEMINI_SESSIONS = HOME / ".gemini" / "sessions"

# =============================================================================
# Schema Definition
# =============================================================================

ACTIVITY_ASSOCIATIONS_SCHEMA = """
-- Activity Associations Table (J.14 / ADR-118 Tier 3)
-- Maps LLM session activities to projects and tracks
-- Stored in sessions.db (regenerable from session data)

-- Sentinel Values:
--   task_id: NULL (unknown), '_AMBIGUOUS' (multiple candidates)
--   track: NULL (unknown), '_NONE' (no track), '_AMBIGUOUS' (multiple)
--   project_name: NULL (unknown), '_UNKNOWN' (no project), '_AMBIGUOUS' (multiple)
--
-- Association Methods:
--   'explicit'     (0.99) - Task ID in tool_description
--   'path'         (0.85) - File path matches project
--   'temporal'     (0.70) - Inferred from nearby activities
--   'semantic'     (0.60) - LLM content classification (future)
--   'ambiguous'    (0.50) - Multiple candidates, needs resolution
--   'unassociated' (0.00) - No match found
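--
-- Illustrative (hypothetical row, not seeded data): an ambiguous activity
-- might carry task_id='_AMBIGUOUS', needs_review=1, and
-- candidate_tasks='["H.8.1.6", "J.14.1.1"]' until a reviewer resolves it.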

CREATE TABLE IF NOT EXISTS activity_associations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

-- Session identification
session_id TEXT NOT NULL,
message_id TEXT,
llm_source TEXT NOT NULL, -- 'claude', 'codex', 'gemini'

-- Association data
task_id TEXT, -- e.g., 'H.8.1.6', NULL, '_AMBIGUOUS'
track TEXT, -- e.g., 'H', '_NONE', '_AMBIGUOUS'
track_name TEXT, -- e.g., 'Framework', 'Unassociated', 'Ambiguous'

-- Project identification
project_path TEXT, -- Full path to project, NULL if unknown
project_name TEXT, -- e.g., 'coditect-core', '_UNKNOWN', '_AMBIGUOUS'

-- Confidence scoring (0.0 - 1.0)
confidence REAL NOT NULL DEFAULT 0.0,
association_method TEXT NOT NULL, -- See methods above

-- Resolution support (for ambiguous/unassociated)
candidate_tasks TEXT, -- JSON array of candidate task IDs
candidate_projects TEXT, -- JSON array of candidate projects
needs_review INTEGER DEFAULT 0, -- 1 if flagged for human review
resolution_notes TEXT, -- Notes from resolution attempts

-- Provenance
source_file TEXT,
source_line INTEGER,
tool_name TEXT,
tool_description TEXT,

-- Timestamps
activity_timestamp TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
resolved_at TEXT, -- When ambiguity was resolved

-- Deduplication
association_hash TEXT UNIQUE

);

-- Indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_activity_track ON activity_associations(track);
CREATE INDEX IF NOT EXISTS idx_activity_project ON activity_associations(project_name);
CREATE INDEX IF NOT EXISTS idx_activity_confidence ON activity_associations(confidence);
CREATE INDEX IF NOT EXISTS idx_activity_session ON activity_associations(session_id);
CREATE INDEX IF NOT EXISTS idx_activity_task_id ON activity_associations(task_id);
CREATE INDEX IF NOT EXISTS idx_activity_method ON activity_associations(association_method);
CREATE INDEX IF NOT EXISTS idx_activity_timestamp ON activity_associations(activity_timestamp);

-- View: Activities by track
CREATE VIEW IF NOT EXISTS activity_by_track AS
SELECT
    track,
    track_name,
    COUNT(*) as total_activities,
    COUNT(DISTINCT session_id) as sessions,
    COUNT(DISTINCT task_id) as unique_tasks,
    ROUND(AVG(confidence), 2) as avg_confidence,
    MAX(activity_timestamp) as last_activity
FROM activity_associations
GROUP BY track, track_name
ORDER BY total_activities DESC;

-- View: Activities by project
CREATE VIEW IF NOT EXISTS activity_by_project AS
SELECT
    project_name,
    COUNT(*) as total_activities,
    COUNT(DISTINCT session_id) as sessions,
    COUNT(DISTINCT track) as tracks_touched,
    ROUND(AVG(confidence), 2) as avg_confidence,
    MAX(activity_timestamp) as last_activity
FROM activity_associations
GROUP BY project_name
ORDER BY total_activities DESC;

-- View: Association method distribution
CREATE VIEW IF NOT EXISTS activity_by_method AS
SELECT
    association_method,
    COUNT(*) as count,
    ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM activity_associations), 1) as percentage,
    ROUND(AVG(confidence), 2) as avg_confidence
FROM activity_associations
GROUP BY association_method
ORDER BY count DESC;

-- View: Orphan activities (no task ID or project)
CREATE VIEW IF NOT EXISTS orphan_activities AS
SELECT
    session_id,
    llm_source,
    tool_name,
    tool_description,
    activity_timestamp,
    source_file
FROM activity_associations
WHERE (task_id IS NULL OR task_id = '')
  AND (project_name IS NULL OR project_name = '')
ORDER BY activity_timestamp DESC;
"""
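
# Illustrative ad-hoc query (not executed by this script): surface
# low-confidence rows for manual review, using the columns defined above.
#   SELECT session_id, tool_name, tool_description
#   FROM activity_associations
#   WHERE confidence < 0.5
#   ORDER BY activity_timestamp DESC;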

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class TaskDefinition:
    """Task definition parsed from TRACK files."""
    task_id: str
    track: str
    track_name: str
    description: str
    status: str = "pending"

def to_dict(self) -> Dict[str, Any]:
return {
"task_id": self.task_id,
"track": self.track,
"track_name": self.track_name,
"description": self.description,
"status": self.status
}

@dataclass
class ActivityAssociation:
    """An activity-project association."""
    session_id: str
    llm_source: str
    message_id: Optional[str] = None
    task_id: Optional[str] = None
    track: Optional[str] = None
    track_name: Optional[str] = None
    project_path: Optional[str] = None
    project_name: Optional[str] = None
    confidence: float = 0.0
    association_method: str = "unknown"
    source_file: Optional[str] = None
    source_line: Optional[int] = None
    tool_name: Optional[str] = None
    tool_description: Optional[str] = None
    activity_timestamp: Optional[str] = None

def compute_hash(self) -> str:
"""Compute unique hash for deduplication."""
hash_input = f"{self.session_id}:{self.message_id}:{self.task_id}:{self.tool_name}:{self.activity_timestamp}"
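        # Note: only the first 32 hex chars (128 bits of SHA-256) are kept,
        # which is ample for deduplication at this scale.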
return hashlib.sha256(hash_input.encode()).hexdigest()[:32]

def to_dict(self) -> Dict[str, Any]:
return {
"session_id": self.session_id,
"message_id": self.message_id,
"llm_source": self.llm_source,
"task_id": self.task_id,
"track": self.track,
"track_name": self.track_name,
"project_path": self.project_path,
"project_name": self.project_name,
"confidence": self.confidence,
"association_method": self.association_method,
"source_file": self.source_file,
"source_line": self.source_line,
"tool_name": self.tool_name,
"tool_description": self.tool_description,
"activity_timestamp": self.activity_timestamp,
"association_hash": self.compute_hash()
}

# =============================================================================
# Track Name Mapping
# =============================================================================

TRACK_NAMES = {
    "A": "Backend",
    "B": "Frontend",
    "C": "DevOps",
    "D": "Security",
    "E": "Testing",
    "F": "Documentation",
    "G": "DMS",
    "H": "Framework",
    "I": "UI Components",
    "J": "Memory",
    "K": "Workflow",
    "L": "Extended Testing",
    "M": "Extended Security",
    "N": "GTM",
    # PCF Tracks
    "O": "Vision & Strategy",
    "P": "Products & Services",
    "Q": "Marketing & Sales",
    "R": "Physical Delivery",
    "S": "Service Delivery",
    "T": "Customer Service",
    "U": "Human Capital",
    "V": "Information Technology",
    "W": "Financial Resources",
    "X": "Asset Management",
    "Y": "Risk & Compliance",
    "Z": "External Relationships",
    "AA": "Business Capabilities",
    # Extension Tracks
    "AB": "Platform Mobile",
    "AC": "Platform Desktop",
    "AD": "AI/ML Integration",
    "AE": "Data Engineering",
    "AF": "API Integrations",
    "AG": "Healthcare",
    "AH": "Finance",
    "AI": "Government",
    "AJ": "Localization",
    "AK": "Sustainability",
}

# =============================================================================
# Project Mapping
# =============================================================================

# Known project patterns for path-based association

PROJECT_PATTERNS = {
    "coditect-core": [
        "/coditect-core/",
        "/submodules/core/coditect-core/",
        "/.coditect/",
    ],
    "coditect-cloud-infra": [
        "/coditect-cloud-infra/",
        "/submodules/cloud/coditect-cloud-infra/",
    ],
    "coditect-cloud-frontend": [
        "/coditect-cloud-frontend/",
        "/submodules/cloud/coditect-cloud-frontend/",
    ],
    "coditect-cloud-ide": [
        "/coditect-cloud-ide/",
        "/submodules/cloud/coditect-cloud-ide/",
    ],
    "coditect-rollout-master": [
        "/coditect-rollout-master/",
        "PROJECTS/coditect-rollout-master",
    ],
    "coditect-docs-main": [
        "/coditect-docs-main/",
        "/submodules/docs/coditect-docs-main/",
    ],
    "coditect-cli": [
        "/coditect-cli/",
        "/submodules/dev/coditect-cli/",
    ],
    "coditect-telemetry": [
        "/coditect-telemetry/",
        "/submodules/dev/coditect-telemetry/",
    ],
}

# =============================================================================
# Track Parser
# =============================================================================

class TrackParser:
    """Parse TRACK files to build task definition index."""

# Regex for task IDs: A.1.2.3 format
TASK_ID_PATTERN = re.compile(r'^([A-Z]{1,2})\.(\d+)(?:\.(\d+))?(?:\.(\d+))?')
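    # e.g. "H.8.1.6" -> groups ('H', '8', '1', '6'); "J.14" -> ('J', '14', None, None)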

# Regex for task lines in track files
TASK_LINE_PATTERN = re.compile(
r'^[-*]\s*\[([x ])\]\s+([A-Z]{1,2}\.\d+(?:\.\d+)*):?\s*(.*)$',
re.MULTILINE
)
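    # Matches checklist lines such as "- [x] H.8.1.6: Fix path discovery"
    # (group 1: status char, group 2: task ID, group 3: description)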

def __init__(self, tracks_dir: Path = TRACKS_DIR):
self.tracks_dir = tracks_dir
self.task_index: Dict[str, TaskDefinition] = {}

def parse_all_tracks(self) -> Dict[str, TaskDefinition]:
"""Parse all TRACK files and build task index."""
if not self.tracks_dir.exists():
print(f"Warning: Tracks directory not found: {self.tracks_dir}")
return self.task_index

# Find all track files (both TRACK-*.md and track-*.md)
track_files = list(self.tracks_dir.glob("track-*.md"))
track_files.extend(list(self.tracks_dir.glob("TRACK-*.md")))

for track_file in track_files:
self._parse_track_file(track_file)

return self.task_index

def _parse_track_file(self, track_file: Path) -> None:
"""Parse a single TRACK file."""
try:
content = track_file.read_text(encoding='utf-8')
except Exception as e:
print(f"Error reading {track_file}: {e}")
return

# Extract track letter from filename
# track-f-documentation-support.md → F
# TRACK-A-BACKEND-COMPLETION.md → A
name = track_file.stem.lower()
track_match = re.search(r'track-([a-z]+)-', name)
if track_match:
track_letter = track_match.group(1).upper()
else:
track_letter = "?"

track_name = TRACK_NAMES.get(track_letter, "Unknown")

# Find all task lines
for match in self.TASK_LINE_PATTERN.finditer(content):
status_char = match.group(1)
task_id = match.group(2)
description = match.group(3).strip()

status = "completed" if status_char == "x" else "pending"

# Extract track from task_id if we couldn't get it from filename
id_match = self.TASK_ID_PATTERN.match(task_id)
if id_match:
task_track = id_match.group(1)
task_track_name = TRACK_NAMES.get(task_track, track_name)
else:
task_track = track_letter
task_track_name = track_name

self.task_index[task_id] = TaskDefinition(
task_id=task_id,
track=task_track,
track_name=task_track_name,
description=description,
status=status
)

def get_task(self, task_id: str) -> Optional[TaskDefinition]:
"""Get task definition by ID."""
return self.task_index.get(task_id)

def get_track_for_task(self, task_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Get track letter and name for a task ID."""
# First check index
if task_id in self.task_index:
task = self.task_index[task_id]
return task.track, task.track_name

# Parse from task ID directly
match = self.TASK_ID_PATTERN.match(task_id)
if match:
track = match.group(1)
return track, TRACK_NAMES.get(track, "Unknown")

return None, None

# =============================================================================
# Session Extractor with Overlapping Windows
# =============================================================================

class SessionExtractor:
    """Extract activities from LLM session files with overlapping windows."""

# Task ID regex in tool descriptions
TASK_ID_REGEX = re.compile(r'^([A-Z]{1,2}\.\d+(?:\.\d+)*)')
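    # Anchored at the start: "H.8.1.6 Update parser" matches; "see H.8.1.6" does not.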

# File path patterns
PATH_PATTERNS = [
re.compile(r'"file_path":\s*"([^"]+)"'),
re.compile(r'"path":\s*"([^"]+)"'),
re.compile(r'"command":\s*"[^"]*(/[^\s"]+)'),
re.compile(r'"cwd":\s*"([^"]+)"'),
]
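    # Each pattern captures one path-like string from serialized tool input,
    # e.g. '"file_path": "/repo/src/main.py"' captures "/repo/src/main.py".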

def __init__(self, track_parser: TrackParser, window_size: int = 4096, overlap: int = 512):
"""
Initialize extractor.

Args:
track_parser: TrackParser instance for task lookups
window_size: Token window size for chunked processing
overlap: Token overlap between windows
"""
self.track_parser = track_parser
self.window_size = window_size
self.overlap = overlap
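        # Note: window_size and overlap are stored for chunked processing of
        # large sessions; the extraction below currently iterates line by line.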

def discover_sessions(self) -> Iterator[Tuple[Path, str]]:
"""Discover all LLM session files."""
# Claude sessions
if CLAUDE_SESSIONS.exists():
for jsonl in CLAUDE_SESSIONS.rglob("*.jsonl"):
yield jsonl, "claude"

# Codex sessions
if CODEX_SESSIONS.exists():
for jsonl in CODEX_SESSIONS.rglob("*.jsonl"):
yield jsonl, "codex"

# Gemini sessions
if GEMINI_SESSIONS.exists():
for jsonl in GEMINI_SESSIONS.rglob("*.jsonl"):
yield jsonl, "gemini"

def extract_from_session(
self,
session_file: Path,
llm_source: str
) -> Iterator[ActivityAssociation]:
"""Extract activity associations from a session file."""
session_id = session_file.stem

try:
with open(session_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue

try:
entry = json.loads(line)
except json.JSONDecodeError:
continue

# Process tool_use entries
for assoc in self._extract_from_entry(
entry,
session_id,
llm_source,
str(session_file),
line_num
):
yield assoc

except Exception as e:
print(f"Error processing {session_file}: {e}")

def _extract_from_entry(
self,
entry: Dict[str, Any],
session_id: str,
llm_source: str,
source_file: str,
line_num: int
) -> Iterator[ActivityAssociation]:
"""Extract associations from a single JSONL entry."""
entry_type = entry.get('type', '')
timestamp = entry.get('timestamp', '')

# Direct tool_use entries
if entry_type == 'tool_use':
tool_name = entry.get('name', 'unknown')
tool_input = entry.get('input', {})
message_id = entry.get('uuid', '')

for assoc in self._process_tool_call(
session_id, llm_source, message_id, tool_name,
tool_input, timestamp, source_file, line_num
):
yield assoc

# Assistant messages with tool_use content blocks
elif entry_type == 'assistant' or entry.get('role') == 'assistant':
message = entry.get('message', {})
content_blocks = message.get('content', [])
message_id = entry.get('uuid', '')

if isinstance(content_blocks, list):
for block in content_blocks:
if isinstance(block, dict) and block.get('type') == 'tool_use':
tool_name = block.get('name', 'unknown')
tool_input = block.get('input', {})

for assoc in self._process_tool_call(
session_id, llm_source, message_id, tool_name,
tool_input, timestamp, source_file, line_num
):
yield assoc

def _process_tool_call(
self,
session_id: str,
llm_source: str,
message_id: str,
tool_name: str,
tool_input: Dict[str, Any],
timestamp: str,
source_file: str,
line_num: int
) -> Iterator[ActivityAssociation]:
"""Process a tool call and create associations."""
description = tool_input.get('description', '')

# Create base association
assoc = ActivityAssociation(
session_id=session_id,
llm_source=llm_source,
message_id=message_id,
tool_name=tool_name,
tool_description=description,
activity_timestamp=timestamp,
source_file=source_file,
source_line=line_num
)

# Tier 1: Explicit task ID (99% confidence)
task_id = self._extract_task_id(description)
if task_id:
assoc.task_id = task_id
assoc.confidence = 0.99
assoc.association_method = "explicit"

# Get track info
track, track_name = self.track_parser.get_track_for_task(task_id)
assoc.track = track
assoc.track_name = track_name

yield assoc
return

# Tier 2: Path-based association (85% confidence)
paths = self._extract_paths(tool_input)
project_name, project_path = self._identify_project(paths)

if project_name:
assoc.project_name = project_name
assoc.project_path = project_path
assoc.confidence = 0.85
assoc.association_method = "path"

yield assoc
return

# Tier 3: Unassociated (create orphan entry for tracking)
assoc.confidence = 0.0
assoc.association_method = "unassociated"
yield assoc

def _extract_task_id(self, description: str) -> Optional[str]:
"""Extract task ID from tool description."""
if not description:
return None

match = self.TASK_ID_REGEX.match(description)
if match:
return match.group(1)

return None

def _extract_paths(self, tool_input: Dict[str, Any]) -> List[str]:
"""Extract file paths from tool input."""
paths = []

# Direct fields
for field in ['file_path', 'path', 'cwd', 'dir_path', 'pattern']:
if field in tool_input:
value = tool_input[field]
if isinstance(value, str) and '/' in value:
paths.append(value)

# Command parsing for Bash
command = tool_input.get('command', '')
if command:
# Extract paths from command
path_match = re.findall(r'(/[^\s"\']+)', command)
paths.extend(path_match)

return paths

def _identify_project(self, paths: List[str]) -> Tuple[Optional[str], Optional[str]]:
"""Identify project from file paths."""
for path in paths:
for project_name, patterns in PROJECT_PATTERNS.items():
for pattern in patterns:
if pattern in path:
return project_name, path

return None, None

# =============================================================================
# Database Manager
# =============================================================================

class DatabaseManager:
    """Manage activity associations in SQLite."""

def __init__(self, db_path: Path = SESSIONS_DB):
self.db_path = db_path
self.conn: Optional[sqlite3.Connection] = None

def connect(self) -> sqlite3.Connection:
"""Connect to database and ensure schema exists."""
if self.conn is not None:
return self.conn

self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(str(self.db_path))
self.conn.row_factory = sqlite3.Row

# Create schema
self.conn.executescript(ACTIVITY_ASSOCIATIONS_SCHEMA)
self.conn.commit()

return self.conn

def close(self) -> None:
"""Close database connection."""
if self.conn:
self.conn.close()
self.conn = None

def get_existing_hashes(self) -> Set[str]:
"""Get all existing association hashes."""
conn = self.connect()
cursor = conn.execute(
"SELECT association_hash FROM activity_associations WHERE association_hash IS NOT NULL"
)
return {row[0] for row in cursor.fetchall()}

def insert_association(self, assoc: ActivityAssociation) -> bool:
"""Insert association if not duplicate. Returns True if inserted."""
conn = self.connect()
data = assoc.to_dict()

try:
conn.execute("""
INSERT INTO activity_associations (
session_id, message_id, llm_source, task_id, track, track_name,
project_path, project_name, confidence, association_method,
source_file, source_line, tool_name, tool_description,
activity_timestamp, association_hash
) VALUES (
:session_id, :message_id, :llm_source, :task_id, :track, :track_name,
:project_path, :project_name, :confidence, :association_method,
:source_file, :source_line, :tool_name, :tool_description,
:activity_timestamp, :association_hash
)
""", data)
conn.commit()
return True
except sqlite3.IntegrityError:
# Duplicate hash
return False

def insert_batch(self, associations: List[ActivityAssociation]) -> Tuple[int, int]:
"""Insert batch of associations. Returns (inserted, duplicates)."""
inserted = 0
duplicates = 0

existing = self.get_existing_hashes()

conn = self.connect()
for assoc in associations:
hash_val = assoc.compute_hash()
if hash_val in existing:
duplicates += 1
continue

data = assoc.to_dict()
try:
conn.execute("""
INSERT INTO activity_associations (
session_id, message_id, llm_source, task_id, track, track_name,
project_path, project_name, confidence, association_method,
source_file, source_line, tool_name, tool_description,
activity_timestamp, association_hash
) VALUES (
:session_id, :message_id, :llm_source, :task_id, :track, :track_name,
:project_path, :project_name, :confidence, :association_method,
:source_file, :source_line, :tool_name, :tool_description,
:activity_timestamp, :association_hash
)
""", data)
inserted += 1
existing.add(hash_val)
except sqlite3.IntegrityError:
duplicates += 1

conn.commit()
return inserted, duplicates

def get_report(self) -> Dict[str, Any]:
"""Generate association report."""
conn = self.connect()

report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"total_associations": 0,
"by_track": [],
"by_project": [],
"by_method": [],
"orphan_count": 0,
"confidence_distribution": {},
}

# Total count
cursor = conn.execute("SELECT COUNT(*) FROM activity_associations")
report["total_associations"] = cursor.fetchone()[0]

# By track
cursor = conn.execute("SELECT * FROM activity_by_track")
report["by_track"] = [dict(row) for row in cursor.fetchall()]

# By project
cursor = conn.execute("SELECT * FROM activity_by_project")
report["by_project"] = [dict(row) for row in cursor.fetchall()]

# By method
cursor = conn.execute("SELECT * FROM activity_by_method")
report["by_method"] = [dict(row) for row in cursor.fetchall()]

# Orphan count
cursor = conn.execute("SELECT COUNT(*) FROM orphan_activities")
report["orphan_count"] = cursor.fetchone()[0]

# Confidence distribution
cursor = conn.execute("""
SELECT
CASE
WHEN confidence >= 0.9 THEN 'high (90%+)'
WHEN confidence >= 0.7 THEN 'medium (70-89%)'
WHEN confidence >= 0.5 THEN 'low (50-69%)'
ELSE 'unassociated (<50%)'
END as tier,
COUNT(*) as count
FROM activity_associations
GROUP BY tier
ORDER BY count DESC
""")
report["confidence_distribution"] = {row[0]: row[1] for row in cursor.fetchall()}

return report

# =============================================================================
# Main Pipeline
# =============================================================================

class ActivityProjectAssociator:
    """Main pipeline coordinator."""

def __init__(
self,
db_path: Path = SESSIONS_DB,
tracks_dir: Path = TRACKS_DIR,
dry_run: bool = False,
verbose: bool = False
):
self.dry_run = dry_run
self.verbose = verbose

self.db_manager = DatabaseManager(db_path)
self.track_parser = TrackParser(tracks_dir)
self.extractor = SessionExtractor(self.track_parser)

# Stats
self.stats = {
"sessions_processed": 0,
"activities_found": 0,
"associations_inserted": 0,
"duplicates_skipped": 0,
"errors": 0,
}

def run_batch(self, max_sessions: Optional[int] = None) -> Dict[str, Any]:
"""Run batch processing on all discovered sessions."""
print("=" * 60)
print("J.14: Activity-Project Association Pipeline")
print("=" * 60)

# Phase 1: Parse track files
print("\nPhase 1: Parsing TRACK files...")
self.track_parser.parse_all_tracks()
print(f" Loaded {len(self.track_parser.task_index)} task definitions")

# Phase 2: Discover sessions
print("\nPhase 2: Discovering LLM sessions...")
sessions = list(self.extractor.discover_sessions())
print(f" Found {len(sessions)} session files")

# Limit if requested
if max_sessions:
sessions = sessions[:max_sessions]
print(f" Processing first {max_sessions} sessions")

# Phase 3: Extract associations
print("\nPhase 3: Extracting associations...")
associations_buffer = []

for session_file, llm_source in sessions:
self.stats["sessions_processed"] += 1

if self.verbose:
print(f" Processing: {session_file.name} ({llm_source})")

try:
for assoc in self.extractor.extract_from_session(session_file, llm_source):
self.stats["activities_found"] += 1
associations_buffer.append(assoc)

# Batch insert every 1000
if len(associations_buffer) >= 1000:
if not self.dry_run:
inserted, dupes = self.db_manager.insert_batch(associations_buffer)
self.stats["associations_inserted"] += inserted
self.stats["duplicates_skipped"] += dupes
associations_buffer = []

except Exception as e:
self.stats["errors"] += 1
if self.verbose:
print(f" Error: {e}")

# Insert remaining
if associations_buffer and not self.dry_run:
inserted, dupes = self.db_manager.insert_batch(associations_buffer)
self.stats["associations_inserted"] += inserted
self.stats["duplicates_skipped"] += dupes

# Phase 4: Report
print("\n" + "=" * 60)
print("Results:")
print("=" * 60)
print(f" Sessions processed: {self.stats['sessions_processed']}")
print(f" Activities found: {self.stats['activities_found']}")
print(f" Associations inserted: {self.stats['associations_inserted']}")
print(f" Duplicates skipped: {self.stats['duplicates_skipped']}")
print(f" Errors: {self.stats['errors']}")

if self.dry_run:
print("\n [DRY RUN - No data written]")

return self.stats

def run_session(self, session_id: str) -> Dict[str, Any]:
"""Process a specific session."""
# Find session file
session_file = None
llm_source = None

for path, source in self.extractor.discover_sessions():
if session_id in path.stem:
session_file = path
llm_source = source
break

if not session_file:
print(f"Session not found: {session_id}")
return {"error": "Session not found"}

print(f"Processing session: {session_file}")

# Parse tracks
self.track_parser.parse_all_tracks()

# Extract associations
associations = list(self.extractor.extract_from_session(session_file, llm_source))

print(f"Found {len(associations)} activities")

# Insert
if not self.dry_run:
inserted, dupes = self.db_manager.insert_batch(associations)
print(f"Inserted: {inserted}, Duplicates: {dupes}")

# Show sample
print("\nSample associations:")
for assoc in associations[:10]:
print(f" {assoc.task_id or 'N/A':15} | {assoc.association_method:12} | {assoc.confidence:.0%} | {assoc.tool_name}")

return {
"session_id": session_id,
"activities": len(associations),
}

def generate_report(self) -> Dict[str, Any]:
"""Generate and display association report."""
report = self.db_manager.get_report()

print("\n" + "=" * 60)
print("J.14: Activity-Project Association Report")
print("=" * 60)
print(f"\nGenerated: {report['generated_at']}")
print(f"Total associations: {report['total_associations']}")
print(f"Orphan activities: {report['orphan_count']}")

print("\n--- Confidence Distribution ---")
for tier, count in report['confidence_distribution'].items():
print(f" {tier}: {count}")

print("\n--- By Association Method ---")
for item in report['by_method']:
print(f" {item['association_method']:15} | {item['count']:6} | {item['percentage']:.1f}% | avg conf: {item['avg_confidence']}")

print("\n--- By Track ---")
for item in report['by_track'][:15]:
print(f" {item['track'] or 'N/A':3} {item['track_name'] or 'Unknown':20} | {item['total_activities']:6} activities | {item['sessions']} sessions | {item['unique_tasks']} tasks")

print("\n--- By Project ---")
for item in report['by_project'][:10]:
print(f" {item['project_name'] or 'Unknown':30} | {item['total_activities']:6} activities | {item['sessions']} sessions")

return report

# =============================================================================
# CLI
# =============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="J.14: Activity-Project Association Pipeline"
    )
    parser.add_argument(
        "--batch", action="store_true",
        help="Process all discovered sessions"
    )
    parser.add_argument(
        "--session", type=str,
        help="Process a specific session by ID"
    )
    parser.add_argument(
        "--report", action="store_true",
        help="Generate association report"
    )
    parser.add_argument(
        "--max-sessions", type=int, default=None,
        help="Limit number of sessions to process"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Don't write to database"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "--db", type=str, default=str(SESSIONS_DB),
        help=f"Database path (default: {SESSIONS_DB})"
    )

args = parser.parse_args()

# Create pipeline
pipeline = ActivityProjectAssociator(
db_path=Path(args.db),
dry_run=args.dry_run,
verbose=args.verbose
)

try:
if args.batch:
pipeline.run_batch(max_sessions=args.max_sessions)
elif args.session:
pipeline.run_session(args.session)
elif args.report:
pipeline.generate_report()
else:
# Default: show help and current stats
parser.print_help()
print("\n\nCurrent database stats:")
pipeline.generate_report()
finally:
pipeline.db_manager.close()

if __name__ == "__main__":
    main()