#!/usr/bin/env python3
"""
---
title: "Post Session Processor"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Post-Session Processing System for CODITECT-core"
keywords: ['database', 'post', 'processor', 'session']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "post-session-processor.py"
language: python
executable: true
usage: "python3 scripts/post-session-processor.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Post-Session Processing System for CODITECT-core

Provides reliable bookkeeping and insight extraction after agent sessions
complete. Runs independently of agent compliance to ensure 100% reliable
state updates.

Usage:
    python3 scripts/post-session-processor.py --session-id SESSION_ID
    python3 scripts/post-session-processor.py --transcript path/to/transcript.txt
    python3 scripts/post-session-processor.py --auto  # Process latest session
"""

import argparse
import asyncio
import json
import re
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

class PostSessionProcessor:
    """Process agent sessions to extract insights and update state."""

    def __init__(self, coditect_root: Path):
        """Initialize processor state, resolve data paths, and load config.

        Args:
            coditect_root: Root directory of the CODITECT checkout.
        """
        self.root = coditect_root
        self.coditect_dir = self.root / ".coditect"
        self.config_path = self.root / "config" / "post-session-config.json"

        # ADR-114 & ADR-118: Use centralized path discovery for user data
        try:
            # Make scripts/core importable so the shared paths module wins
            # over the hard-coded fallback below.
            sys.path.insert(0, str(self.root / "scripts" / "core"))
            from paths import (
                get_context_storage_dir,
                get_org_db_path,
                get_sessions_db_path,
                ORG_DB,
                SESSIONS_DB,
            )
            self.context_dir = get_context_storage_dir()
            self.org_db = ORG_DB
            self.sessions_db = SESSIONS_DB
            self._paths_available = True
        except ImportError:
            # Fallback for backward compatibility
            _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
            if _user_data.exists():
                self.context_dir = _user_data
            else:
                self.context_dir = self.root / "context-storage"
            self.org_db = self.context_dir / "org.db"
            self.sessions_db = self.context_dir / "sessions.db"
            self._paths_available = False

        # ADR-118: context.db is DEPRECATED - NO FALLBACK allowed
        # Use sessions.db for session data, org.db for knowledge

        # Ensure directories exist. parents=True so a missing ancestor
        # directory does not raise FileNotFoundError on first run.
        self.coditect_dir.mkdir(parents=True, exist_ok=True)
        self.context_dir.mkdir(parents=True, exist_ok=True)

        # State files
        self.insights_file = self.coditect_dir / "session-insights.json"
        self.task_status_file = self.coditect_dir / "task-status.json"

        # Load configuration
        self.config = self._load_config()

def _load_config(self) -> Dict[str, Any]:
"""Load post-session processing configuration."""
if self.config_path.exists():
with open(self.config_path, 'r') as f:
return json.load(f)

# Default configuration
return {
"insight_extraction": {
"patterns": {
"code_patterns": True,
"architecture_decisions": True,
"error_solutions": True,
"gotchas_warnings": True
},
"min_confidence": 0.7
},
"knowledge_base": {
"auto_update": True,
"deduplication": True
},
"session_summary": {
"max_length": 1000,
"include_metrics": True
}
}

async def process_session(
self,
transcript: str,
session_id: Optional[str] = None,
task_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Process a complete agent session.

Args:
transcript: Session transcript text
session_id: Optional session identifier
task_id: Optional task identifier

Returns:
Processing results with insights and status
"""
print(f"🔄 Processing session: {session_id or 'unknown'}")

# Generate session ID if not provided
if not session_id:
session_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

results = {
"session_id": session_id,
"task_id": task_id,
"processed_at": datetime.now(timezone.utc).isoformat(),
"insights": {},
"status": "processing"
}

try:
# Step 1: Extract insights from transcript
print(" 📊 Extracting insights...")
insights = await self._extract_insights(transcript, session_id)
results["insights"] = insights

# Step 2: Update knowledge base
if self.config["knowledge_base"]["auto_update"]:
print(" 💾 Updating knowledge base...")
await self._update_knowledge_base(insights, session_id)

# Step 3: Update task status
if task_id:
print(" ✅ Updating task status...")
await self._update_task_status(task_id, "completed", insights)

# Step 4: Store session summary
print(" 📝 Storing session summary...")
await self._store_session_summary(session_id, insights, transcript)

# Step 5: Append to unified messages
print(" 📎 Appending to unified messages...")
await self._append_to_unified_messages(session_id, insights)

results["status"] = "completed"
print(f"✅ Session processing completed: {session_id}")

except Exception as e:
results["status"] = "failed"
results["error"] = str(e)
print(f"❌ Session processing failed: {e}")
raise

return results

async def _extract_insights(self, transcript: str, session_id: str) -> Dict[str, Any]:
"""Extract structured insights from session transcript."""
insights = {
"session_id": session_id,
"extracted_at": datetime.now(timezone.utc).isoformat(),
"patterns": [],
"decisions": [],
"gotchas": [],
"errors": [],
"file_modifications": [],
"metrics": {}
}

# Extract code patterns
if self.config["insight_extraction"]["patterns"]["code_patterns"]:
insights["patterns"] = self._extract_code_patterns(transcript)

# Extract architecture decisions
if self.config["insight_extraction"]["patterns"]["architecture_decisions"]:
insights["decisions"] = self._extract_decisions(transcript)

# Extract gotchas and warnings
if self.config["insight_extraction"]["patterns"]["gotchas_warnings"]:
insights["gotchas"] = self._extract_gotchas(transcript)

# Extract error solutions
if self.config["insight_extraction"]["patterns"]["error_solutions"]:
insights["errors"] = self._extract_error_solutions(transcript)

# Extract file modifications
insights["file_modifications"] = self._extract_file_modifications(transcript)

# Calculate metrics
insights["metrics"] = self._calculate_session_metrics(transcript, insights)

return insights

def _extract_code_patterns(self, transcript: str) -> List[Dict[str, Any]]:
"""Extract code patterns from transcript."""
patterns = []

# Pattern: Function definitions
func_pattern = r'def\s+(\w+)\s*\([^)]*\):'
for match in re.finditer(func_pattern, transcript):
patterns.append({
"type": "function_definition",
"name": match.group(1),
"confidence": 0.9
})

# Pattern: Class definitions
class_pattern = r'class\s+(\w+)(?:\([^)]*\))?:'
for match in re.finditer(class_pattern, transcript):
patterns.append({
"type": "class_definition",
"name": match.group(1),
"confidence": 0.9
})

# Pattern: Import statements
import_pattern = r'(?:from\s+[\w.]+\s+)?import\s+([\w,\s]+)'
for match in re.finditer(import_pattern, transcript):
patterns.append({
"type": "import",
"modules": match.group(1).strip(),
"confidence": 0.85
})

# Pattern: Async/await usage
if 'async def' in transcript or 'await ' in transcript:
patterns.append({
"type": "async_pattern",
"description": "Async/await pattern used",
"confidence": 0.95
})

return patterns

def _extract_decisions(self, transcript: str) -> List[Dict[str, Any]]:
"""Extract architecture decisions from transcript."""
decisions = []

# Keywords indicating decisions - improved patterns
decision_patterns = [
(r'(?:I\s+)?decided to (.+?)(?:\sbecause\s+(.+?))?[.\n]', 'decided'),
(r'(?:I\s+)?chose (.+?)\s+because\s+(.+?)[.\n]', 'chose'),
(r'using (.+?)\s+instead of\s+(.+?)[.\n]', 'instead_of'),
(r'(?:Architecture )?(?:Decision|Choice):\s*(.+?)[.\n]', 'explicit_decision'),
]

for pattern, decision_type in decision_patterns:
for match in re.finditer(pattern, transcript, re.IGNORECASE | re.DOTALL):
# Extract decision and rationale based on groups
if decision_type == 'chose':
decision_text = f"chose {match.group(1).strip()}"
rationale = match.group(2).strip() if match.lastindex >= 2 else None
elif decision_type == 'decided':
decision_text = f"decided to {match.group(1).strip()}"
rationale = match.group(2).strip() if match.lastindex >= 2 else None
elif decision_type == 'instead_of':
decision_text = f"using {match.group(1).strip()} instead of {match.group(2).strip()}"
rationale = None
else:
decision_text = match.group(1).strip()
rationale = None

decisions.append({
"type": "architecture_decision",
"decision": decision_text[:200], # Limit length
"rationale": rationale[:200] if rationale else None,
"confidence": 0.85
})

return decisions

def _extract_gotchas(self, transcript: str) -> List[Dict[str, Any]]:
"""Extract gotchas and warnings from transcript."""
gotchas = []

# Warning patterns
warning_patterns = [
r'⚠️\s*(.*?)(?:\n|$)',
r'WARNING:\s*(.*?)(?:\n|$)',
r'CAUTION:\s*(.*?)(?:\n|$)',
r'Note:\s*(.*?)(?:\n|$)',
r'Important:\s*(.*?)(?:\n|$)'
]

for pattern in warning_patterns:
for match in re.finditer(pattern, transcript, re.IGNORECASE):
gotchas.append({
"type": "warning",
"message": match.group(1).strip(),
"confidence": 0.85
})

return gotchas

def _extract_error_solutions(self, transcript: str) -> List[Dict[str, Any]]:
"""Extract error solutions from transcript."""
errors = []

# Error patterns
error_patterns = [
r'Error:\s*(.*?)(?:\n|Solution:)',
r'Exception:\s*(.*?)(?:\n|$)',
r'Failed:\s*(.*?)(?:\n|Fixed by:)',
]

solution_patterns = [
r'Solution:\s*(.*?)(?:\n|$)',
r'Fixed by:\s*(.*?)(?:\n|$)',
r'Resolved:\s*(.*?)(?:\n|$)'
]

# Extract errors with solutions
for err_pattern in error_patterns:
for match in re.finditer(err_pattern, transcript, re.IGNORECASE):
error_text = match.group(1).strip()

# Try to find solution nearby
solution = None
for sol_pattern in solution_patterns:
sol_match = re.search(sol_pattern, transcript[match.end():match.end()+500], re.IGNORECASE)
if sol_match:
solution = sol_match.group(1).strip()
break

errors.append({
"type": "error_solution",
"error": error_text,
"solution": solution,
"confidence": 0.9 if solution else 0.7
})

return errors

def _extract_file_modifications(self, transcript: str) -> List[Dict[str, Any]]:
"""Extract file modifications from transcript."""
modifications = []

# File operation patterns
file_patterns = [
r'Created:\s*([^\n]+)',
r'Modified:\s*([^\n]+)',
r'Deleted:\s*([^\n]+)',
r'Writing to:\s*([^\n]+)',
r'Editing:\s*([^\n]+)'
]

for pattern in file_patterns:
for match in re.finditer(pattern, transcript, re.IGNORECASE):
operation = pattern.split(':')[0].strip('r\'').lower()
filepath = match.group(1).strip()

modifications.append({
"operation": operation,
"file": filepath,
"timestamp": datetime.now(timezone.utc).isoformat()
})

return modifications

def _calculate_session_metrics(self, transcript: str, insights: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate session metrics."""
return {
"transcript_length": len(transcript),
"patterns_found": len(insights["patterns"]),
"decisions_made": len(insights["decisions"]),
"gotchas_identified": len(insights["gotchas"]),
"errors_solved": len(insights["errors"]),
"files_modified": len(insights["file_modifications"]),
"processing_duration_ms": 0 # Set by caller
}

async def _update_knowledge_base(self, insights: Dict[str, Any], session_id: str):
"""Update SQLite knowledge base with insights (ADR-118 four-tier)."""
# Session insights go to sessions.db (TIER 3 - regenerable)
# ADR-118: NO FALLBACK to legacy context.db
if not self.sessions_db.exists():
print(" ⚠️ sessions.db not found, skipping knowledge base update")
print(f" Expected at: {self.sessions_db}")
return
db_path = self.sessions_db

conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()

try:
# Create insights table if not exists
cursor.execute("""
CREATE TABLE IF NOT EXISTS session_insights (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
insight_type TEXT NOT NULL,
content TEXT NOT NULL,
confidence REAL,
extracted_at TEXT NOT NULL,
UNIQUE(session_id, insight_type, content)
)
""")

# Insert patterns
for pattern in insights.get("patterns", []):
cursor.execute("""
INSERT OR IGNORE INTO session_insights
(session_id, insight_type, content, confidence, extracted_at)
VALUES (?, ?, ?, ?, ?)
""", (
session_id,
"pattern",
json.dumps(pattern),
pattern.get("confidence", 0.5),
insights["extracted_at"]
))

# Insert decisions
for decision in insights.get("decisions", []):
cursor.execute("""
INSERT OR IGNORE INTO session_insights
(session_id, insight_type, content, confidence, extracted_at)
VALUES (?, ?, ?, ?, ?)
""", (
session_id,
"decision",
json.dumps(decision),
decision.get("confidence", 0.5),
insights["extracted_at"]
))

# Insert error solutions
for error in insights.get("errors", []):
cursor.execute("""
INSERT OR IGNORE INTO session_insights
(session_id, insight_type, content, confidence, extracted_at)
VALUES (?, ?, ?, ?, ?)
""", (
session_id,
"error_solution",
json.dumps(error),
error.get("confidence", 0.5),
insights["extracted_at"]
))

conn.commit()
print(f" ✅ Updated knowledge base with {len(insights.get('patterns', []))} patterns, "
f"{len(insights.get('decisions', []))} decisions, "
f"{len(insights.get('errors', []))} error solutions")

except Exception as e:
print(f" ❌ Failed to update knowledge base: {e}")
conn.rollback()
finally:
conn.close()

async def _update_task_status(self, task_id: str, status: str, insights: Dict[str, Any]):
"""Update task status tracking."""
# Load existing task status
task_status = {}
if self.task_status_file.exists():
with open(self.task_status_file, 'r') as f:
task_status = json.load(f)

# Update task
task_status[task_id] = {
"status": status,
"updated_at": datetime.now(timezone.utc).isoformat(),
"insights_summary": {
"patterns_found": len(insights.get("patterns", [])),
"decisions_made": len(insights.get("decisions", [])),
"files_modified": len(insights.get("file_modifications", []))
}
}

# Save updated status
with open(self.task_status_file, 'w') as f:
json.dump(task_status, f, indent=2)

print(f" ✅ Task {task_id} marked as {status}")

async def _store_session_summary(self, session_id: str, insights: Dict[str, Any], transcript: str):
"""Store session summary with insights."""
# Load existing insights
all_insights = {}
if self.insights_file.exists():
with open(self.insights_file, 'r') as f:
all_insights = json.load(f)

# Create summary
max_length = self.config["session_summary"]["max_length"]
summary = {
"session_id": session_id,
"summary": transcript[:max_length] + "..." if len(transcript) > max_length else transcript,
"insights": insights,
"stored_at": datetime.now(timezone.utc).isoformat()
}

# Add metrics if configured
if self.config["session_summary"]["include_metrics"]:
summary["metrics"] = insights.get("metrics", {})

# Store summary
all_insights[session_id] = summary

# Save to file
with open(self.insights_file, 'w') as f:
json.dump(all_insights, f, indent=2)

print(f" ✅ Session summary stored")

async def _append_to_unified_messages(self, session_id: str, insights: Dict[str, Any]):
"""Append session insights to unified messages JSONL."""
jsonl_path = self.context_dir / "unified_messages.jsonl"

# Create message entry
message = {
"role": "system",
"content": f"Session {session_id} insights: {json.dumps(insights['metrics'])}",
"timestamp": datetime.now(timezone.utc).isoformat(),
"session_id": session_id,
"metadata": {
"type": "session_insights",
"patterns_count": len(insights.get("patterns", [])),
"decisions_count": len(insights.get("decisions", [])),
"errors_count": len(insights.get("errors", []))
}
}

# Append to JSONL
with open(jsonl_path, 'a') as f:
f.write(json.dumps(message) + '\n')

print(f" ✅ Appended to unified messages")

async def process_latest_session(self) -> Optional[Dict[str, Any]]:
"""Process the most recent session automatically."""
# Find latest session file
session_files = sorted(self.context_dir.glob("session_*.txt"), reverse=True)

if not session_files:
print("❌ No session files found")
return None

latest_session = session_files[0]
session_id = latest_session.stem.replace("session_", "")

print(f"📂 Processing latest session: {latest_session.name}")

with open(latest_session, 'r') as f:
transcript = f.read()

return await self.process_session(transcript, session_id=session_id)

async def main():
    """Main entry point for post-session processor.

    Parses CLI arguments, runs the selected processing mode, prints a
    summary, and exits 0 on success / 1 on failure.
    """
    parser = argparse.ArgumentParser(
        description="Post-Session Processing System for CODITECT-core",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--session-id",
        help="Session ID to process"
    )
    parser.add_argument(
        "--transcript",
        help="Path to session transcript file"
    )
    parser.add_argument(
        "--auto",
        action="store_true",
        help="Automatically process latest session"
    )
    parser.add_argument(
        "--task-id",
        help="Optional task ID to associate with session"
    )
    parser.add_argument(
        "--root",
        default=".",
        help="CODITECT root directory (default: current directory)"
    )
    args = parser.parse_args()

    # Initialize processor
    root = Path(args.root).resolve()
    processor = PostSessionProcessor(root)

    start_time = datetime.now()

    try:
        if args.auto:
            # Process latest session
            results = await processor.process_latest_session()
        elif args.transcript:
            # Process from transcript file
            transcript_path = Path(args.transcript)
            if not transcript_path.exists():
                print(f"❌ Transcript file not found: {transcript_path}")
                sys.exit(1)

            with open(transcript_path, 'r') as f:
                transcript = f.read()

            session_id = args.session_id or transcript_path.stem
            results = await processor.process_session(
                transcript,
                session_id=session_id,
                task_id=args.task_id
            )
        else:
            # BUGFIX: --session-id alone has no transcript source, so the old
            # message wrongly advertised it as a standalone mode.
            print("❌ Must specify --auto or --transcript")
            parser.print_help()
            sys.exit(1)

        # Calculate duration
        duration = (datetime.now() - start_time).total_seconds()

        if results:
            results["insights"]["metrics"]["processing_duration_ms"] = int(duration * 1000)

            # Print summary
            print("\n" + "="*60)
            print("📊 POST-SESSION PROCESSING SUMMARY")
            print("="*60)
            print(f"Session ID: {results['session_id']}")
            print(f"Status: {results['status']}")
            print(f"Processing Duration: {duration:.2f}s")
            print(f"\nInsights Extracted:")
            print(f" - Patterns: {len(results['insights'].get('patterns', []))}")
            print(f" - Decisions: {len(results['insights'].get('decisions', []))}")
            print(f" - Gotchas: {len(results['insights'].get('gotchas', []))}")
            print(f" - Error Solutions: {len(results['insights'].get('errors', []))}")
            print(f" - File Modifications: {len(results['insights'].get('file_modifications', []))}")
            print("="*60)

            sys.exit(0 if results['status'] == 'completed' else 1)
        else:
            sys.exit(1)

    except Exception as e:
        # sys.exit raises SystemExit (a BaseException), so it is NOT
        # swallowed by this handler.
        print(f"\n❌ Processing failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

# BUGFIX: was `if name == "main":`, which raises NameError on import and
# never runs the entry point. The dunder module guard is required.
if __name__ == "__main__":
    asyncio.run(main())