scripts-intent-context-manager
#!/usr/bin/env python3 """
title: Intent Context Manager component_type: script version: 1.0.0 audience: contributor status: active summary: Detects context switches, finds matching plans, prevents plan proliferation keywords:
- intent
- context-switch
- plan-management
- proliferation-guard tokens: ~3500 created: 2026-01-04 updated: 2026-01-04
Intent Context Manager - Handles project/task context switching
Key Responsibilities:
- DETECT context switches in user input
- SEARCH for existing plans matching new context
- ASK for clarification when unclear (never assume)
- GUARD against plan/tasklist proliferation
Usage: from intent_context_manager import IntentContextManager
mgr = IntentContextManager(coditect_root)
# Check if user input indicates context switch
result = mgr.analyze_intent(user_message, current_plan_id)
if result.is_context_switch:
if result.matching_plans:
# Found existing plan(s)
print(f"Found: {result.matching_plans}")
else:
# Need to ask user
print(result.clarification_prompt)
"""
import re import sqlite3 from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from difflib import SequenceMatcher
@dataclass class ContextAnalysisResult: """Result of intent/context analysis.""" is_context_switch: bool = False confidence: float = 0.0 detected_project: Optional[str] = None detected_task_type: Optional[str] = None matching_plans: List[Dict[str, Any]] = field(default_factory=list) matching_tasks: List[Dict[str, Any]] = field(default_factory=list) needs_clarification: bool = False clarification_prompt: Optional[str] = None suggested_action: Optional[str] = None # add_to_existing, create_new, search_more
class IntentContextManager: """Manages intent detection and context switching."""
def __init__(self, coditect_root: Path):
self.root = coditect_root
# ADR-114 & ADR-118: Use centralized path discovery
try:
from paths import get_sessions_db_path, SESSIONS_DB
self.db_path = SESSIONS_DB # Session context goes to sessions.db (Tier 3)
except ImportError:
# Fallback for backward compatibility
_user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
if _user_data.exists():
self.db_path = _user_data / "sessions.db"
else:
self.db_path = self.root / "context-storage" / "sessions.db"
# Known plan locations
self.plan_dirs = [
self.root / "internal" / "project" / "plans",
self.root.parent.parent.parent / "docs" / "project-management",
]
# Context switch signal patterns
self.switch_patterns = [
r'\b(?:now\s+)?let\'?s?\s+(?:work\s+on|switch\s+to|move\s+to)\b',
r'\b(?:different|another|new)\s+(?:project|task|feature)\b',
r'\b(?:back\s+to|return\s+to)\b',
r'\b(?:regarding|about|for)\s+the\s+\w+\s+(?:project|feature|module)\b',
r'\bcan\s+(?:we|you)\s+(?:work\s+on|help\s+with)\b',
]
# Project/module name patterns
self.project_patterns = [
r'\b(?:the\s+)?(\w+[-_]?\w*)\s+(?:project|module|submodule|feature|component)\b',
r'\bsubmodules?/\w+/(\w+[-_]?\w*)\b', # submodules/category/name
r'\b(?:in|on|for)\s+(\w+[-_]?\w+)\b',
]
def get_db_connection(self) -> Optional[sqlite3.Connection]:
"""Get database connection if available."""
if not self.db_path.exists():
return None
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def find_all_plans(self) -> List[Dict[str, Any]]:
"""Find all plan files across known locations."""
plans = []
for plan_dir in self.plan_dirs:
if not plan_dir.exists():
continue
for plan_file in plan_dir.glob("*.md"):
# Skip non-plan files
name_lower = plan_file.name.lower()
if any(skip in name_lower for skip in ['readme', 'claude', 'index']):
continue
# Extract plan metadata
content = plan_file.read_text()[:2000] # First 2KB for analysis
# Count tasks
task_count = len(re.findall(r'^-\s*\[[ xX]\]', content, re.MULTILINE))
plans.append({
'path': str(plan_file),
'name': plan_file.stem,
'directory': plan_file.parent.name,
'task_count': task_count,
'content_preview': content[:500],
})
return plans
def find_matching_plans(self, query: str) -> List[Dict[str, Any]]:
"""Find plans that match a query string."""
all_plans = self.find_all_plans()
matches = []
query_lower = query.lower()
query_words = set(re.findall(r'\w+', query_lower))
for plan in all_plans:
score = 0.0
plan_name_lower = plan['name'].lower()
plan_words = set(re.findall(r'\w+', plan_name_lower))
# Exact name match
if query_lower in plan_name_lower:
score += 0.8
# Word overlap
common_words = query_words & plan_words
if common_words:
score += 0.3 * len(common_words) / max(len(query_words), 1)
# Fuzzy match on name
ratio = SequenceMatcher(None, query_lower, plan_name_lower).ratio()
score += ratio * 0.3
# Content match
if query_lower in plan.get('content_preview', '').lower():
score += 0.2
if score > 0.3:
plan['match_score'] = round(score, 2)
matches.append(plan)
# Sort by score
matches.sort(key=lambda x: x['match_score'], reverse=True)
return matches[:5] # Top 5
def find_matching_tasks(self, query: str) -> List[Dict[str, Any]]:
"""Find tasks in database that match query."""
conn = self.get_db_connection()
if not conn:
return []
# Check if task_tracking table exists
cursor = conn.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='task_tracking'
""")
if not cursor.fetchone():
conn.close()
return []
# Search tasks
query_pattern = f"%{query}%"
cursor = conn.execute("""
SELECT task_id, task_description, status
FROM task_tracking
WHERE task_description LIKE ?
OR task_id LIKE ?
ORDER BY
CASE WHEN status = 'in_progress' THEN 0
WHEN status = 'pending' THEN 1
ELSE 2 END
LIMIT 10
""", (query_pattern, query_pattern))
tasks = [dict(row) for row in cursor.fetchall()]
conn.close()
return tasks
def detect_context_switch(self, message: str) -> Tuple[bool, float, Optional[str]]:
"""Detect if message indicates a context switch.
Returns: (is_switch, confidence, detected_project)
"""
message_lower = message.lower()
# Check for switch signal patterns
switch_score = 0.0
for pattern in self.switch_patterns:
if re.search(pattern, message_lower):
switch_score += 0.3
# Extract potential project name
detected_project = None
for pattern in self.project_patterns:
match = re.search(pattern, message_lower)
if match:
detected_project = match.group(1)
switch_score += 0.2
break
# Submodule path detection
submodule_match = re.search(r'submodules/(\w+)/(\w+[-_]?\w*)', message_lower)
if submodule_match:
detected_project = submodule_match.group(2)
switch_score += 0.3
is_switch = switch_score >= 0.3
confidence = min(switch_score, 1.0)
return is_switch, confidence, detected_project
def count_plans_in_category(self, category: str) -> int:
"""Count how many plans exist in a category."""
count = 0
for plan_dir in self.plan_dirs:
if plan_dir.exists():
for plan_file in plan_dir.glob("*.md"):
if category.lower() in plan_file.name.lower():
count += 1
return count
def analyze_intent(
self,
user_message: str,
current_plan_id: Optional[str] = None
) -> ContextAnalysisResult:
"""Analyze user intent and detect context switches.
This is the main entry point for intent analysis.
"""
result = ContextAnalysisResult()
# 1. Detect context switch
is_switch, confidence, detected_project = self.detect_context_switch(user_message)
result.is_context_switch = is_switch
result.confidence = confidence
result.detected_project = detected_project
if not is_switch:
return result
# 2. Search for matching plans
if detected_project:
result.matching_plans = self.find_matching_plans(detected_project)
result.matching_tasks = self.find_matching_tasks(detected_project)
# 3. Determine if clarification needed
if result.matching_plans:
if len(result.matching_plans) == 1:
# Single clear match
result.needs_clarification = False
result.suggested_action = 'use_existing'
else:
# Multiple matches - ask user
result.needs_clarification = True
plan_names = [p['name'] for p in result.matching_plans[:3]]
result.clarification_prompt = (
f"I found multiple plans that might match:\n"
f"{chr(10).join(f' - {name}' for name in plan_names)}\n\n"
f"Which plan should we use?"
)
else:
# No matching plans found
result.needs_clarification = True
# Check for proliferation
if detected_project:
similar_count = self.count_plans_in_category(detected_project[:4]) # First 4 chars
if similar_count >= 2:
result.clarification_prompt = (
f"I don't see an existing plan for '{detected_project}'.\n"
f"Note: There are already {similar_count} similar plans.\n\n"
f"Should I:\n"
f" a) Add tasks to the current plan ({current_plan_id or 'PILOT'})?\n"
f" b) Create a new plan for '{detected_project}'?\n"
f" c) Search for an existing plan to add to?"
)
else:
result.clarification_prompt = (
f"I don't see an existing plan for '{detected_project}'.\n\n"
f"Should I:\n"
f" a) Add tasks to the current plan ({current_plan_id or 'PILOT'})?\n"
f" b) Create a new plan for '{detected_project}'?"
)
else:
result.clarification_prompt = (
"This seems like a different context. "
"Could you clarify which project or plan this is for?"
)
return result
def get_current_context_summary(self) -> Dict[str, Any]:
"""Get summary of current context state."""
plans = self.find_all_plans()
conn = self.get_db_connection()
task_stats = {'total': 0, 'completed': 0, 'in_progress': 0, 'pending': 0}
if conn:
try:
cursor = conn.execute("""
SELECT status, COUNT(*) as count
FROM task_tracking
GROUP BY status
""")
for row in cursor.fetchall():
task_stats[row['status']] = row['count']
task_stats['total'] += row['count']
except:
pass
conn.close()
return {
'total_plans': len(plans),
'plans': [{'name': p['name'], 'tasks': p['task_count']} for p in plans],
'task_stats': task_stats,
}
def main(): """CLI for testing intent analysis.""" import sys
if len(sys.argv) < 2:
print("Usage: python3 intent_context_manager.py '<user message>'")
print("\nExamples:")
print(" python3 intent_context_manager.py 'now lets work on the pricing research'")
print(" python3 intent_context_manager.py 'can you help with the frontend module'")
sys.exit(1)
message = sys.argv[1]
root = Path(__file__).parent.parent.parent # Assumes scripts/core/
mgr = IntentContextManager(root)
result = mgr.analyze_intent(message)
print("\n=== Intent Analysis ===\n")
print(f"Input: {message}")
print(f"\nContext Switch Detected: {result.is_context_switch}")
print(f"Confidence: {result.confidence:.1%}")
if result.detected_project:
print(f"Detected Project: {result.detected_project}")
if result.matching_plans:
print(f"\nMatching Plans:")
for plan in result.matching_plans:
print(f" - {plan['name']} (score: {plan['match_score']}, tasks: {plan['task_count']})")
if result.matching_tasks:
print(f"\nMatching Tasks:")
for task in result.matching_tasks[:5]:
print(f" - [{task['task_id']}] {task['task_description'][:50]}... ({task['status']})")
if result.needs_clarification:
print(f"\n--- CLARIFICATION NEEDED ---")
print(result.clarification_prompt)
if name == "main": main()