#!/usr/bin/env python3
"""
ADR-151 Intelligent Context Pruning (J.9.3)

Intelligent pruning of session messages and context based on relevance,
importance scoring, age decay, and token budget constraints.

Unlike graph-level pruning (pruning.py), this module operates on session
messages in sessions.db, providing smart retention policies for the context
memory system.

Strategies:
- Importance scoring: Decisions, errors, solutions get high priority
- Age-based decay: Older messages decay unless highly important
- Relevance scoring: Query-based relevance for active context
- Token budget: Prune to fit within token limits
- Priority retention: Never prune critical knowledge types

Integration:
- J.8.3 SessionLinker: Considers cross-session links when pruning
- J.8.4 SessionResumeOptimizer: Feeds into resume context optimization
- J.9.2 AutoCheckpointer: Triggers checkpoints before major pruning

Created: 2026-02-04
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.9.3 (Intelligent context pruning)
"""
import hashlib
import json
import logging
import math
import re
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
# BUG FIX: was `logging.getLogger(name)` — `name` is undefined at module scope
# and raises NameError on import; the module dunder `__name__` is intended.
logger = logging.getLogger(__name__)


# =============================================================================
# Configuration and Constants
# =============================================================================
class MessageType(Enum):
    """Categories used to rank messages when deciding what to prune."""

    DECISION = "decision"
    ADR = "adr"
    ERROR_SOLUTION = "error_solution"
    SKILL_LEARNING = "skill_learning"
    POLICY = "policy"
    CODE_PATTERN = "code_pattern"
    TOOL_USE = "tool_use"
    CONVERSATION = "conversation"
    SYSTEM = "system"
    UNKNOWN = "unknown"
# Type priorities (higher = more important, less likely to prune).
# Aligned with pruning.py graph node priorities.
TYPE_PRIORITIES: Dict[str, int] = {
    "decision": 100,
    "adr": 95,
    "error_solution": 90,
    "policy": 85,
    "skill_learning": 80,
    "code_pattern": 70,
    "tool_use": 50,
    "conversation": 40,
    "system": 30,
    "unknown": 25,
}

# Role priorities.
ROLE_PRIORITIES: Dict[str, int] = {
    "assistant": 60,  # AI responses often contain valuable context
    "user": 50,       # User queries provide context
    "system": 30,     # System messages are often boilerplate
    "tool": 40,       # Tool results can be regenerated
}

# Age decay parameters.
AGE_DECAY_HALF_LIFE_DAYS = 7  # Score halves every 7 days
AGE_DECAY_MIN_SCORE = 0.1     # Never decay below 10%


@dataclass
class PruningConfig:
    """Tunable knobs controlling what gets pruned and what is preserved."""

    # --- Token budget constraints ---
    token_budget: int = 50000     # Target token budget for context
    token_per_char: float = 0.25  # Approximate tokens per character

    # --- Importance thresholds ---
    min_importance_score: float = 0.2  # Below this, message is pruneable
    preserve_threshold: float = 0.8    # Above this, never prune

    # --- Age decay settings ---
    age_decay_enabled: bool = True
    age_decay_half_life_days: int = AGE_DECAY_HALF_LIFE_DAYS
    age_decay_min_score: float = AGE_DECAY_MIN_SCORE

    # --- Type-based retention ---
    # default_factory so each instance owns its own mutable set.
    always_preserve_types: Set[str] = field(
        default_factory=lambda: {"decision", "adr", "error_solution", "policy"}
    )

    # --- Session-based retention ---
    preserve_recent_sessions: int = 3      # Always keep last N sessions
    preserve_linked_sessions: bool = True  # Keep cross-linked sessions

    # --- Dry run mode ---
    dry_run: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Serialize config to dictionary."""
        payload: Dict[str, Any] = {
            "token_budget": self.token_budget,
            "token_per_char": self.token_per_char,
            "min_importance_score": self.min_importance_score,
            "preserve_threshold": self.preserve_threshold,
            "age_decay_enabled": self.age_decay_enabled,
            "age_decay_half_life_days": self.age_decay_half_life_days,
            "age_decay_min_score": self.age_decay_min_score,
            "always_preserve_types": list(self.always_preserve_types),
            "preserve_recent_sessions": self.preserve_recent_sessions,
            "preserve_linked_sessions": self.preserve_linked_sessions,
            "dry_run": self.dry_run,
        }
        return payload
@dataclass
class ScoredMessage:
    """A message annotated with every score used by the pruning decision."""

    id: int
    hash: str
    content: str
    role: str
    session_id: Optional[str]
    timestamp: Optional[str]
    message_type: MessageType
    token_estimate: int
    importance_score: float
    type_score: float
    role_score: float
    age_decay: float
    relevance_score: float
    is_preserved: bool
    preserve_reason: Optional[str]

    @property
    def composite_score(self) -> float:
        """Weighted blend of type (35%), role (15%), age (20%), relevance (30%)."""
        return (
            0.35 * self.type_score
            + 0.15 * self.role_score
            + 0.20 * self.age_decay
            + 0.30 * self.relevance_score
        )
@dataclass
class PruningResult:
    """Outcome summary for a single pruning run."""

    success: bool
    messages_analyzed: int
    messages_pruned: int
    messages_preserved: int
    tokens_before: int
    tokens_after: int
    tokens_freed: int
    sessions_affected: List[str]
    preserved_types: Dict[str, int]
    pruning_summary: str
    dry_run: bool
    errors: List[str] = field(default_factory=list)

    @property
    def prune_percentage(self) -> float:
        """Share of analyzed messages that were pruned, as a percentage."""
        if not self.messages_analyzed:
            return 0.0
        return (self.messages_pruned / self.messages_analyzed) * 100

    @property
    def token_reduction_percentage(self) -> float:
        """Share of tokens freed relative to the pre-prune total."""
        if not self.tokens_before:
            return 0.0
        return (self.tokens_freed / self.tokens_before) * 100
# =============================================================================
# Message Type Detection
# =============================================================================


def detect_message_type(content: str, role: str) -> MessageType:
    """
    Detect the type of message based on content analysis.

    Pattern groups are checked in priority order; the first group with a
    matching regex determines the type. Content-based detection wins over
    role-based fallbacks.

    Args:
        content: Message content
        role: Message role (user, assistant, system, tool)

    Returns:
        MessageType: Detected type
    """
    lowered = content.lower()

    # (type, haystack, regex flags, patterns) — evaluated in priority order.
    pattern_groups = [
        (
            MessageType.DECISION,
            lowered,
            0,
            [
                r'\bdecision\b.*:',
                r'\bdecided\s+to\b',
                r'\bchoose\s+to\b',
                r'\bselect(?:ed|ing)?\s+approach\b',
                r'\bwill\s+use\b.*\binstead\b',
                r'\bgoing\s+(?:to|with)\b',
                r'\bchose\b',
                r'\bprefer\b.*\bover\b',
            ],
        ),
        (
            MessageType.ADR,
            content,
            re.IGNORECASE,
            [
                r'\bADR-\d+\b',
                r'\barchitecture\s+decision\b',
                r'\bdecision\s+record\b',
                r'## Status\s*\n.*\b(?:Accepted|Proposed|Deprecated)\b',
            ],
        ),
        (
            MessageType.ERROR_SOLUTION,
            lowered,
            0,
            [
                r'\berror\b.*\bfix(?:ed)?\b',
                r'\bfix(?:ed)?\b.*\berror\b',
                r'\bresolved?\b.*\bissue\b',
                r'\bissue\b.*\bresolved?\b',
                r'\bsolution\b.*\bproblem\b',
                r'\bproblem\b.*\bsolution\b',
                r'\bworkaround\b',
                r'\bdebugg(?:ed|ing)\b',
                r'\btroubleshoot\b',
            ],
        ),
        (
            MessageType.SKILL_LEARNING,
            lowered,
            0,
            [
                r'\blearned?\b.*\bthat\b',
                r'\bnote\s+to\s+self\b',
                r'\bremember\b.*\bfor\s+future\b',
                r'\bpattern\b.*\bworks?\s+well\b',
                r'\bbest\s+practice\b',
                r'\banti-?pattern\b',
                r'\blesson\b.*\blearned\b',
            ],
        ),
        (
            MessageType.POLICY,
            lowered,
            0,
            [
                r'\bpolicy\b.*:',
                r'\brule\b.*:',
                r'\bstandard\b.*:',
                r'\bconvention\b',
                r'\bnaming\s+convention\b',
                r'\bcoding\s+standard\b',
                r'\bmust\s+always\b',
                r'\bnever\s+(?:use|do)\b',
            ],
        ),
        (
            MessageType.CODE_PATTERN,
            content,  # case-sensitive on purpose: code keywords are lowercase
            0,
            [
                r'```\w+\n',  # fenced code blocks
                r'\bfunction\s+\w+\s*\(',
                r'\bdef\s+\w+\s*\(',
                r'\bclass\s+\w+\s*[:\(]',
                r'\bimport\s+\w+',
                r'\bfrom\s+\w+\s+import\b',
            ],
        ),
    ]

    for mtype, haystack, flags, patterns in pattern_groups:
        if any(re.search(p, haystack, flags) for p in patterns):
            return mtype

    # Role-based fallbacks when no content pattern matched.
    if role == "tool":
        return MessageType.TOOL_USE
    if role == "system":
        return MessageType.SYSTEM
    if role in ("user", "assistant"):
        return MessageType.CONVERSATION
    return MessageType.UNKNOWN
# =============================================================================
# Importance Scoring
# =============================================================================


def compute_type_score(message_type: MessageType) -> float:
    """Compute normalized type score (0.0-1.0); unknown types score 0.25."""
    return TYPE_PRIORITIES.get(message_type.value, 25) / 100.0
def compute_role_score(role: str) -> float:
    """Compute normalized role score (0.0-1.0); unknown roles score 0.30."""
    return ROLE_PRIORITIES.get(role.lower(), 30) / 100.0
def compute_age_decay(
    timestamp: Optional[str],
    half_life_days: int = AGE_DECAY_HALF_LIFE_DAYS,
    min_score: float = AGE_DECAY_MIN_SCORE,
) -> float:
    """
    Compute age-based decay score using exponential decay.

    Score = max(min_score, 2^(-age_days / half_life_days))

    Args:
        timestamp: ISO format timestamp
        half_life_days: Days for score to halve
        min_score: Minimum score (never decays below)

    Returns:
        float: Decay factor (0.0-1.0)
    """
    if not timestamp:
        return 1.0  # no timestamp: treat as current

    try:
        # fromisoformat cannot parse a trailing 'Z'; map it to an explicit UTC offset.
        normalized = timestamp.replace('Z', '+00:00') if 'T' in timestamp else timestamp
        msg_time = datetime.fromisoformat(normalized)
        # Match the "now" reference to the message's tz-awareness so the
        # subtraction below never mixes naive and aware datetimes.
        now = datetime.now(msg_time.tzinfo) if msg_time.tzinfo else datetime.now()
        age_days = (now - msg_time).days
        if age_days <= 0:
            return 1.0
        # Exponential half-life decay, floored at min_score.
        return max(min_score, 2.0 ** (-age_days / half_life_days))
    except (ValueError, TypeError):
        # Unparseable timestamp: err on the side of keeping the message.
        return 1.0
def compute_relevance_score(
    content: str,
    query_terms: Optional[List[str]] = None,
) -> float:
    """
    Compute relevance score based on content features.

    Without query terms, scores based on content richness (length, code
    blocks, markdown structure). With query terms, blends richness (40%)
    with the fraction of matched terms (60%).

    Args:
        content: Message content
        query_terms: Optional list of search terms

    Returns:
        float: Relevance score (0.0-1.0)
    """
    if not content:
        return 0.0

    base_score = 0.5

    # Length bonus (longer messages often more valuable)
    length = len(content)
    if length > 500:
        base_score += 0.1
    if length > 1000:
        base_score += 0.1

    # Code block bonus
    if '```' in content:
        base_score += 0.15

    # Structure bonus (headers, lists)
    if re.search(r'^#+\s', content, re.MULTILINE):
        base_score += 0.1
    if re.search(r'^\s*[-*]\s', content, re.MULTILINE):
        base_score += 0.05

    # Query term matching.
    # FIX: removed a redundant nested `if query_terms:` that duplicated the
    # outer guard (dead re-check of the same condition).
    if query_terms:
        content_lower = content.lower()
        matches = sum(1 for term in query_terms if term.lower() in content_lower)
        term_score = matches / len(query_terms)
        base_score = (base_score * 0.4) + (term_score * 0.6)

    return min(1.0, base_score)
def estimate_tokens(content: str, token_per_char: float = 0.25) -> int:
    """Estimate token count as character count times tokens-per-char, truncated."""
    estimated = len(content) * token_per_char
    return int(estimated)
# =============================================================================
# Context Pruner Class
# =============================================================================


class ContextPruner:
    """
    Intelligent context pruner for session messages.

    Implements J.9.3 intelligent context pruning with:
    - Message importance scoring
    - Age-based decay
    - Token budget enforcement
    - Type-based preservation
    """

    # Archive table DDL. Pruned rows are copied here before deletion so a
    # prune is inspectable/recoverable. Shared by all pruning strategies.
    _ARCHIVE_TABLE_SQL = """
        CREATE TABLE IF NOT EXISTS pruned_messages (
            id INTEGER PRIMARY KEY,
            original_id INTEGER,
            hash TEXT,
            content TEXT,
            role TEXT,
            session_id TEXT,
            timestamp TEXT,
            importance_score REAL,
            message_type TEXT,
            pruned_at TEXT,
            prune_reason TEXT
        )
    """

    def __init__(self, sessions_db: Optional[Path] = None, config: Optional[PruningConfig] = None):
        """
        Initialize the context pruner.

        Args:
            sessions_db: Path to sessions.db (auto-detected if not provided)
            config: Pruning configuration
        """
        if sessions_db is None:
            try:
                from scripts.core.paths import get_sessions_db_path
                sessions_db = get_sessions_db_path()
            except ImportError:
                # Fallback location when the project path helper is unavailable.
                sessions_db = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"
        self.sessions_db = sessions_db
        self.config = config or PruningConfig()

    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------

    def _get_connection(self) -> sqlite3.Connection:
        """Open a connection to sessions.db; raise FileNotFoundError if missing."""
        if not self.sessions_db.exists():
            raise FileNotFoundError(f"Sessions database not found: {self.sessions_db}")
        return sqlite3.connect(str(self.sessions_db))

    def _get_linked_sessions(self, conn: sqlite3.Connection) -> Set[str]:
        """Get session IDs that have cross-session links (J.8.3 integration)."""
        linked: Set[str] = set()
        try:
            # session_links may not exist on older databases; probe first.
            cursor = conn.cursor()
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='session_links'"
            )
            if cursor.fetchone():
                cursor.execute(
                    "SELECT DISTINCT source_session_id FROM session_links "
                    "UNION SELECT DISTINCT target_session_id FROM session_links"
                )
                linked = {row[0] for row in cursor.fetchall() if row[0]}
        except sqlite3.Error:
            # Best-effort: link info is only a preservation hint.
            pass
        return linked

    def _get_recent_sessions(self, conn: sqlite3.Connection, count: int) -> Set[str]:
        """Get the N most recent session IDs."""
        recent: Set[str] = set()
        try:
            cursor = conn.cursor()
            # Over-fetch rows because DISTINCT session_id ordered by message
            # timestamp can interleave; then collect the first `count` unique IDs.
            cursor.execute(
                "SELECT DISTINCT session_id FROM messages "
                "WHERE session_id IS NOT NULL "
                "ORDER BY timestamp DESC LIMIT ?",
                (count * 100,)
            )
            seen: Set[str] = set()
            for row in cursor.fetchall():
                if row[0] and row[0] not in seen:
                    seen.add(row[0])
                    if len(seen) >= count:
                        break
            recent = seen
        except sqlite3.Error:
            pass
        return recent

    def _archive_and_delete(self, to_prune: List[ScoredMessage], reason_for) -> List[str]:
        """
        Archive messages to pruned_messages, then delete them from messages.

        Extracted so budget- and age-based pruning share one implementation
        (previously duplicated).

        Args:
            to_prune: Messages to remove
            reason_for: Callable mapping a ScoredMessage to its prune-reason text

        Returns:
            List[str]: Error descriptions (empty on success)
        """
        errors: List[str] = []
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.execute(self._ARCHIVE_TABLE_SQL)
            pruned_at = datetime.utcnow().isoformat() + "Z"
            for msg in to_prune:
                # Archive first, then delete, so a crash mid-loop loses nothing.
                cursor.execute(
                    "INSERT INTO pruned_messages "
                    "(original_id, hash, content, role, session_id, timestamp, "
                    "importance_score, message_type, pruned_at, prune_reason) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        msg.id, msg.hash, msg.content, msg.role, msg.session_id,
                        msg.timestamp, msg.importance_score, msg.message_type.value,
                        pruned_at, reason_for(msg),
                    )
                )
                cursor.execute("DELETE FROM messages WHERE id = ?", (msg.id,))
            conn.commit()
            conn.close()
        except sqlite3.Error as e:
            errors.append(f"Database error: {e}")
        return errors

    def _empty_result(self) -> PruningResult:
        """PruningResult describing a run that found nothing to analyze."""
        return PruningResult(
            success=True,
            messages_analyzed=0,
            messages_pruned=0,
            messages_preserved=0,
            tokens_before=0,
            tokens_after=0,
            tokens_freed=0,
            sessions_affected=[],
            preserved_types={},
            pruning_summary="No messages to analyze",
            dry_run=self.config.dry_run,
        )

    # ------------------------------------------------------------------
    # Scoring and analysis
    # ------------------------------------------------------------------

    def score_message(
        self,
        msg_id: int,
        msg_hash: str,
        content: str,
        role: str,
        session_id: Optional[str],
        timestamp: Optional[str],
        preserved_sessions: Set[str],
        query_terms: Optional[List[str]] = None,
    ) -> ScoredMessage:
        """
        Score a single message for pruning decisions.

        Args:
            msg_id: Message ID
            msg_hash: Message hash
            content: Message content
            role: Message role
            session_id: Session ID
            timestamp: Message timestamp
            preserved_sessions: Set of session IDs to preserve
            query_terms: Optional query terms for relevance

        Returns:
            ScoredMessage: Scored message with all computed scores
        """
        # Detect message type
        message_type = detect_message_type(content, role)

        # Compute individual scores
        type_score = compute_type_score(message_type)
        role_score = compute_role_score(role)
        age_decay = compute_age_decay(
            timestamp,
            self.config.age_decay_half_life_days,
            self.config.age_decay_min_score,
        ) if self.config.age_decay_enabled else 1.0
        relevance_score = compute_relevance_score(content, query_terms)

        # Estimate tokens
        token_estimate = estimate_tokens(content, self.config.token_per_char)

        # Composite importance: type 35%, role 15%, age 20%, relevance 30%
        importance_score = (
            0.35 * type_score +
            0.15 * role_score +
            0.20 * age_decay +
            0.30 * relevance_score
        )

        # Preservation rules, strongest first: type, then score threshold,
        # then session membership (linked/recent).
        is_preserved = False
        preserve_reason = None
        if message_type.value in self.config.always_preserve_types:
            is_preserved = True
            preserve_reason = f"Type '{message_type.value}' always preserved"
        elif importance_score >= self.config.preserve_threshold:
            is_preserved = True
            preserve_reason = f"High importance score: {importance_score:.2f}"
        elif session_id and session_id in preserved_sessions:
            is_preserved = True
            preserve_reason = "Session preserved (linked or recent)"

        return ScoredMessage(
            id=msg_id,
            hash=msg_hash,
            content=content,
            role=role,
            session_id=session_id,
            timestamp=timestamp,
            message_type=message_type,
            token_estimate=token_estimate,
            importance_score=importance_score,
            type_score=type_score,
            role_score=role_score,
            age_decay=age_decay,
            relevance_score=relevance_score,
            is_preserved=is_preserved,
            preserve_reason=preserve_reason,
        )

    def analyze_messages(
        self,
        session_id: Optional[str] = None,
        query_terms: Optional[List[str]] = None,
        limit: int = 10000,
    ) -> List[ScoredMessage]:
        """
        Analyze messages and compute importance scores.

        Args:
            session_id: Optional session ID to filter by
            query_terms: Optional query terms for relevance scoring
            limit: Maximum messages to analyze

        Returns:
            List[ScoredMessage]: Scored messages sorted by importance (ascending)
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            # Sessions exempt from pruning: cross-linked and/or most recent.
            preserved_sessions: Set[str] = set()
            if self.config.preserve_linked_sessions:
                preserved_sessions.update(self._get_linked_sessions(conn))
            if self.config.preserve_recent_sessions > 0:
                preserved_sessions.update(
                    self._get_recent_sessions(conn, self.config.preserve_recent_sessions)
                )

            if session_id:
                cursor.execute(
                    "SELECT id, hash, content, role, session_id, timestamp "
                    "FROM messages WHERE session_id = ? LIMIT ?",
                    (session_id, limit)
                )
            else:
                cursor.execute(
                    "SELECT id, hash, content, role, session_id, timestamp "
                    "FROM messages LIMIT ?",
                    (limit,)
                )

            scored_messages = []
            for row in cursor.fetchall():
                msg_id, msg_hash, content, role, sess_id, timestamp = row
                scored = self.score_message(
                    msg_id, msg_hash, content or "", role or "unknown",
                    sess_id, timestamp, preserved_sessions, query_terms
                )
                scored_messages.append(scored)

            # Lowest importance first — pruning candidates lead the list.
            scored_messages.sort(key=lambda m: m.importance_score)
            return scored_messages
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Pruning strategies
    # ------------------------------------------------------------------

    def prune_to_budget(
        self,
        session_id: Optional[str] = None,
        token_budget: Optional[int] = None,
        query_terms: Optional[List[str]] = None,
    ) -> PruningResult:
        """
        Prune messages to fit within token budget.

        Args:
            session_id: Optional session ID to filter by
            token_budget: Token budget (uses config default if not provided)
            query_terms: Optional query terms for relevance scoring

        Returns:
            PruningResult: Result of the pruning operation
        """
        token_budget = token_budget or self.config.token_budget

        scored_messages = self.analyze_messages(session_id, query_terms)
        if not scored_messages:
            return self._empty_result()

        tokens_before = sum(m.token_estimate for m in scored_messages)

        # Preserved messages are untouchable; only the rest may be pruned.
        preserved = [m for m in scored_messages if m.is_preserved]
        pruneable = [m for m in scored_messages if not m.is_preserved]
        pruneable.sort(key=lambda m: m.importance_score)

        # BUG FIX: the original loop recomputed
        # `sum(m.token_estimate for m in pruneable if m not in to_prune)`
        # on every iteration (O(n^2) with list membership on dataclasses) and
        # had an if/else whose branches both did the same append. Track the
        # running remainder instead; behavior is identical.
        to_prune: List[ScoredMessage] = []
        tokens_preserved = sum(m.token_estimate for m in preserved)
        tokens_remaining = tokens_preserved + sum(m.token_estimate for m in pruneable)
        for msg in pruneable:
            if tokens_remaining <= token_budget:
                break
            to_prune.append(msg)
            tokens_remaining -= msg.token_estimate

        errors: List[str] = []
        if to_prune and not self.config.dry_run:
            errors = self._archive_and_delete(
                to_prune,
                lambda m: f"Budget pruning: score={m.importance_score:.2f}",
            )

        tokens_pruned = sum(m.token_estimate for m in to_prune)
        tokens_after = tokens_before - tokens_pruned

        # Count preserved messages by type for the report.
        preserved_types: Dict[str, int] = {}
        for msg in preserved:
            t = msg.message_type.value
            preserved_types[t] = preserved_types.get(t, 0) + 1

        sessions_affected = list({m.session_id for m in to_prune if m.session_id})

        return PruningResult(
            success=len(errors) == 0,
            messages_analyzed=len(scored_messages),
            messages_pruned=len(to_prune),
            messages_preserved=len(preserved) + len(pruneable) - len(to_prune),
            tokens_before=tokens_before,
            tokens_after=tokens_after,
            tokens_freed=tokens_pruned,
            sessions_affected=sessions_affected,
            preserved_types=preserved_types,
            pruning_summary=f"Pruned {len(to_prune)} messages ({tokens_pruned} tokens) "
                            f"to meet {token_budget} token budget",
            dry_run=self.config.dry_run,
            errors=errors,
        )

    def prune_by_age(
        self,
        max_age_days: int = 30,
        preserve_important: bool = True,
    ) -> PruningResult:
        """
        Prune messages older than specified age.

        Args:
            max_age_days: Maximum age in days
            preserve_important: If True, keep high-importance messages regardless of age

        Returns:
            PruningResult: Result of the pruning operation
        """
        scored_messages = self.analyze_messages()
        if not scored_messages:
            return self._empty_result()

        cutoff = datetime.utcnow() - timedelta(days=max_age_days)
        cutoff_str = cutoff.isoformat()
        tokens_before = sum(m.token_estimate for m in scored_messages)

        to_prune: List[ScoredMessage] = []
        for msg in scored_messages:
            if not msg.timestamp:
                continue
            try:
                # Lexicographic comparison works for ISO-8601 timestamps once
                # the trailing 'Z' is stripped.
                msg_time = msg.timestamp.replace('Z', '')
                if msg_time < cutoff_str:
                    if preserve_important and msg.is_preserved:
                        continue
                    to_prune.append(msg)
            except (ValueError, TypeError):
                continue

        errors: List[str] = []
        if to_prune and not self.config.dry_run:
            reason = f"Age pruning: older than {max_age_days} days"
            errors = self._archive_and_delete(to_prune, lambda m: reason)

        tokens_pruned = sum(m.token_estimate for m in to_prune)
        sessions_affected = list({m.session_id for m in to_prune if m.session_id})

        return PruningResult(
            success=len(errors) == 0,
            messages_analyzed=len(scored_messages),
            messages_pruned=len(to_prune),
            messages_preserved=len(scored_messages) - len(to_prune),
            tokens_before=tokens_before,
            tokens_after=tokens_before - tokens_pruned,
            tokens_freed=tokens_pruned,
            sessions_affected=sessions_affected,
            preserved_types={},
            pruning_summary=f"Pruned {len(to_prune)} messages older than {max_age_days} days",
            dry_run=self.config.dry_run,
            errors=errors,
        )

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_stats(self) -> Dict[str, Any]:
        """Get pruning statistics and message breakdown."""
        scored_messages = self.analyze_messages()

        type_counts: Dict[str, int] = {}
        type_tokens: Dict[str, int] = {}
        role_counts: Dict[str, int] = {}
        for msg in scored_messages:
            t = msg.message_type.value
            type_counts[t] = type_counts.get(t, 0) + 1
            type_tokens[t] = type_tokens.get(t, 0) + msg.token_estimate
            role_counts[msg.role] = role_counts.get(msg.role, 0) + 1

        total_tokens = sum(m.token_estimate for m in scored_messages)
        preserved_count = sum(1 for m in scored_messages if m.is_preserved)
        pruneable_count = len(scored_messages) - preserved_count

        return {
            "total_messages": len(scored_messages),
            "total_tokens": total_tokens,
            "preserved_count": preserved_count,
            "pruneable_count": pruneable_count,
            "type_breakdown": type_counts,
            "type_tokens": type_tokens,
            "role_breakdown": role_counts,
            "avg_importance_score": (
                sum(m.importance_score for m in scored_messages) / len(scored_messages)
                if scored_messages else 0
            ),
        }
# =============================================================================
# Convenience Functions
# =============================================================================


def prune_context(
    token_budget: int = 50000,
    dry_run: bool = True,
    session_id: Optional[str] = None,
) -> PruningResult:
    """
    Convenience function to prune context to token budget.

    Args:
        token_budget: Target token budget
        dry_run: If True, don't actually delete messages
        session_id: Optional session to filter by

    Returns:
        PruningResult: Pruning results
    """
    pruner = ContextPruner(
        config=PruningConfig(token_budget=token_budget, dry_run=dry_run)
    )
    return pruner.prune_to_budget(session_id=session_id)
def prune_old_context(
    max_age_days: int = 30,
    dry_run: bool = True,
    preserve_important: bool = True,
) -> PruningResult:
    """
    Convenience function to prune old context.

    Args:
        max_age_days: Maximum age in days
        dry_run: If True, don't actually delete messages
        preserve_important: Keep high-importance messages regardless of age

    Returns:
        PruningResult: Pruning results
    """
    pruner = ContextPruner(config=PruningConfig(dry_run=dry_run))
    return pruner.prune_by_age(max_age_days, preserve_important)
def get_pruning_stats() -> Dict[str, Any]:
    """
    Convenience function to get pruning statistics.

    Returns:
        Dict: Message breakdown and statistics
    """
    return ContextPruner().get_stats()
def format_pruning_result(result: PruningResult) -> str:
    """Format pruning result for display."""
    divider = "=" * 60
    lines = [
        divider,
        "Context Pruning Result",
        divider,
        "",
        f"Status: {'SUCCESS' if result.success else 'FAILED'}",
        f"Mode: {'DRY RUN' if result.dry_run else 'EXECUTED'}",
        "",
        "Messages:",
        f"  Analyzed: {result.messages_analyzed:,}",
        f"  Pruned: {result.messages_pruned:,} ({result.prune_percentage:.1f}%)",
        f"  Preserved: {result.messages_preserved:,}",
        "",
        "Tokens:",
        f"  Before: {result.tokens_before:,}",
        f"  After: {result.tokens_after:,}",
        f"  Freed: {result.tokens_freed:,} ({result.token_reduction_percentage:.1f}%)",
        "",
    ]
    if result.preserved_types:
        lines.append("Preserved by Type:")
        for type_name, count in sorted(result.preserved_types.items(), key=lambda item: -item[1]):
            lines.append(f"  {type_name}: {count}")
        lines.append("")
    if result.sessions_affected:
        lines.append(f"Sessions Affected: {len(result.sessions_affected)}")
        lines.append("")
    lines.append(f"Summary: {result.pruning_summary}")
    if result.errors:
        lines.append("")
        lines.append("Errors:")
        lines.extend(f"  - {err}" for err in result.errors)
    lines.append("=" * 60)
    return "\n".join(lines)
def format_pruning_help() -> str:
    """Return help text for context pruning (used as the CLI epilog)."""
    # FIX: the help string had been garbled (e.g. "pruned_messagestable" and a
    # collapsed OPTIONS table); reconstructed into readable form.
    return """
Context Pruning (J.9.3) - Intelligent Message Retention

OVERVIEW:
    Intelligently prune session messages based on importance scoring,
    age decay, and token budget constraints while preserving critical
    knowledge (decisions, ADRs, error solutions).

IMPORTANCE SCORING:
    Messages are scored on:
    - Type (35%): decisions=1.0, ADR=0.95, error_solution=0.90, etc.
    - Role (15%): assistant=0.60, user=0.50, system=0.30
    - Age (20%): Exponential decay with 7-day half-life
    - Relevance (30%): Content richness, code blocks, structure

PRESERVATION RULES:
    - Types 'decision', 'adr', 'error_solution', 'policy' always preserved
    - Messages with importance >= 0.8 always preserved
    - Last 3 sessions always preserved
    - Cross-linked sessions (J.8.3) always preserved

USAGE:
    # Prune to token budget (dry run)
    python -m scripts.context_graph.context_pruner --budget 50000 --dry-run

    # Prune messages older than 30 days
    python -m scripts.context_graph.context_pruner --age 30

    # Show statistics only
    python -m scripts.context_graph.context_pruner --stats

    # Execute pruning (actually delete)
    python -m scripts.context_graph.context_pruner --budget 50000 --execute

OPTIONS:
    --budget N     Token budget to prune to (default: 50000)
    --age N        Prune messages older than N days
    --session ID   Filter to specific session
    --dry-run      Preview only, don't delete (default)
    --execute      Actually delete messages
    --stats        Show statistics only
    --help         Show this help

SAFETY:
    - Pruned messages are archived to the `pruned_messages` table
    - Use --dry-run first to preview what would be pruned
    - Critical knowledge types are never pruned
"""
# =============================================================================
# CLI Interface
# =============================================================================


# BUG FIX: guard was `if name == "main":` (stripped dunders) — never true and
# `name` is undefined; must be `__name__ == "__main__"`.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="J.9.3: Intelligent Context Pruning",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=format_pruning_help(),
    )
    parser.add_argument(
        "--budget", type=int, default=50000,
        help="Token budget to prune to (default: 50000)"
    )
    parser.add_argument(
        "--age", type=int,
        help="Prune messages older than N days"
    )
    parser.add_argument(
        "--session", type=str,
        help="Filter to specific session ID"
    )
    parser.add_argument(
        "--dry-run", action="store_true", default=True,
        help="Preview only, don't delete (default)"
    )
    parser.add_argument(
        "--execute", action="store_true",
        help="Actually delete messages (overrides --dry-run)"
    )
    parser.add_argument(
        "--stats", action="store_true",
        help="Show statistics only"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON"
    )
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    # --execute wins over the default dry-run.
    dry_run = not args.execute

    if args.stats:
        # Show statistics only
        stats = get_pruning_stats()
        if args.json:
            print(json.dumps(stats, indent=2))
        else:
            print("Context Pruning Statistics")
            print("=" * 40)
            print(f"Total Messages: {stats['total_messages']:,}")
            print(f"Total Tokens: {stats['total_tokens']:,}")
            print(f"Preserved: {stats['preserved_count']:,}")
            print(f"Pruneable: {stats['pruneable_count']:,}")
            print(f"Avg Importance: {stats['avg_importance_score']:.2f}")
            print()
            print("Type Breakdown:")
            for t, count in sorted(stats['type_breakdown'].items(), key=lambda x: -x[1]):
                tokens = stats['type_tokens'].get(t, 0)
                print(f"  {t}: {count:,} ({tokens:,} tokens)")
    elif args.age is not None:
        # Age-based pruning.
        # FIX: `elif args.age:` treated `--age 0` as unset; test against None.
        result = prune_old_context(
            max_age_days=args.age,
            dry_run=dry_run,
        )
        if args.json:
            print(json.dumps({
                "success": result.success,
                "messages_pruned": result.messages_pruned,
                "tokens_freed": result.tokens_freed,
                "dry_run": result.dry_run,
            }, indent=2))
        else:
            print(format_pruning_result(result))
    else:
        # Budget-based pruning
        result = prune_context(
            token_budget=args.budget,
            dry_run=dry_run,
            session_id=args.session,
        )
        if args.json:
            print(json.dumps({
                "success": result.success,
                "messages_analyzed": result.messages_analyzed,
                "messages_pruned": result.messages_pruned,
                "tokens_before": result.tokens_before,
                "tokens_after": result.tokens_after,
                "tokens_freed": result.tokens_freed,
                "dry_run": result.dry_run,
            }, indent=2))
        else:
            print(format_pruning_result(result))