#!/usr/bin/env python3
"""
title: "Configuration"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Memory Retrieval Script - Intelligent Long-Term Memory for CODITECT"
keywords: ['api', 'database', 'memory', 'optimization', 'retrieval']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "memory-retrieval.py"
language: python
executable: true
usage: "python3 scripts/memory-retrieval.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Memory Retrieval Script - Intelligent Long-Term Memory for CODITECT

Retrieves relevant prior work context from the unified message database
with signal-to-noise optimization and token-efficient summarization.

Part of the CODITECT anti-forgetting system.

Usage:
    python3 scripts/memory-retrieval.py "topic"
    python3 scripts/memory-retrieval.py --status
    python3 scripts/memory-retrieval.py --decisions "authentication"
    python3 scripts/memory-retrieval.py --patterns --language python
    python3 scripts/memory-retrieval.py --errors "TypeError"
    python3 scripts/memory-retrieval.py --deep "API design" --budget 5000

Version: 1.0.0
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
# --- Configuration ---

# Token budgets used to size retrieval output.
DEFAULT_BUDGET = 2000
MINIMAL_BUDGET = 500
COMPREHENSIVE_BUDGET = 5000
FULL_BUDGET = 10000

# Relevance-score thresholds (0.0 - 1.0).
HIGH_RELEVANCE = 0.8
MEDIUM_RELEVANCE = 0.5
MIN_RELEVANCE = 0.3

# Freshness weights by age bucket.
FRESHNESS_TODAY = 1.0
FRESHNESS_WEEK = 0.8
FRESHNESS_MONTH = 0.5
FRESHNESS_OLD = 0.3

# --- Paths ---

# BUG FIX: the original read Path(file) -- `file` is undefined; the dunder
# __file__ was clearly intended (underscores lost in transit).
SCRIPT_DIR = Path(__file__).parent
CONTEXT_DB_SCRIPT = SCRIPT_DIR / "context-db.py"

# ADR-114 & ADR-118: Use centralized path discovery for user data.
sys.path.insert(0, str(SCRIPT_DIR / "core"))
try:
    from paths import (
        get_context_storage_dir,
        get_org_db_path,
        get_sessions_db_path,
        ORG_DB,
        SESSIONS_DB,
    )

    CONTEXT_STORAGE = get_context_storage_dir()
    PATHS_AVAILABLE = True
except ImportError:
    # Fallback for backward compatibility when the core `paths` helper
    # is not importable: prefer the user-data directory if present.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        CONTEXT_STORAGE = _user_data
    else:
        CONTEXT_STORAGE = SCRIPT_DIR.parent / "context-storage"
    ORG_DB = CONTEXT_STORAGE / "org.db"
    SESSIONS_DB = CONTEXT_STORAGE / "sessions.db"
    PATHS_AVAILABLE = False

# NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118
def run_cxq(args: list[str], capture: bool = True) -> tuple[int, str, str]:
    """Invoke context-db.py as a subprocess with the given arguments.

    Args:
        args: Command-line arguments forwarded to context-db.py.
        capture: When True, capture stdout/stderr as text; when False,
            let the child write directly to this process's streams.

    Returns:
        (return code, stdout, stderr); stdout and stderr are empty
        strings when capture is False.
    """
    command = [sys.executable, str(CONTEXT_DB_SCRIPT), *args]
    if not capture:
        return subprocess.run(command).returncode, "", ""
    proc = subprocess.run(command, capture_output=True, text=True)
    return proc.returncode, proc.stdout, proc.stderr
def check_database() -> bool:
    """Check if the context database exists (ADR-118 four-tier architecture).

    Returns True if sessions.db or org.db exists.

    NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118.
    """
    # ADR-118: only the four-tier databases count; no context.db fallback.
    return any(db.exists() for db in (SESSIONS_DB, ORG_DB))
def estimate_tokens(text: str) -> int:
    """Roughly estimate the token count of *text* (~4 characters per token)."""
    # Integer quarter of the character count; floor division semantics.
    return len(text) >> 2
def truncate_to_budget(text: str, budget: int) -> str:
    """Trim *text* so its estimated token count fits within *budget*.

    Text already within budget is returned unchanged.  Otherwise the
    text is cut at roughly budget * 4 characters, preferring to end on
    the last complete line when that discards less than ~20% of the
    cut, and a truncation marker is appended.
    """
    if estimate_tokens(text) <= budget:
        return text

    cutoff = budget * 4
    clipped = text[:cutoff]

    # Prefer ending on a complete line unless that loses too much text.
    newline_at = clipped.rfind('\n')
    if newline_at > cutoff * 0.8:
        clipped = clipped[:newline_at]

    return clipped + "\n\n[... truncated to fit token budget ...]"
def format_work_status(items: list[dict]) -> str:
    """Render work items as a markdown bullet list grouped by status.

    Shows at most 5 in-progress, 3 blocked, and 3 completed items;
    entries with any other status are ignored.
    """
    if not items:
        return "No active work items found.\n"

    # Bucket items by the three recognized statuses.
    buckets: dict[str, list[dict]] = {'in_progress': [], 'blocked': [], 'completed': []}
    for entry in items:
        status = entry.get('status')
        if status in buckets:
            buckets[status].append(entry)

    lines = ["**Work Status:**"]
    for entry in buckets['in_progress'][:5]:
        lines.append(f"- [IN_PROGRESS] {entry.get('title', 'Unknown')} - {entry.get('progress', '?')}%")
    for entry in buckets['blocked'][:3]:
        lines.append(f"- [BLOCKED] {entry.get('title', 'Unknown')} - {entry.get('reason', 'Unknown reason')}")
    for entry in buckets['completed'][:3]:
        lines.append(f"- [COMPLETED] {entry.get('title', 'Unknown')} ({entry.get('date', 'Unknown date')})")
    return '\n'.join(lines) + '\n'
def format_decisions(items: list[dict]) -> str:
    """Format decision items as a numbered markdown list.

    Shows at most five decisions; each line carries the summary, an
    optional source in parentheses, and (when present) a rationale
    truncated to 100 characters.

    FIX: removed the unused local `relevance` (it was read from the item
    but never used anywhere in the function).
    """
    if not items:
        return "No relevant decisions found.\n"

    lines = ["**Relevant Decisions:**"]
    for i, item in enumerate(items[:5], 1):
        summary = item.get('summary', 'Unknown decision')
        source = item.get('source', '')
        source_str = f" ({source})" if source else ""
        lines.append(f"{i}. {summary}{source_str}")
        if item.get('rationale'):
            # NOTE: "..." is always appended, even when the rationale is
            # shorter than 100 characters (matches existing behavior).
            lines.append(f" - Rationale: {item['rationale'][:100]}...")
    return '\n'.join(lines) + '\n'
def format_patterns(items: list[dict]) -> str:
    """Format up to five code-pattern records as markdown bullets."""
    if not items:
        return "No applicable patterns found.\n"

    lines = ["**Applicable Patterns:**"]
    for entry in items[:5]:
        label = entry.get('name', 'Unknown pattern')
        lang = entry.get('language', '')
        suffix = f" ({lang})" if lang else ""
        lines.append(f"- {label}{suffix}")
        details = entry.get('description', '')
        if details:
            # Description is clipped to 80 chars with an ellipsis.
            lines.append(f" {details[:80]}...")
    return '\n'.join(lines) + '\n'
def format_errors(items: list[dict]) -> str:
    """Format up to three error/solution pairs as markdown bullets."""
    if not items:
        return "No known issues found.\n"

    lines = ["**Known Issues:**"]
    for entry in items[:3]:
        problem = entry.get('error', 'Unknown error')
        remedy = entry.get('solution', 'No solution recorded')
        # Error text clipped to 60 chars, solution to 80 with ellipsis.
        lines.append(f"- {problem[:60]}")
        lines.append(f" Solution: {remedy[:80]}...")
    return '\n'.join(lines) + '\n'
def retrieve_recall(topic: str, limit: int = 20) -> dict:
    """RAG-style recall query for *topic* via context-db.py.

    Returns the parsed JSON payload, {'raw': stdout} when the output is
    not valid JSON, or {'error': ...} on a non-zero exit code.
    """
    code, out, err = run_cxq(['--recall', topic, '--limit', str(limit), '--json'])
    if code != 0:
        return {'error': err or 'Recall failed'}
    try:
        payload = json.loads(out)
    except json.JSONDecodeError:
        # Not JSON -- hand back the raw text instead of failing.
        payload = {'raw': out}
    return payload
def retrieve_decisions(topic: Optional[str] = None, limit: int = 10) -> list[dict]:
    """Fetch decisions from context-db.py, optionally scoped to *topic*.

    Returns an empty list on subprocess failure, bad JSON, or a
    non-list payload.
    """
    cmd = ['--decisions', '--limit', str(limit), '--json']
    if topic:
        cmd.insert(0, topic)  # topic is passed as a leading positional
    code, out, _ = run_cxq(cmd)
    if code != 0:
        return []
    try:
        parsed = json.loads(out)
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
def retrieve_patterns(language: Optional[str] = None, limit: int = 5) -> list[dict]:
    """Fetch code patterns, optionally filtered by *language*.

    Returns an empty list on subprocess failure, bad JSON, or a
    non-list payload.
    """
    cmd = ['--patterns', '--limit', str(limit), '--json']
    if language:
        cmd += ['--language', language]
    code, out, _ = run_cxq(cmd)
    if code != 0:
        return []
    try:
        parsed = json.loads(out)
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
def retrieve_errors(error_type: Optional[str] = None, limit: int = 5) -> list[dict]:
    """Fetch error-solution pairs, optionally filtered by *error_type*.

    Returns an empty list on subprocess failure, bad JSON, or a
    non-list payload.
    """
    cmd = ['--errors', '--limit', str(limit), '--json']
    if error_type:
        cmd.insert(0, error_type)  # filter is passed as a leading positional
    code, out, _ = run_cxq(cmd)
    if code != 0:
        return []
    try:
        parsed = json.loads(out)
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
def retrieve_recent(limit: int = 20, role: Optional[str] = None) -> list[dict]:
    """Fetch the most recent messages, optionally filtered by *role*.

    Returns an empty list on subprocess failure, bad JSON, or a
    non-list payload.
    """
    cmd = ['--recent', str(limit), '--json']
    if role:
        cmd += ['--role', role]
    code, out, _ = run_cxq(cmd)
    if code != 0:
        return []
    try:
        parsed = json.loads(out)
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
def extract_work_status(messages: list[dict]) -> list[dict]:
    """Derive lightweight work-status items from recent message content.

    Keyword-based heuristic (no NLP): a message is classified by the
    first status whose keyword appears in its (lower-cased) content.
    Statuses are checked in priority order: in_progress, blocked,
    completed.  At most 10 items are returned.

    FIX: the original appended one item per *matching status*, so a
    single message mentioning e.g. both "blocked" and "done" produced
    duplicate work items with conflicting statuses.  Each message now
    yields at most one item (first matching status wins).
    """
    # Dict insertion order defines the per-message priority of statuses.
    status_keywords = {
        'in_progress': ['working on', 'implementing', 'in progress', 'continuing'],
        'blocked': ['blocked', 'waiting for', 'stuck on', 'need'],
        'completed': ['completed', 'finished', 'done', 'implemented']
    }
    # Heuristic progress value per status.
    progress_for = {'in_progress': 50, 'blocked': 0, 'completed': 100}

    work_items = []
    for msg in messages:
        content = msg.get('content', '').lower()
        for status, keywords in status_keywords.items():
            if any(keyword in content for keyword in keywords):
                work_items.append({
                    # Title is a crude excerpt: first 50 chars, newlines flattened.
                    'title': content[:50].replace('\n', ' '),
                    'status': status,
                    'date': msg.get('timestamp', 'Unknown'),
                    'progress': progress_for[status],
                })
                break  # one work item per message
    return work_items[:10]  # cap output at 10 items
def generate_summary(topic: str, budget: int, detailed: bool = False) -> str:
    """Generate a markdown context summary for *topic* within *budget* tokens.

    Sections are appended in priority order -- work status, decisions,
    patterns, errors -- and a section is skipped once the remaining
    budget falls below its per-section floor (200 or 300 tokens).
    A footer reports the estimated token usage.

    NOTE(review): *detailed* is accepted for interface compatibility but
    is currently unused -- full-content output is not implemented here.
    FIX: removed a retrieve_recall() call whose result was assigned and
    never used (a wasted subprocess round-trip per summary).
    """
    sections = []
    current_tokens = 0

    # Header
    header = "## Prior Context Summary\n\n"
    sections.append(header)
    current_tokens += estimate_tokens(header)

    # Retrieve the context types that actually feed the summary.
    decisions = retrieve_decisions(topic, limit=10)
    patterns = retrieve_patterns(limit=5)
    errors = retrieve_errors(topic, limit=5)
    recent = retrieve_recent(limit=30)
    work_status = extract_work_status(recent)

    remaining = budget - current_tokens

    # Work status (highest priority).
    if work_status and remaining > 200:
        status_text = format_work_status(work_status)
        sections.append(status_text)
        current_tokens += estimate_tokens(status_text)
        remaining = budget - current_tokens

    # Decisions (high priority, larger floor).
    if decisions and remaining > 300:
        decisions_text = format_decisions(decisions)
        sections.append(decisions_text)
        current_tokens += estimate_tokens(decisions_text)
        remaining = budget - current_tokens

    # Patterns (medium priority).
    if patterns and remaining > 200:
        patterns_text = format_patterns(patterns)
        sections.append(patterns_text)
        current_tokens += estimate_tokens(patterns_text)
        remaining = budget - current_tokens

    # Errors (medium priority).
    if errors and remaining > 200:
        errors_text = format_errors(errors)
        sections.append(errors_text)
        current_tokens += estimate_tokens(errors_text)

    # Join sections and append the token-usage footer.
    output = '\n'.join(sections)
    output += f"\n---\n*Token count: ~{current_tokens} / {budget} budget*\n"
    return output
def generate_json_output(topic: str, budget: int) -> dict:
    """Build a JSON-serializable retrieval payload for programmatic use.

    The payload bundles recall data, decisions, patterns, errors, and a
    heuristic work-status list, plus a token estimate of the whole blob.
    """
    recall_data = retrieve_recall(topic, limit=20)
    decisions = retrieve_decisions(topic, limit=10)
    patterns = retrieve_patterns(limit=5)
    errors = retrieve_errors(topic, limit=5)
    recent = retrieve_recent(limit=30)
    work_status = extract_work_status(recent)

    payload = {
        'retrieval_id': f"mem_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
        'topic': topic,
        'timestamp': datetime.now().isoformat(),
        'budget': budget,
        'work_status': work_status,
        'decisions': decisions,
        'patterns': patterns,
        'errors': errors,
        # Recall payload is dropped entirely when the recall step errored.
        'recall_data': None if recall_data.get('error') else recall_data,
        'token_estimate': 0,  # placeholder, replaced just below
    }
    payload['token_estimate'] = estimate_tokens(json.dumps(payload))
    return payload
def main() -> None:
    """CLI entry point: parse arguments, dispatch to a retrieval mode, print results."""
    parser = argparse.ArgumentParser(
        description='Memory Retrieval - Intelligent long-term memory for CODITECT',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python3 memory-retrieval.py "authentication"
    python3 memory-retrieval.py --status
    python3 memory-retrieval.py --decisions "database"
    python3 memory-retrieval.py --patterns --language python
    python3 memory-retrieval.py --deep "API design" --budget 5000
    python3 memory-retrieval.py "topic" --json
"""
    )

    # Positional
    parser.add_argument('topic', nargs='?', default=None,
                        help='Topic to retrieve context for')

    # Retrieval modes (checked in a fixed order below; the first flag
    # that matches wins and the function returns early)
    parser.add_argument('--status', action='store_true',
                        help='Show work status summary')
    parser.add_argument('--deep', action='store_true',
                        help='Comprehensive retrieval (5000 token budget)')
    parser.add_argument('--decisions', action='store_true',
                        help='Retrieve decisions')
    parser.add_argument('--patterns', action='store_true',
                        help='Retrieve code patterns')
    parser.add_argument('--errors', action='store_true',
                        help='Retrieve error-solution pairs')
    parser.add_argument('--blockers', action='store_true',
                        help='Show blocked items')
    parser.add_argument('--wip', action='store_true',
                        help='Show work-in-progress items')

    # Filters
    # NOTE(review): --since and --project are parsed but never consumed
    # anywhere in this function; --type is only used as a fallback topic
    # for --errors. Presumably reserved for future filtering -- confirm.
    parser.add_argument('--language', type=str,
                        help='Filter patterns by language')
    parser.add_argument('--type', type=str,
                        help='Filter by type (decision type or error type)')
    parser.add_argument('--since', type=str,
                        help='Only items since date (YYYY-MM-DD)')
    parser.add_argument('--project', type=str,
                        help='Filter by project name')

    # Output control
    parser.add_argument('--budget', type=int, default=DEFAULT_BUDGET,
                        help=f'Token budget (default: {DEFAULT_BUDGET})')
    parser.add_argument('--json', action='store_true',
                        help='Output as JSON')
    parser.add_argument('--detailed', action='store_true',
                        help='Full content, no summarization')

    # Info
    parser.add_argument('--stats', action='store_true',
                        help='Show memory system statistics')

    args = parser.parse_args()

    # Check database exists (sessions.db or org.db per ADR-118)
    if not check_database():
        print("Error: Memory system not initialized.")
        print("Run '/cx' first to build the context database.")
        sys.exit(1)

    # Adjust budget for deep mode (overrides any explicit --budget value)
    if args.deep:
        args.budget = COMPREHENSIVE_BUDGET

    # Handle different modes
    if args.stats:
        # Pass through to context-db.py; output streams directly (uncaptured)
        run_cxq(['--stats'], capture=False)
        return

    if args.status:
        # Work-status summary derived heuristically from recent messages
        recent = retrieve_recent(limit=50)
        work_status = extract_work_status(recent)
        if args.json:
            print(json.dumps(work_status, indent=2))
        else:
            print("## Work Status Summary\n")
            print(format_work_status(work_status))
        return

    if args.blockers:
        # Only items classified as 'blocked' by extract_work_status
        recent = retrieve_recent(limit=50)
        work_status = extract_work_status(recent)
        blocked = [w for w in work_status if w.get('status') == 'blocked']
        if args.json:
            print(json.dumps(blocked, indent=2))
        else:
            print("## Blocked Items\n")
            if blocked:
                for item in blocked:
                    print(f"- {item.get('title', 'Unknown')}")
                    # NOTE(review): extract_work_status never sets a
                    # 'reason' key, so this always prints 'Unknown'.
                    print(f" Reason: {item.get('reason', 'Unknown')}")
            else:
                print("No blocked items found.")
        return

    if args.wip:
        # Only items classified as 'in_progress'
        recent = retrieve_recent(limit=50)
        work_status = extract_work_status(recent)
        wip = [w for w in work_status if w.get('status') == 'in_progress']
        if args.json:
            print(json.dumps(wip, indent=2))
        else:
            print("## Work In Progress\n")
            if wip:
                for item in wip:
                    print(f"- {item.get('title', 'Unknown')} ({item.get('progress', '?')}%)")
            else:
                print("No work-in-progress items found.")
        return

    if args.decisions:
        # Decisions, optionally scoped by the positional topic
        decisions = retrieve_decisions(args.topic, limit=10)
        if args.json:
            print(json.dumps(decisions, indent=2))
        else:
            print("## Decisions\n")
            print(format_decisions(decisions))
        return

    if args.patterns:
        # Code patterns, optionally filtered by --language
        patterns = retrieve_patterns(args.language, limit=10)
        if args.json:
            print(json.dumps(patterns, indent=2))
        else:
            print("## Code Patterns\n")
            print(format_patterns(patterns))
        return

    if args.errors:
        # Error-solution pairs; topic takes precedence over --type
        errors = retrieve_errors(args.topic or args.type, limit=10)
        if args.json:
            print(json.dumps(errors, indent=2))
        else:
            print("## Error Solutions\n")
            print(format_errors(errors))
        return

    # Default: full retrieval for topic
    if not args.topic:
        parser.print_help()
        print("\nError: Please provide a topic or use --status/--decisions/--patterns/--errors")
        sys.exit(1)

    if args.json:
        output = generate_json_output(args.topic, args.budget)
        print(json.dumps(output, indent=2))
    else:
        output = generate_summary(args.topic, args.budget, args.detailed)
        print(output)
# BUG FIX: the original read `if name == 'main':` -- the dunder underscores
# were lost; without them the guard raises NameError when the file runs.
if __name__ == '__main__':
    main()