#!/usr/bin/env python3
"""Context Restoration Script (scripts/context-restore.py).

Gathers context for mid-session continuation after interruption.
Used by /continue command to restore working state.

Usage: python3 context-restore.py [--deep] [--task TASK_ID] [--since TIME]
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
def get_git_root():
    """Return the git repository root, falling back to the current directory.

    Returns:
        Path: the repo toplevel when inside a git work tree; otherwise the
        current working directory (also used when git itself is missing).
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            capture_output=True, text=True, check=True
        )
        return Path(result.stdout.strip())
    # CalledProcessError: not inside a work tree (nonzero exit).
    # FileNotFoundError: git binary not installed — previously uncaught.
    except (subprocess.CalledProcessError, FileNotFoundError):
        return Path.cwd()
def run_context_extraction():
    """Run the unified message extractor to capture unprocessed exports.

    Searches the known script locations for unified-message-extractor.py,
    runs it with a 120s timeout, and reports any newly captured message
    count on stderr.

    Returns:
        bool: True when the extractor ran and exited 0; False when the
        extractor is missing, times out, or fails for any other reason.
    """
    git_root = get_git_root()

    # Candidate locations for the unified message extractor.
    extractor_paths = [
        git_root / ".coditect" / "scripts" / "unified-message-extractor.py",
        git_root / "scripts" / "unified-message-extractor.py",
    ]
    extractor = next((p for p in extractor_paths if p.exists()), None)
    if extractor is None:
        print("Warning: unified-message-extractor.py not found, skipping context capture", file=sys.stderr)
        return False

    try:
        print("Running /cx to capture latest context...", file=sys.stderr)
        result = subprocess.run(
            ["python3", str(extractor)],
            capture_output=True, text=True, timeout=120
        )
        # If the extractor's output mentions new messages, surface the count.
        # (Uses the module-level `re` import; it was previously imported here.)
        if "new" in result.stdout.lower():
            match = re.search(r'(\d+)\s+new', result.stdout)
            if match:
                new_count = int(match.group(1))
                if new_count > 0:
                    print(f"Captured {new_count} new messages", file=sys.stderr)
                else:
                    print("Context database up-to-date", file=sys.stderr)
        return result.returncode == 0
    except subprocess.TimeoutExpired:
        print("Warning: Context extraction timed out", file=sys.stderr)
        return False
    except Exception as e:
        # Best-effort step: restoration should continue even if capture fails.
        print(f"Warning: Context extraction failed: {e}", file=sys.stderr)
        return False
def query_recent_context(limit=50, since_minutes=60):
    """Fetch the most recent messages from the context database.

    Args:
        limit: maximum number of messages to request.
        since_minutes: accepted for interface compatibility; the query is
            count-based and this value is not used.

    Returns:
        dict: parsed JSON containing a "messages" list, or a dict with an
        "error" key (and empty "messages") on any failure.
    """
    root = get_git_root()
    db_script = root / ".coditect" / "scripts" / "context-db.py"
    if not db_script.exists():
        # Fall back to the flat scripts/ layout.
        db_script = root / "scripts" / "context-db.py"
        if not db_script.exists():
            return {"error": "context-db.py not found", "messages": []}

    cmd = ["python3", str(db_script), "query", "--recent", str(limit), "--format", "json"]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if proc.returncode == 0:
            return json.loads(proc.stdout) if proc.stdout.strip() else {"messages": []}
        return {"error": proc.stderr, "messages": []}
    except Exception as exc:
        return {"error": str(exc), "messages": []}
def query_decisions(limit=5):
    """Fetch recent decisions from the context database.

    Args:
        limit: maximum number of decisions to request.

    Returns:
        dict: parsed JSON containing a "decisions" list, or a dict with an
        "error" key (and empty "decisions") on any failure.
    """
    root = get_git_root()
    db_script = root / ".coditect" / "scripts" / "context-db.py"
    if not db_script.exists():
        # Fall back to the flat scripts/ layout.
        db_script = root / "scripts" / "context-db.py"
        if not db_script.exists():
            return {"error": "context-db.py not found", "decisions": []}

    cmd = ["python3", str(db_script), "query", "--decisions", "--limit", str(limit), "--format", "json"]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if proc.returncode == 0:
            return json.loads(proc.stdout) if proc.stdout.strip() else {"decisions": []}
        return {"error": proc.stderr, "decisions": []}
    except Exception as exc:
        return {"error": str(exc), "decisions": []}
def get_git_status():
    """Collect working-tree status, recent commits, and last-activity times.

    Returns:
        dict with keys modified_files, recent_commits, recent_changes,
        last_commit_time, last_commit_relative — or {"error": ...} when
        any required git command fails.
    """
    def _git(args, check=False):
        # Thin wrapper so each call site stays one line.
        return subprocess.run(["git"] + args, capture_output=True, text=True, check=check)

    try:
        status = _git(["status", "--short"], check=True)
        log = _git(["log", "--oneline", "-5"], check=True)
        # ISO timestamp of the last commit.
        commit_time = _git(["log", "-1", "--format=%ci"])
        # Human-relative time, e.g. "2 hours ago".
        commit_relative = _git(["log", "-1", "--format=%cr"])
        # Diffstat over the last three commits (best-effort, no check).
        diff = _git(["diff", "--stat", "HEAD~3..HEAD"])

        return {
            "modified_files": status.stdout.strip().split("\n") if status.stdout.strip() else [],
            "recent_commits": log.stdout.strip().split("\n") if log.stdout.strip() else [],
            "recent_changes": diff.stdout.strip() if diff.stdout.strip() else "",
            "last_commit_time": commit_time.stdout.strip() if commit_time.stdout else None,
            "last_commit_relative": commit_relative.stdout.strip() if commit_relative.stdout else None,
        }
    except Exception as e:
        return {"error": str(e)}
def get_recent_files(since_minutes=None):
    """Return up to 20 recently modified code/doc files under the CWD.

    Args:
        since_minutes: when given, only files modified within that many
            minutes are returned (via ``find -mmin``). When None, all
            matching files are returned sorted by mtime, newest first.

    Returns:
        list[str]: relative ``./...`` paths with .git excluded; [] on
        any failure (missing ``find``, timeout, etc.).
    """
    # Shared name filter for both modes (was duplicated in two branches).
    cmd = ["find", ".", "-type", "f",
           "(", "-name", "*.py", "-o", "-name", "*.md",
           "-o", "-name", "*.ts", "-o", "-name", "*.tsx", ")"]
    if since_minutes:
        # Time-bounded search: let find do the mtime filtering.
        cmd += ["-mmin", f"-{since_minutes}"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        files = [f for f in result.stdout.strip().split("\n")
                 if f and not f.startswith("./.git")]
        if not since_minutes and files:
            # Sort by modification time, most recent first.
            files_with_mtime = []
            for f in files:
                try:
                    files_with_mtime.append((f, os.path.getmtime(f)))
                except OSError:
                    # File vanished between find and stat; skip it.
                    # (Was a bare `except:`, which also swallowed
                    # KeyboardInterrupt/SystemExit.)
                    pass
            files_with_mtime.sort(key=lambda item: item[1], reverse=True)
            files = [f for f, _ in files_with_mtime]
        return files[:20]  # Limit to 20 most recent
    except Exception:
        return []
def get_session_log():
    """Locate and tail the most recent SESSION-LOG-*.md file.

    Checks the known session-log directories and returns the newest log
    (date-based filenames make lexicographic order chronological).

    Returns:
        dict with path / filename / recent_content (last ~3000 chars),
        or {"error": ...} when no log exists or it cannot be read.
    """
    root = get_git_root()
    # Candidate session-log directories (repo-local and parent layout).
    candidates = [
        root / "docs" / "session-logs",
        root / ".." / ".." / ".." / "docs" / "session-logs",
    ]
    for directory in candidates:
        if not (directory.exists() and directory.is_dir()):
            continue
        logs = sorted(directory.glob("SESSION-LOG-*.md"), reverse=True)
        if not logs:
            continue
        newest = logs[0]
        try:
            text = newest.read_text()
        except Exception as e:
            return {"error": str(e)}
        # Keep only the tail — recent entries are what matters here.
        return {
            "path": str(newest),
            "filename": newest.name,
            "recent_content": text[-3000:] if len(text) > 3000 else text,
        }
    return {"error": "No session logs found"}
def analyze_context(context_data, decisions_data, git_data, recent_files):
    """Analyze gathered data to identify the interruption point.

    Args:
        context_data: {"messages": [...]} as returned by query_recent_context.
        decisions_data: {"decisions": [...]} as returned by query_decisions.
        git_data: dict as returned by get_git_status.
        recent_files: list of recently modified file paths.

    Returns:
        dict with last_task, last_actions, key_files, recent_decisions,
        and suggested_next_step (always None here; filled by callers).
    """
    analysis = {
        "last_task": None,
        "last_actions": [],
        "key_files": [],
        "recent_decisions": [],
        "suggested_next_step": None,
    }

    # Task-ID pattern, e.g. "A.9.2.6". Compiled once — previously
    # `import re` ran inside the loop on every message.
    task_pattern = re.compile(r'[A-Z]\.\d+(?:\.\d+)*')

    for msg in context_data.get("messages", [])[:10]:  # last 10 messages
        content = msg.get("content", "")
        task_match = task_pattern.search(content)
        if task_match and not analysis["last_task"]:
            analysis["last_task"] = task_match.group()
        # Collect up to three brief descriptions of completed actions.
        if "completed" in content.lower() or "fixed" in content.lower() or "added" in content.lower():
            if len(analysis["last_actions"]) < 3:
                brief = content[:100] + "..." if len(content) > 100 else content
                analysis["last_actions"].append(brief)

    # Key files: merge git-modified and recently-touched files with an
    # order-preserving dedup (list(set(...)) gave nondeterministic order).
    git_modified = git_data.get("modified_files", [])
    analysis["key_files"] = list(dict.fromkeys(git_modified[:5] + recent_files[:5]))[:5]

    decisions = decisions_data.get("decisions", [])
    analysis["recent_decisions"] = [d.get("content", "")[:100] for d in decisions[:3]]
    return analysis
def format_output(analysis, git_data, deep=False):
    """Render the restoration summary as markdown text.

    Args:
        analysis: dict produced by analyze_context.
        git_data: dict produced by get_git_status.
        deep: accepted for interface compatibility; not used here.

    Returns:
        str: the formatted, newline-joined summary.
    """
    lines = ["## Context Restored\n"]

    # When was the repo last active?
    if git_data.get("last_commit_relative"):
        lines.append(f"**Last Git Activity:** {git_data['last_commit_relative']}")

    if analysis["last_task"]:
        lines.append(f"**Last Working On:** {analysis['last_task']}")
    else:
        lines.append("**Last Working On:** (Unable to determine - check session log)")
    lines.append("**Status:** in_progress\n")

    if analysis["last_actions"]:
        lines.append("**Last Actions:**")
        lines.extend(f"- {action}" for action in analysis["last_actions"])
        lines.append("")

    if analysis["key_files"]:
        lines.append("**Key Files:**")
        lines.extend(f"- `{path}`" for path in analysis["key_files"])
        lines.append("")

    if analysis["recent_decisions"]:
        lines.append("**Recent Decisions:**")
        lines.extend(f"- {decision}" for decision in analysis["recent_decisions"])
        lines.append("")

    # Working-tree state from git status --short.
    if git_data.get("modified_files"):
        lines.append("**Uncommitted Changes:**")
        lines.extend(f"- {path}" for path in git_data["modified_files"][:5])
        lines.append("")

    lines.append("**Next Step:** Continue from last action or check session log for details.\n")
    lines.append("---")
    lines.append("Ready to continue. What would you like me to do?")
    return "\n".join(lines)
def main():
    """Entry point: parse args, gather context, and print the summary."""
    parser = argparse.ArgumentParser(description="Restore context for session continuation")
    parser.add_argument("--deep", action="store_true", help="Extended context restoration")
    parser.add_argument("--task", type=str, help="Focus on specific task ID")
    parser.add_argument("--since", type=str, default=None,
                        help="Optional time window (e.g., 1h, 30m). If not specified, finds last active work regardless of when.")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    # Translate the optional --since window ("2h" / "30m") into minutes;
    # None means "find last active work regardless of time".
    since_minutes = None
    if args.since:
        if args.since.endswith("h"):
            since_minutes = int(args.since[:-1]) * 60
        elif args.since.endswith("m"):
            since_minutes = int(args.since[:-1])

    # STEP 0: run /cx first to capture any unprocessed exports
    # (skipped in JSON mode to keep scripted output clean).
    if not args.json:
        run_context_extraction()

    # Gather context (content-based, not time-based).
    print("Gathering context...", file=sys.stderr)

    # Deep mode looks further back in the message history.
    message_limit = 100 if args.deep else 50
    context_data = query_recent_context(limit=message_limit, since_minutes=None)  # count-based query
    decisions_data = query_decisions(limit=5)
    git_data = get_git_status()
    recent_files = get_recent_files(since_minutes=since_minutes)  # optional time filter for files
    session_log = get_session_log()  # most recent log, not just today's

    analysis = analyze_context(context_data, decisions_data, git_data, recent_files)

    if args.json:
        payload = {
            "analysis": analysis,
            "git": git_data,
            "recent_files": recent_files,
            "session_log": session_log,
            "context_count": len(context_data.get("messages", [])),
            "decisions_count": len(decisions_data.get("decisions", [])),
        }
        print(json.dumps(payload, indent=2, default=str))
        return

    print(format_output(analysis, git_data, deep=args.deep))
    if args.deep:
        print("\n**Session Log (recent):**")
        if session_log.get("recent_content"):
            print(f"```\n{session_log['recent_content'][-500:]}\n```")
# Fix: the guard had lost its dunder underscores (`if name == "main"`),
# which raised NameError on every direct invocation of this script.
if __name__ == "__main__":
    main()