#!/usr/bin/env python3
"""
title: Session Resume Optimizer
component_type: script
version: 1.0.0
status: active
summary: "J.8.4: Optimized session resume context generation from session chains"
keywords: [session, resume, context, optimization, memory, adr-118]
track: J
task_id: J.8.4
adr_references: [ADR-118, ADR-151]
created: 2026-02-04
author: Claude (Opus 4.5)

Session Resume Optimizer - J.8.4 Implementation

Generates optimized context for resuming work from a previous session by:
- Finding the session chain (via J.8.3 SessionLinker)
- Extracting key context: decisions, last messages, pending work
- Summarizing previous session accomplishments
- Generating formatted resume context for prompt injection

Usage:
    from scripts.context_graph.session_resume import SessionResumeOptimizer

    optimizer = SessionResumeOptimizer()
    resume = optimizer.generate_resume_context(
        session_id="abc123",
        max_chain_depth=5,
        include_decisions=True,
        include_pending_tasks=True
    )
    print(resume.formatted_context)  # Markdown for prompt injection
    print(resume.token_estimate)     # Estimated tokens
"""

import json
import logging
import re
import sqlite3
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Setup logging (was getLogger(name) — mangled dunder; must be __name__)
logger = logging.getLogger(__name__)

# Import paths: make the repository root importable so `scripts.*` resolves
# regardless of the current working directory.
_script_dir = Path(__file__).resolve().parent
_coditect_root = _script_dir.parent.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))

# Import session linker (J.8.3) — optional dependency; degrade gracefully.
try:
    from scripts.context_graph.session_linker import SessionLinker, SessionLink
    SESSION_LINKER_AVAILABLE = True
except ImportError:
    SESSION_LINKER_AVAILABLE = False
    logger.warning("SessionLinker not available")

# Import paths (ADR-118) — optional; fallback paths are used when missing.
try:
    from scripts.core.paths import get_sessions_db_path, get_org_db_path
    PATHS_AVAILABLE = True
except ImportError:
    PATHS_AVAILABLE = False
    logger.warning("paths module not available")
# =============================================================================
# Data Classes
# =============================================================================


@dataclass
class SessionSummary:
    """Summary of a single session's work.

    All list fields default to fresh empty lists (via ``default_factory``)
    so instances never share mutable state.
    """

    session_id: str
    start_time: Optional[str] = None          # ISO timestamp of first message
    end_time: Optional[str] = None            # ISO timestamp of last message
    message_count: int = 0
    tool_calls: int = 0
    key_topics: List[str] = field(default_factory=list)
    decisions_made: List[Dict[str, Any]] = field(default_factory=list)
    tasks_completed: List[str] = field(default_factory=list)
    tasks_pending: List[str] = field(default_factory=list)
    last_messages: List[Dict[str, Any]] = field(default_factory=list)
    errors_encountered: List[str] = field(default_factory=list)
@dataclass
class ResumeContext:
    """Optimized context for resuming a session."""

    target_session: str
    chain_sessions: List[str]
    chain_depth: int
    summaries: List[SessionSummary]
    formatted_context: str
    token_estimate: int
    generated_at: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Note: ``last_messages`` is deliberately omitted from each summary
        dict to keep the serialized form compact.
        """
        def _summary_as_dict(s: SessionSummary) -> Dict[str, Any]:
            # Flatten one summary, excluding the (potentially large) last_messages.
            return {
                "session_id": s.session_id,
                "start_time": s.start_time,
                "end_time": s.end_time,
                "message_count": s.message_count,
                "tool_calls": s.tool_calls,
                "key_topics": s.key_topics,
                "decisions_made": s.decisions_made,
                "tasks_completed": s.tasks_completed,
                "tasks_pending": s.tasks_pending,
                "errors_encountered": s.errors_encountered,
            }

        result: Dict[str, Any] = {
            "target_session": self.target_session,
            "chain_sessions": self.chain_sessions,
            "chain_depth": self.chain_depth,
            "summaries": [_summary_as_dict(s) for s in self.summaries],
            "formatted_context": self.formatted_context,
            "token_estimate": self.token_estimate,
            "generated_at": self.generated_at,
            "metadata": self.metadata,
        }
        return result
# =============================================================================
# Session Resume Optimizer
# =============================================================================


class SessionResumeOptimizer:
    """
    Generates optimized context for resuming work from previous sessions.

    Uses J.8.3 session linking to find related sessions and extracts
    key context including decisions, pending tasks, and recent work.
    """

    def __init__(
        self,
        sessions_db_path: Optional[Path] = None,
        org_db_path: Optional[Path] = None
    ):
        """
        Initialize the resume optimizer.

        Args:
            sessions_db_path: Path to sessions.db (Tier 3)
            org_db_path: Path to org.db (Tier 2, for decisions)
        """
        if PATHS_AVAILABLE:
            self.sessions_db = sessions_db_path or get_sessions_db_path()
            self.org_db = org_db_path or get_org_db_path()
        else:
            # Fallback paths when the ADR-118 paths module is unavailable
            data_dir = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
            self.sessions_db = sessions_db_path or data_dir / "sessions.db"
            self.org_db = org_db_path or data_dir / "org.db"

        # Initialize session linker if available (J.8.3)
        self.linker: Optional[SessionLinker] = None
        if SESSION_LINKER_AVAILABLE and self.sessions_db.exists():
            self.linker = SessionLinker(self.sessions_db)

    def generate_resume_context(
        self,
        session_id: str,
        max_chain_depth: int = 5,
        include_decisions: bool = True,
        include_pending_tasks: bool = True,
        include_errors: bool = True,
        last_messages_count: int = 5,
        token_budget: int = 4000
    ) -> ResumeContext:
        """
        Generate optimized resume context for a session.

        Args:
            session_id: Session to generate resume context for
            max_chain_depth: Maximum depth to traverse session chain
            include_decisions: Include decisions from org.db
            include_pending_tasks: Extract pending tasks from messages
            include_errors: Include errors and their solutions
            last_messages_count: Number of last messages to include
            token_budget: Target token budget for formatted context

        Returns:
            ResumeContext with formatted context and metadata
        """
        # Get session chain (target session first)
        chain_sessions = self._get_session_chain(session_id, max_chain_depth)

        # Generate summaries for each session in chain
        summaries = []
        for sid in chain_sessions:
            summaries.append(self._summarize_session(
                sid,
                include_decisions=include_decisions,
                include_pending_tasks=include_pending_tasks,
                include_errors=include_errors,
                last_messages_count=last_messages_count
            ))

        # Generate formatted markdown context within the token budget
        formatted = self._format_resume_context(summaries, token_budget)

        # Estimate tokens (rough heuristic: ~4 chars per token)
        token_estimate = len(formatted) // 4

        return ResumeContext(
            target_session=session_id,
            chain_sessions=chain_sessions,
            chain_depth=len(chain_sessions),
            summaries=summaries,
            formatted_context=formatted,
            token_estimate=token_estimate,
            generated_at=datetime.now(timezone.utc).isoformat(),
            metadata={
                "include_decisions": include_decisions,
                "include_pending_tasks": include_pending_tasks,
                "include_errors": include_errors,
                "token_budget": token_budget,
            }
        )

    def _get_session_chain(self, session_id: str, max_depth: int) -> List[str]:
        """Get session chain using J.8.3 linker or fallback to direct query.

        Returns at most ``max_depth`` session IDs; if no linker is available
        (or it returns nothing), the chain is just the target session itself.
        """
        if self.linker:
            # Use J.8.3 session linker
            chain = self.linker.get_full_chain(session_id, relationship='continues')
            if chain:
                return chain[:max_depth]
        # Fallback: just return the target session
        return [session_id]

    def _summarize_session(
        self,
        session_id: str,
        include_decisions: bool,
        include_pending_tasks: bool,
        include_errors: bool,
        last_messages_count: int
    ) -> SessionSummary:
        """Generate a summary for a single session.

        Best-effort: database errors are logged and a partial (possibly
        empty) SessionSummary is returned rather than raising.
        """
        summary = SessionSummary(session_id=session_id)

        if not self.sessions_db.exists():
            return summary

        try:
            conn = sqlite3.connect(str(self.sessions_db))
            try:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                # Get session time range and message count
                cursor.execute("""
                    SELECT
                        MIN(timestamp) as start_time,
                        MAX(timestamp) as end_time,
                        COUNT(*) as message_count,
                        SUM(CASE WHEN tool_use IS NOT NULL AND tool_use != '' THEN 1 ELSE 0 END) as tool_calls
                    FROM messages
                    WHERE session_id = ?
                """, (session_id,))
                row = cursor.fetchone()
                if row:
                    summary.start_time = row['start_time']
                    summary.end_time = row['end_time']
                    summary.message_count = row['message_count'] or 0
                    summary.tool_calls = row['tool_calls'] or 0

                # Get last messages (fetched newest-first, reversed below)
                cursor.execute("""
                    SELECT role, content, tool_use, timestamp
                    FROM messages
                    WHERE session_id = ?
                    ORDER BY timestamp DESC
                    LIMIT ?
                """, (session_id, last_messages_count))
                for row in cursor.fetchall():
                    summary.last_messages.append({
                        "role": row['role'],
                        "content": (row['content'] or "")[:500],  # Truncate
                        "tool_use": row['tool_use'],
                        "timestamp": row['timestamp']
                    })
                summary.last_messages.reverse()  # Chronological order

                # Extract key topics from messages
                summary.key_topics = self._extract_topics(cursor, session_id)

                # Extract pending and completed tasks
                if include_pending_tasks:
                    summary.tasks_pending = self._extract_pending_tasks(cursor, session_id)
                    summary.tasks_completed = self._extract_completed_tasks(cursor, session_id)

                # Extract errors
                if include_errors:
                    summary.errors_encountered = self._extract_errors(cursor, session_id)
            finally:
                # FIX: close the connection even when a query raises;
                # previously the close was skipped on exception (leak).
                conn.close()

            # Get decisions from org.db (separate connection)
            if include_decisions:
                summary.decisions_made = self._get_session_decisions(session_id)

        except Exception as e:
            logger.error(f"Error summarizing session {session_id}: {e}")

        return summary

    def _extract_topics(self, cursor: sqlite3.Cursor, session_id: str) -> List[str]:
        """Extract key topics (task IDs like J.8.4) from session messages."""
        topics = set()

        # Scan assistant messages for task IDs (e.g., J.8.4, A.9.1.1)
        cursor.execute("""
            SELECT content FROM messages
            WHERE session_id = ? AND role = 'assistant' AND content IS NOT NULL
            LIMIT 50
        """, (session_id,))

        task_pattern = re.compile(r'\b([A-Z]\.\d+(?:\.\d+)*)\b')
        for row in cursor.fetchall():
            content = row[0] or ""
            matches = task_pattern.findall(content)
            for match in matches[:5]:  # Limit per message
                topics.add(f"Task {match}")

        return list(topics)[:10]  # Limit total topics

    def _extract_pending_tasks(self, cursor: sqlite3.Cursor, session_id: str) -> List[str]:
        """Extract pending/incomplete tasks from session (max 10, deduped)."""
        pending = []

        cursor.execute("""
            SELECT content FROM messages
            WHERE session_id = ? AND role = 'assistant' AND content IS NOT NULL
            ORDER BY timestamp DESC
            LIMIT 20
        """, (session_id,))

        # Patterns for pending work
        pending_patterns = [
            r'- \[ \] (.+)',                          # Unchecked checkbox
            r'(?:TODO|FIXME|PENDING)[:\s]+(.+)',      # TODO markers
            r'(?:Next|remaining|still need)[:\s]+(.+)',  # Next steps
        ]

        for row in cursor.fetchall():
            content = row[0] or ""
            for pattern in pending_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                for match in matches:
                    task = match.strip()[:100]  # Truncate
                    if task and task not in pending:
                        pending.append(task)
                        if len(pending) >= 10:
                            return pending

        return pending

    def _extract_completed_tasks(self, cursor: sqlite3.Cursor, session_id: str) -> List[str]:
        """Extract completed tasks from session (max 10, deduped)."""
        completed = []

        cursor.execute("""
            SELECT content FROM messages
            WHERE session_id = ? AND role = 'assistant' AND content IS NOT NULL
            ORDER BY timestamp DESC
            LIMIT 30
        """, (session_id,))

        # Patterns for completed work
        completed_patterns = [
            r'- \[x\] (.+)',                          # Checked checkbox
            r'✅\s*(.+)',                              # Checkmark emoji
            r'(?:Completed|Done|Finished)[:\s]+(.+)',  # Completion markers
        ]

        for row in cursor.fetchall():
            content = row[0] or ""
            for pattern in completed_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                for match in matches:
                    task = match.strip()[:100]
                    if task and task not in completed:
                        completed.append(task)
                        if len(completed) >= 10:
                            return completed

        return completed

    def _extract_errors(self, cursor: sqlite3.Cursor, session_id: str) -> List[str]:
        """Extract errors encountered in session (max 5, deduped)."""
        errors = []

        cursor.execute("""
            SELECT content FROM messages
            WHERE session_id = ? AND content IS NOT NULL
            AND (content LIKE '%error%' OR content LIKE '%Error%'
                 OR content LIKE '%failed%' OR content LIKE '%exception%')
            LIMIT 10
        """, (session_id,))

        error_patterns = [
            r'(?:Error|Exception|Failed)[:\s]+(.{20,100})',
            r'fatal:\s+(.{20,100})',
        ]

        for row in cursor.fetchall():
            content = row[0] or ""
            for pattern in error_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                for match in matches:
                    error = match.strip()[:100]
                    if error and error not in errors:
                        errors.append(error)
                        if len(errors) >= 5:
                            return errors

        return errors

    def _get_session_decisions(self, session_id: str) -> List[Dict[str, Any]]:
        """Get decisions made in session from org.db (best-effort, max 5)."""
        decisions = []

        if not self.org_db.exists():
            return decisions

        try:
            conn = sqlite3.connect(str(self.org_db))
            try:
                conn.row_factory = sqlite3.Row
                cursor = conn.cursor()

                cursor.execute("""
                    SELECT decision_type, title, rationale, outcome, created_at
                    FROM decisions
                    WHERE session_id = ?
                    ORDER BY created_at DESC
                    LIMIT 5
                """, (session_id,))

                for row in cursor.fetchall():
                    decisions.append({
                        "type": row['decision_type'],
                        "title": row['title'],
                        "rationale": (row['rationale'] or "")[:200],
                        "outcome": row['outcome'],
                        "created_at": row['created_at']
                    })
            finally:
                # FIX: close even when the query raises (previous code leaked
                # the connection on error).
                conn.close()
        except Exception as e:
            logger.debug(f"Could not get decisions: {e}")

        return decisions

    def _format_resume_context(
        self,
        summaries: List[SessionSummary],
        token_budget: int
    ) -> str:
        """Format summaries into markdown context for prompt injection.

        Most recent session appears first; later sessions are truncated if
        the rough token estimate exceeds 80% of ``token_budget``.
        """
        lines = []
        lines.append("## Session Resume Context")
        lines.append("")
        lines.append(f"*Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*")
        lines.append(f"*Sessions in chain: {len(summaries)}*")
        lines.append("")

        # Most recent session first (reversed order for context)
        for i, summary in enumerate(reversed(summaries)):
            if i == 0:
                lines.append("### Previous Session (Most Recent)")
            else:
                lines.append(f"### Session {i+1} Back")

            lines.append(f"**Session ID:** `{summary.session_id[:12]}...`")
            if summary.start_time:
                lines.append(f"**Time:** {summary.start_time}")
            lines.append(f"**Messages:** {summary.message_count} | **Tool Calls:** {summary.tool_calls}")
            lines.append("")

            # Key topics
            if summary.key_topics:
                lines.append("**Topics:** " + ", ".join(summary.key_topics[:5]))
                lines.append("")

            # Decisions
            if summary.decisions_made:
                lines.append("**Decisions Made:**")
                for dec in summary.decisions_made[:3]:
                    lines.append(f"- [{dec.get('type', 'decision')}] {dec.get('title', 'Untitled')}")
                lines.append("")

            # Completed tasks
            if summary.tasks_completed:
                lines.append("**Completed:**")
                for task in summary.tasks_completed[:5]:
                    lines.append(f"- ✅ {task}")
                lines.append("")

            # Pending tasks
            if summary.tasks_pending:
                lines.append("**Pending/Next:**")
                for task in summary.tasks_pending[:5]:
                    lines.append(f"- [ ] {task}")
                lines.append("")

            # Errors (only for most recent)
            if i == 0 and summary.errors_encountered:
                lines.append("**Errors Encountered:**")
                for err in summary.errors_encountered[:3]:
                    lines.append(f"- ⚠️ {err}")
                lines.append("")

            # Last work (only for most recent session)
            if i == 0 and summary.last_messages:
                lines.append("**Last Messages:**")
                for msg in summary.last_messages[-3:]:
                    role = msg.get('role', 'unknown')
                    content = msg.get('content', '')[:150]
                    if content:
                        lines.append(f"- [{role}] {content}...")
                lines.append("")

            lines.append("---")
            lines.append("")

            # Check token budget (rough estimate: ~4 chars/token)
            current_tokens = len("\n".join(lines)) // 4
            if current_tokens > token_budget * 0.8:
                lines.append("*[Additional sessions truncated for token budget]*")
                break

        return "\n".join(lines)

    def get_quick_resume(self, session_id: str) -> str:
        """
        Generate a quick, minimal resume context.

        Returns a brief summary suitable for session continuation.
        """
        resume = self.generate_resume_context(
            session_id=session_id,
            max_chain_depth=2,
            include_decisions=True,
            include_pending_tasks=True,
            include_errors=False,
            last_messages_count=3,
            token_budget=1500
        )
        return resume.formatted_context
# =============================================================================
# Output Formatters
# =============================================================================


def format_resume_output(resume: ResumeContext, output_format: str = "markdown") -> str:
    """Format resume context for output.

    Supported formats: "json" (full serialized dict), "brief" (few-line
    summary), anything else falls back to the prebuilt markdown.
    """
    if output_format == "json":
        return json.dumps(resume.to_dict(), indent=2, default=str)

    if output_format == "brief":
        lines = [
            f"Session: {resume.target_session[:12]}...",
            f"Chain depth: {resume.chain_depth}",
            f"Token estimate: {resume.token_estimate}",
            "",
        ]
        # Just topics and pending, from the first summary only
        for s in resume.summaries[:1]:
            if s.key_topics:
                lines.append(f"Topics: {', '.join(s.key_topics[:5])}")
            if s.tasks_pending:
                lines.append(f"Pending: {len(s.tasks_pending)} tasks")
        return "\n".join(lines)

    # Default: the already-formatted markdown context
    return resume.formatted_context
def format_resume_help() -> str:
    """Return help text for session resume feature."""
    return """Session Resume Optimization (J.8.4)

COMMANDS:
    --resume SESSION     Generate optimized resume context for SESSION
    --resume-quick       Generate minimal resume (fewer tokens)
    --resume-chain N     Maximum chain depth to traverse (default: 5)
    --resume-tokens N    Token budget for formatted output (default: 4000)

EXAMPLES:
    # Full resume context for a session
    /cxq --resume abc123

    # Quick resume (minimal tokens)
    /cxq --resume abc123 --resume-quick

    # Limit chain depth
    /cxq --resume abc123 --resume-chain 3

    # JSON output
    /cxq --resume abc123 --format json

WHAT'S INCLUDED:
    - Session chain (linked via J.8.3)
    - Key topics and task IDs
    - Decisions made (from org.db)
    - Completed tasks
    - Pending/next tasks
    - Last messages (truncated)
    - Errors encountered

USE CASE:
    When resuming work in a new session, use --resume to get context
    from the previous session chain. This provides:
    - What was accomplished
    - What's still pending
    - Key decisions made
    - Recent context for continuity
"""
# =============================================================================
# CLI Integration
# =============================================================================

# NOTE: was `if name == "main"` (mangled dunders) — the CLI never ran.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Session Resume Optimizer - Generate optimized resume context"
    )
    parser.add_argument("session_id", nargs="?", help="Session ID to resume from")
    parser.add_argument("--chain-depth", type=int, default=5, help="Max chain depth")
    parser.add_argument("--token-budget", type=int, default=4000, help="Token budget")
    parser.add_argument("--quick", action="store_true", help="Quick/minimal resume")
    parser.add_argument("--format", choices=["markdown", "json", "brief"], default="markdown")
    parser.add_argument("--help-resume", action="store_true", help="Show resume help")
    args = parser.parse_args()

    # No session given (or explicit help flag): show usage and exit cleanly.
    if args.help_resume or not args.session_id:
        print(format_resume_help())
        sys.exit(0)

    optimizer = SessionResumeOptimizer()

    if args.quick:
        output = optimizer.get_quick_resume(args.session_id)
        print(output)
    else:
        resume = optimizer.generate_resume_context(
            session_id=args.session_id,
            max_chain_depth=args.chain_depth,
            token_budget=args.token_budget
        )
        print(format_resume_output(resume, args.format))