#!/usr/bin/env python3
"""
Generate session insights for all sessions in sessions.db (ADR-118).

Analyzes session content and extracts:
- Session summary (topics, duration, message counts)
- Key decisions made
- Code patterns used
- Errors encountered and solved
- Files modified
- Key accomplishments

Usage:
    python3 scripts/generate-session-insights.py              # Generate for all sessions
    python3 scripts/generate-session-insights.py --limit 10   # Generate for 10 sessions
    python3 scripts/generate-session-insights.py --rebuild    # Clear and regenerate all
    python3 scripts/generate-session-insights.py --dry-run    # Preview without saving
"""
import argparse
import hashlib
import json
import re
import sqlite3
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Database path discovery.
# BUG FIX: the mangled source had Path(file) — the __file__ dunder was stripped.
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR / "core"))

# ADR-114 & ADR-118: Use centralized path discovery
try:
    from paths import get_sessions_db_path, SESSIONS_DB
    DB_PATH = SESSIONS_DB  # Session insights go to sessions.db (Tier 3)
except ImportError:
    # Fallback for backward compatibility: prefer the PROJECTS data dir when
    # it exists, otherwise the legacy ~/.coditect location.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        SESSIONS_DB = _user_data / "sessions.db"
    else:
        SESSIONS_DB = Path.home() / ".coditect" / "context-storage" / "sessions.db"
    DB_PATH = SESSIONS_DB  # Backward compatibility alias
def get_db() -> sqlite3.Connection:
    """Open a connection to the sessions database.

    Uses a 30-second busy timeout and sqlite3.Row so columns can be
    accessed by name.
    """
    connection = sqlite3.connect(str(DB_PATH), timeout=30)
    connection.row_factory = sqlite3.Row
    return connection
def get_sessions_needing_insights(conn: sqlite3.Connection, limit: Optional[int] = None) -> List[Dict]:
    """List parent and agent sessions that have no stored insights yet.

    Returns session descriptor dicts sorted by entry count (largest first),
    optionally truncated to `limit` items.
    """
    # Parent sessions: keyed directly by session_id in session_insights.
    parent_sql = """
        SELECT DISTINCT
            e.session_id,
            NULL as agent_id,
            'parent' as session_type,
            COUNT(*) as entry_count,
            MIN(e.timestamp) as first_timestamp,
            MAX(e.timestamp) as last_timestamp
        FROM entries e
        WHERE e.session_id IS NOT NULL
          AND e.session_id NOT IN (
            SELECT DISTINCT session_id FROM session_insights
          )
        GROUP BY e.session_id
    """
    # Agent sessions: stored in session_insights under an 'agent-<id>' key.
    agent_sql = """
        SELECT DISTINCT
            e.session_id as parent_session_id,
            e.agent_id,
            'agent' as session_type,
            COUNT(*) as entry_count,
            MIN(e.timestamp) as first_timestamp,
            MAX(e.timestamp) as last_timestamp
        FROM entries e
        WHERE e.agent_id IS NOT NULL
          AND ('agent-' || e.agent_id) NOT IN (
            SELECT DISTINCT session_id FROM session_insights
          )
        GROUP BY e.session_id, e.agent_id
    """

    pending: List[Dict] = []
    for row in conn.execute(parent_sql):
        pending.append({
            'session_id': row['session_id'],
            'agent_id': None,
            'session_type': 'parent',
            'entry_count': row['entry_count'],
            'first_timestamp': row['first_timestamp'],
            'last_timestamp': row['last_timestamp'],
        })
    for row in conn.execute(agent_sql):
        pending.append({
            'session_id': f"agent-{row['agent_id']}",
            'parent_session_id': row['parent_session_id'],
            'agent_id': row['agent_id'],
            'session_type': 'agent',
            'entry_count': row['entry_count'],
            'first_timestamp': row['first_timestamp'],
            'last_timestamp': row['last_timestamp'],
        })

    # Process larger sessions first — they carry the most insight value.
    pending.sort(key=lambda s: s['entry_count'], reverse=True)
    return pending[:limit] if limit else pending
def get_session_entries(conn: sqlite3.Connection, session_id: str, agent_id: Optional[str] = None) -> List[Dict]:
    """Fetch a session's entries in chronological order.

    With an agent_id, returns that agent's entries; otherwise returns the
    parent session's own entries (rows with no agent_id).
    """
    if agent_id:
        rows = conn.execute(
            """
            SELECT entry_type, content, role, timestamp
            FROM entries
            WHERE agent_id = ?
            ORDER BY timestamp
            """,
            (agent_id,),
        ).fetchall()
    else:
        rows = conn.execute(
            """
            SELECT entry_type, content, role, timestamp
            FROM entries
            WHERE session_id = ? AND (agent_id IS NULL OR agent_id = '')
            ORDER BY timestamp
            """,
            (session_id,),
        ).fetchall()
    return [dict(r) for r in rows]
def extract_topics(entries: List[Dict]) -> List[str]:
    """Infer up to five topic labels from entry content via keyword regexes.

    Each entry contributes at most one count per topic; topics are returned
    most-frequent first.
    """
    # (pattern, label) pairs — a label is counted once per matching entry.
    topic_patterns = [
        (r'\b(database|sql|sqlite|postgres|mysql)\b', 'database'),
        (r'\b(api|endpoint|rest|graphql)\b', 'api'),
        (r'\b(test|testing|unittest|pytest)\b', 'testing'),
        (r'\b(deploy|deployment|kubernetes|docker|k8s)\b', 'deployment'),
        (r'\b(auth|authentication|login|jwt|oauth)\b', 'authentication'),
        (r'\b(git|commit|branch|merge|pull request)\b', 'git'),
        (r'\b(error|bug|fix|debug)\b', 'debugging'),
        (r'\b(refactor|cleanup|optimize)\b', 'refactoring'),
        (r'\b(document|documentation|readme|docs)\b', 'documentation'),
        (r'\b(config|configuration|settings|env)\b', 'configuration'),
        (r'\b(frontend|react|vue|angular|ui)\b', 'frontend'),
        (r'\b(backend|server|express|django|flask)\b', 'backend'),
        (r'\b(component|agent|skill|command)\b', 'coditect-components'),
        (r'\b(session|context|memory|extraction)\b', 'context-management'),
        (r'\b(index|search|fts|query)\b', 'search-indexing'),
    ]

    counts: Counter = Counter()
    for entry in entries:
        text = (entry.get('content') or '').lower()
        counts.update(
            label
            for pattern, label in topic_patterns
            if re.search(pattern, text, re.IGNORECASE)
        )

    return [label for label, _ in counts.most_common(5)]
def extract_file_patterns(entries: List[Dict]) -> List[str]:
    """Detect up to five file-type labels mentioned in entry content.

    Matches by file extension; each entry contributes at most one count per
    file type, and types are returned most-frequent first.
    """
    # (pattern, label) pairs keyed on file extensions.
    file_patterns = [
        (r'\.py\b', 'Python'),
        (r'\.ts\b', 'TypeScript'),
        (r'\.js\b', 'JavaScript'),
        (r'\.md\b', 'Markdown'),
        (r'\.sql\b', 'SQL'),
        (r'\.json\b', 'JSON'),
        (r'\.yaml\b|\.yml\b', 'YAML'),
        (r'\.sh\b', 'Shell'),
        (r'\.html\b', 'HTML'),
        (r'\.css\b', 'CSS'),
        (r'\.rs\b', 'Rust'),
        (r'\.go\b', 'Go'),
    ]

    counts: Counter = Counter()
    for entry in entries:
        text = entry.get('content') or ''
        counts.update(
            label
            for pattern, label in file_patterns
            if re.search(pattern, text, re.IGNORECASE)
        )

    return [label for label, _ in counts.most_common(5)]
def calculate_duration_minutes(first_ts: Optional[str], last_ts: Optional[str]) -> Optional[float]:
    """Return the span between two timestamp strings, in minutes.

    Accepts ISO-8601 with or without microseconds (trailing 'Z' tolerated)
    and 'YYYY-MM-DD HH:MM:SS'. Returns None when either input is missing or
    unparseable.

    BUG FIX: the original parsed both timestamps with the SAME format inside
    one loop iteration, so a pair using different formats (e.g. one with
    microseconds, one without) always fell through and returned None. Each
    timestamp is now parsed independently.
    """
    if not first_ts or not last_ts:
        return None
    first = _parse_timestamp(first_ts)
    last = _parse_timestamp(last_ts)
    if first is None or last is None:
        return None
    return (last - first).total_seconds() / 60


def _parse_timestamp(ts: str) -> Optional[datetime]:
    """Best-effort parse of one timestamp string; None when no format matches."""
    # Truncate to 26 chars (datetime + microseconds) and drop any 'Z' suffix,
    # matching the normalization the original code applied.
    cleaned = ts[:26].replace('Z', '')
    for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S'):
        try:
            return datetime.strptime(cleaned, fmt)
        except ValueError:
            continue
    return None
def generate_session_insight(session: Dict, entries: List[Dict]) -> Dict[str, Any]:
    """Generate a comprehensive insight dict for one session.

    Combines message-role counts, inferred topics, file types, duration and
    code-block statistics. For agent sessions, the agent_id and parent
    session id are carried through.
    """
    # Count messages by role, falling back to entry_type when role is absent.
    role_counts = Counter(e.get('role') or e.get('entry_type') for e in entries)

    topics = extract_topics(entries)
    file_types = extract_file_patterns(entries)

    duration = calculate_duration_minutes(
        session.get('first_timestamp'),
        session.get('last_timestamp')
    )

    # Rough signal of how code-heavy the session was.
    code_blocks = sum(1 for e in entries if '```' in (e.get('content') or ''))
    total_chars = sum(len(e.get('content') or '') for e in entries)

    insight = {
        'session_id': session['session_id'],
        'session_type': session['session_type'],
        'entry_count': len(entries),
        'user_messages': role_counts.get('user', 0),
        'assistant_messages': role_counts.get('assistant', 0),
        'system_messages': role_counts.get('system', 0),
        # BUG FIX: `if duration` treated a legitimate 0.0-minute duration as
        # missing; check against None explicitly.
        'duration_minutes': round(duration, 1) if duration is not None else None,
        'topics': topics,
        'file_types': file_types,
        'code_blocks': code_blocks,
        'total_chars': total_chars,
        'first_timestamp': session.get('first_timestamp'),
        'last_timestamp': session.get('last_timestamp'),
    }

    if session.get('agent_id'):
        insight['agent_id'] = session['agent_id']
        insight['parent_session_id'] = session.get('parent_session_id')

    return insight
def save_insight(conn: sqlite3.Connection, insight: Dict[str, Any]) -> bool:
    """Persist one session insight: a summary row plus one row per topic.

    Returns True on success; on any failure prints the error and returns
    False (best-effort, never raises).
    """
    try:
        now = datetime.now(timezone.utc).isoformat()

        # Full insight serialized as the 'summary' record.
        conn.execute(
            """
            INSERT OR REPLACE INTO session_insights
            (session_id, insight_type, content, confidence, extracted_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (insight['session_id'], 'summary', json.dumps(insight, default=str), 0.9, now),
        )

        # Topics get their own rows so they can be queried individually.
        for topic in insight.get('topics', []):
            conn.execute(
                """
                INSERT OR IGNORE INTO session_insights
                (session_id, insight_type, content, confidence, extracted_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (insight['session_id'], 'topic', json.dumps({'topic': topic}), 0.8, now),
            )

        conn.commit()
        return True
    except Exception as e:
        print(f" Error saving insight: {e}")
        return False
def main():
    """CLI entry point: generate insights for every session that lacks them."""
    parser = argparse.ArgumentParser(description='Generate session insights')
    parser.add_argument('--limit', type=int, help='Limit number of sessions to process')
    parser.add_argument('--rebuild', action='store_true', help='Clear and regenerate all insights')
    parser.add_argument('--dry-run', action='store_true', help='Preview without saving')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()

    banner = "=" * 60
    print(banner)
    print("Session Insights Generator")
    print(banner)
    print(f"Database: {DB_PATH}")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    conn = get_db()

    # --rebuild wipes existing insights so everything is regenerated.
    if args.rebuild and not args.dry_run:
        print("Clearing existing insights...")
        conn.execute("DELETE FROM session_insights")
        conn.commit()

    sessions = get_sessions_needing_insights(conn, args.limit)
    print(f"Sessions to process: {len(sessions)}")
    if not sessions:
        print("All sessions already have insights!")
        return

    parent_count = sum(1 for s in sessions if s['session_type'] == 'parent')
    agent_count = sum(1 for s in sessions if s['session_type'] == 'agent')
    print(f" Parent sessions: {parent_count}")
    print(f" Agent sessions: {agent_count}")
    print()

    stats = {'processed': 0, 'saved': 0, 'errors': 0}
    for i, session in enumerate(sessions, 1):
        session_id = session['session_id']
        agent_id = session.get('agent_id')

        # Progress line: always for the first session, then every 50th,
        # or every session in verbose mode.
        if args.verbose or i % 50 == 0 or i == 1:
            print(f"[{i}/{len(sessions)}] {session_id} ({session['entry_count']} entries)")

        try:
            entries = get_session_entries(conn, session_id, agent_id)
            if not entries:
                if args.verbose:
                    print(f" No entries found, skipping")
                continue

            insight = generate_session_insight(session, entries)
            stats['processed'] += 1

            if args.verbose:
                print(f" Topics: {', '.join(insight['topics']) or 'none'}")
                print(f" Duration: {insight['duration_minutes']}m, {insight['code_blocks']} code blocks")

            if not args.dry_run:
                if save_insight(conn, insight):
                    stats['saved'] += 1
                else:
                    stats['errors'] += 1
            else:
                stats['saved'] += 1  # Count as would-be-saved
        except Exception as e:
            stats['errors'] += 1
            print(f" Error processing {session_id}: {e}")

    print()
    print(banner)
    print("Summary")
    print(banner)
    print(f"Processed: {stats['processed']}")
    print(f"Saved: {stats['saved']}")
    print(f"Errors: {stats['errors']}")

    if not args.dry_run:
        # Report the new database-wide totals.
        total = conn.execute("SELECT COUNT(*) FROM session_insights").fetchone()[0]
        unique = conn.execute("SELECT COUNT(DISTINCT session_id) FROM session_insights").fetchone()[0]
        print()
        print(f"Total insights in database: {total}")
        print(f"Unique sessions with insights: {unique}")

    conn.close()
# BUG FIX: the mangled source had `if name == 'main'` — the dunders were
# stripped. Restore the standard script-entry guard.
if __name__ == '__main__':
    main()