# scripts-trajectory-logging
#!/usr/bin/env python3
"""
CODITECT Trajectory Visualization System (ADR-079)
⚠️ DEPRECATED (2026-01-23): This file-based logger has been replaced by database storage.
Trajectory data is now:
- Extracted via /cx pipeline (trajectory_extractor.py)
- Stored in sessions.db:tool_analytics table (ADR-118 Tier 3)
- Deduplicated via trajectory_hash
The trajectories/ folder has been removed (was 7,956 files, 36MB). See ADR-079 for migration details.
Original description (historical): Implements RLM-inspired trajectory logging for debugging agent execution flows, reasoning chains, and skill invocations.
Key Features:
- JSONL format for trajectories (one event per line)
- Thread-safe singleton logger
- Rich event types for tools, skills, agents, iterations
- Automatic stats tracking
- Truncation for large outputs
Based on analysis of submodules/rlm/rlm/logger/. """
import atexit
import json
import re
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from queue import Empty, Queue
from threading import Lock, RLock, Thread
from typing import Any, Dict, List, Literal, Optional
Secret redaction patterns (P1 recommendation from MoE judges)
SECRET_PATTERNS = [ # API Keys (key=value format) (r'(?i)(api[-]?key|apikey)["\s:=]+["']?([a-zA-Z0-9-]{20,})["']?', r'\1="[REDACTED]"'), (r'(?i)(secret[-]?key|secretkey)["\s:=]+["']?([a-zA-Z0-9-]{20,})["']?', r'\1="[REDACTED]"'), (r'(?i)(access[-]?token|accesstoken)["\s:=]+["']?([a-zA-Z0-9-]{20,})["']?', r'\1="[REDACTED]"'), (r'(?i)(auth[-]?token|authtoken)["\s:=]+["']?([a-zA-Z0-9-]{20,})["']?', r'\1="[REDACTED]"'), # AWS (r'(?i)(aws[-]?access[-]?key[-]?id)["\s:=]+["']?([A-Z0-9]{20})["']?', r'\1="[REDACTED]"'), (r'(?i)(aws[-]?secret[-]?access[-]?key)["\s:=]+["']?([a-zA-Z0-9/+=]{40})["']?', r'\1="[REDACTED]"'), # Generic patterns (r'(?i)(password|passwd|pwd)["\s:=]+["']?([^\s"']{8,})["']?', r'\1="[REDACTED]"'), (r'(?i)(bearer\s+)([a-zA-Z0-9_-.]{20,})', r'\1[REDACTED]'), # GitHub/GitLab tokens - match anywhere (r'(ghp_[a-zA-Z0-9]{36})', '[REDACTED_GITHUB_TOKEN]'), (r'(gho_[a-zA-Z0-9]{36})', '[REDACTED_GITHUB_TOKEN]'), (r'(glpat-[a-zA-Z0-9-]{20,})', '[REDACTED_GITLAB_TOKEN]'), # Anthropic keys - match the full sk-ant pattern first (more specific) (r'(sk-ant-[a-zA-Z0-9-]{20,})', '[REDACTED_ANTHROPIC_KEY]'), # OpenAI project keys (sk-proj-) (r'(sk-proj-[a-zA-Z0-9-]{20,})', '[REDACTED_OPENAI_KEY]'), # Generic sk- API keys (OpenAI, etc.) - less specific, match last (r'(sk-[a-zA-Z0-9]{32,})', '[REDACTED_API_KEY]'), # Private keys (r'-----BEGIN [A-Z ]+ PRIVATE KEY-----[\s\S]*?-----END [A-Z ]+ PRIVATE KEY-----', '[REDACTED_PRIVATE_KEY]'), # Connection strings (r'(?i)(mongodb(+srv)?://)[^\s]+@', r'\1[REDACTED]@'), (r'(?i)(postgres(ql)?://)[^\s]+@', r'\1[REDACTED]@'), (r'(?i)(mysql://)[^\s]+@', r'\1[REDACTED]@'), (r'(?i)(redis://)[^\s]+@', r'\1[REDACTED]@'), ]
Sensitive key names (used by redact_dict to detect sensitive fields by key name)
SENSITIVE_KEYS = { 'password', 'passwd', 'pwd', 'secret', 'token', 'api_key', 'apikey', 'access_token', 'auth_token', 'private_key', 'secret_key', 'credentials', 'authorization', 'bearer', 'api-key', 'auth-token', 'access-token' }
Compiled patterns for performance
_COMPILED_PATTERNS = [(re.compile(pattern), repl) for pattern, repl in SECRET_PATTERNS]
def redact_secrets(text: str) -> str:
    """
    Redact sensitive information from text.

    Runs every compiled pattern from SECRET_PATTERNS over the input,
    replacing API keys, passwords, tokens, and other secrets with
    [REDACTED] placeholders.

    Args:
        text: Input text that may contain secrets

    Returns:
        Text with secrets redacted (falsy input is returned unchanged)
    """
    if not text:
        return text
    for regex, substitute in _COMPILED_PATTERNS:
        text = regex.sub(substitute, text)
    return text
def is_sensitive_key(key: str) -> bool:
    """Check if a key name indicates sensitive data.

    Normalizes the key (lowercase, hyphens removed) before checking it
    against SENSITIVE_KEYS, so "API-Key" matches "apikey".
    """
    normalized = key.lower().replace('-', '')
    return normalized in SENSITIVE_KEYS
def redact_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Recursively redact secrets from a dictionary.

    Checks both:
    1. Values containing secret patterns (via redact_secrets)
    2. Keys that indicate sensitive fields (via SENSITIVE_KEYS)

    Args:
        data: Dictionary that may contain secrets

    Returns:
        New dictionary with secrets redacted (falsy input returned as-is)
    """
    if not data:
        return data
    result: Dict[str, Any] = {}
    for key, value in data.items():
        # Check if the key name indicates sensitive data.
        # BUGFIX: was calling undefined `_is_sensitive_key` (NameError);
        # the helper defined above is `is_sensitive_key`.
        # The len >= 8 guard avoids redacting short, likely-harmless values.
        if is_sensitive_key(key) and isinstance(value, str) and len(value) >= 8:
            result[key] = "[REDACTED]"
        elif isinstance(value, str):
            result[key] = redact_secrets(value)
        elif isinstance(value, dict):
            result[key] = redact_dict(value)
        elif isinstance(value, list):
            # Redact string and dict elements; leave other types untouched.
            result[key] = [
                redact_secrets(v) if isinstance(v, str)
                else redact_dict(v) if isinstance(v, dict)
                else v
                for v in value
            ]
        else:
            result[key] = value
    return result
# Event type literals: the closed set of "type" discriminator tags carried
# by every trajectory event (first field of each JSONL line).
EventType = Literal[
    "metadata",
    "tool_call",
    "skill_invoke",
    "agent_dispatch",
    "agent_complete",
    "iteration",
    "error",
    "final",
]
@dataclass
class TrajectoryEvent:
    """
    Base class for trajectory events.

    All events have:
    - type: Event type identifier
    - timestamp: ISO format timestamp (auto-generated)
    - task_id: Optional PILOT plan task ID
    """
    type: EventType
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12 and
    # yields a naive timestamp (no "+00:00" offset); kept for output
    # compatibility with existing trajectory files.
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    task_id: Optional[str] = None

    def to_jsonl(self) -> str:
        """Serialize event to JSONL format (single line JSON)."""
        # default=str stringifies any non-JSON-serializable field value.
        return json.dumps(asdict(self), default=str)
@dataclass
class MetadataEvent(TrajectoryEvent):
    """
    Session metadata event (first line of trajectory).

    Contains session-level information for context.
    """
    type: EventType = "metadata"
    session_id: str = ""        # Unique session identifier
    started_at: str = ""        # ISO timestamp of session start
    model: str = ""             # Model name, e.g. "claude-opus-4.5"
    project_root: str = ""      # Project root path
    coditect_version: str = ""  # Version string written by start_session
@dataclass
class ToolCallEvent(TrajectoryEvent):
    """
    Tool invocation event.

    Captures tool name, inputs, outputs, timing, and success status.
    The logger redacts secrets and truncates output before constructing
    this event.
    """
    type: EventType = "tool_call"
    tool_name: str = ""  # e.g. "Bash", "Edit"
    tool_input: Dict[str, Any] = field(default_factory=dict)  # redacted input params
    output: str = ""  # redacted, truncated to 5000 chars by the logger
    duration_ms: float = 0
    success: bool = True
    error: Optional[str] = None  # redacted error message if the call failed
@dataclass
class SkillInvokeEvent(TrajectoryEvent):
    """
    Skill invocation event.

    Captures skill name, arguments, result, and timing.
    """
    type: EventType = "skill_invoke"
    skill_name: str = ""
    args: Optional[str] = None  # redacted by the logger
    result: str = ""  # redacted, truncated to 5000 chars by the logger
    duration_ms: float = 0
@dataclass
class AgentDispatchEvent(TrajectoryEvent):
    """
    Agent dispatch event.

    Captures when a subagent is dispatched via Task tool.
    """
    type: EventType = "agent_dispatch"
    agent_type: str = ""
    prompt_preview: str = ""  # First 500 chars of prompt (redacted)
    depth: int = 0  # Recursion depth of the dispatch
    parent_task_id: Optional[str] = None  # Set when nested under another task
@dataclass
class AgentCompleteEvent(TrajectoryEvent):
    """
    Agent completion event.

    Captures when a subagent completes, with results and stats.
    """
    type: EventType = "agent_complete"
    agent_type: str = ""
    result_preview: str = ""  # First 500 chars of result (redacted)
    duration_ms: float = 0
    child_calls: int = 0  # Number of child agent calls made
    usage: Dict[str, int] = field(default_factory=dict)  # token usage counters
@dataclass
class IterationEvent(TrajectoryEvent):
    """
    Model iteration event (reasoning step).

    Captures prompt/response previews and tool call counts.
    """
    type: EventType = "iteration"
    iteration: int = 0
    prompt_preview: str = ""  # First 500 chars (redacted)
    response_preview: str = ""  # First 500 chars (redacted)
    thinking: Optional[str] = None  # Extended thinking if captured (first 1000 chars, redacted)
    tool_calls_in_response: int = 0
@dataclass
class ErrorEvent(TrajectoryEvent):
    """
    Error event.

    Captures error details for debugging.
    """
    type: EventType = "error"
    error_type: str = ""  # e.g. "ValidationError"
    error_message: str = ""  # redacted by the logger
    traceback: Optional[str] = None  # redacted traceback string, if captured
@dataclass
class FinalEvent(TrajectoryEvent):
    """
    Session completion event (last line of trajectory).

    Contains accumulated statistics and final status.
    """
    type: EventType = "final"
    # NOTE(review): utcnow() is deprecated (3.12+); kept for format parity
    # with TrajectoryEvent.timestamp.
    ended_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    total_duration_ms: float = 0
    total_tool_calls: int = 0
    total_skill_invokes: int = 0
    total_agent_dispatches: int = 0
    total_tokens: int = 0
    final_status: str = "completed"  # e.g. "completed", "failed", "interrupted"
class TrajectoryLogger: """ Thread-safe singleton logger for execution trajectories.
Writes events to JSONL file with automatic stats tracking.
Usage:
logger = TrajectoryLogger()
logger.start_session("session-123", output_dir)
logger.log_tool_call("Bash", {"command": "ls"}, "output", 100)
logger.log_skill_invoke("tdd-patterns", "result", 500)
logger.log_agent_dispatch("backend", "prompt", 1, "A.1.1")
logger.end_session(total_duration_ms=60000, total_tokens=10000)
"""
_instance: Optional["TrajectoryLogger"] = None
_lock: Lock = Lock()
def __new__(cls):
"""Singleton pattern with double-checked locking."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
instance = super().__new__(cls)
instance._file = None
instance._session_id = None
instance._output_dir = None
instance._write_lock = Lock()
instance._stats = {
"tool_calls": 0,
"skill_invokes": 0,
"agent_dispatches": 0
}
cls._instance = instance
return cls._instance
def start_session(
self,
session_id: str,
output_dir: Optional[Path] = None,
model: str = "",
project_root: str = ""
) -> None:
"""
Start trajectory logging for a session.
Args:
session_id: Unique session identifier
output_dir: Directory for trajectory file (default: ~/.coditect/trajectories/)
model: Model name (e.g., "claude-opus-4.5")
project_root: Project root path
"""
self._session_id = session_id
# ADR-114 & ADR-118: Trajectories are now in sessions.db (Tier 3)
# This file-based approach is DEPRECATED - see docstring
if output_dir is None:
_user_data = Path.home() / "PROJECTS" / ".coditect-data"
output_dir = _user_data / "trajectories" if _user_data.exists() else Path.home() / ".coditect" / "trajectories"
self._output_dir = Path(output_dir)
self._output_dir.mkdir(parents=True, exist_ok=True)
# Open file for writing
filepath = self._output_dir / f"trajectory-{session_id}.jsonl"
self._file = open(filepath, 'w')
# Reset stats
self._stats = {
"tool_calls": 0,
"skill_invokes": 0,
"agent_dispatches": 0
}
# Write metadata as first line
metadata = MetadataEvent(
session_id=session_id,
started_at=datetime.utcnow().isoformat(),
model=model,
project_root=project_root,
coditect_version="2.8.0"
)
self._write(metadata)
def log_tool_call(
self,
tool_name: str,
tool_input: Dict[str, Any],
output: str,
duration_ms: float,
task_id: Optional[str] = None,
success: bool = True,
error: Optional[str] = None
) -> None:
"""
Log a tool call event.
Args:
tool_name: Name of the tool (e.g., "Bash", "Edit")
tool_input: Tool input parameters
output: Tool output (truncated to 5000 chars)
duration_ms: Execution duration in milliseconds
task_id: Optional PILOT plan task ID
success: Whether the call succeeded
error: Error message if failed
"""
with self._write_lock:
self._stats["tool_calls"] += 1
# Apply secret redaction (P1 MoE recommendation)
redacted_input = redact_dict(tool_input) if tool_input else {}
redacted_output = redact_secrets(output[:5000] if output else "")
redacted_error = redact_secrets(error) if error else None
event = ToolCallEvent(
tool_name=tool_name,
tool_input=redacted_input,
output=redacted_output,
duration_ms=duration_ms,
task_id=task_id,
success=success,
error=redacted_error
)
self._write(event)
def log_skill_invoke(
self,
skill_name: str,
result: str,
duration_ms: float,
task_id: Optional[str] = None,
args: Optional[str] = None
) -> None:
"""
Log a skill invocation event.
Args:
skill_name: Name of the skill
result: Skill result (truncated to 5000 chars)
duration_ms: Execution duration in milliseconds
task_id: Optional PILOT plan task ID
args: Optional skill arguments
"""
with self._write_lock:
self._stats["skill_invokes"] += 1
# Apply secret redaction (P1 MoE recommendation)
redacted_args = redact_secrets(args) if args else None
redacted_result = redact_secrets(result[:5000] if result else "")
event = SkillInvokeEvent(
skill_name=skill_name,
args=redacted_args,
result=redacted_result,
duration_ms=duration_ms,
task_id=task_id
)
self._write(event)
def log_agent_dispatch(
self,
agent_type: str,
prompt: str,
depth: int,
task_id: str,
parent_task_id: Optional[str] = None
) -> None:
"""
Log an agent dispatch event.
Args:
agent_type: Type of subagent being dispatched
prompt: Prompt for the agent (preview: first 500 chars)
depth: Recursion depth
task_id: Task ID for this agent
parent_task_id: Parent task ID if nested
"""
with self._write_lock:
self._stats["agent_dispatches"] += 1
# Apply secret redaction (P1 MoE recommendation)
redacted_prompt = redact_secrets(prompt[:500] if prompt else "")
event = AgentDispatchEvent(
agent_type=agent_type,
prompt_preview=redacted_prompt,
depth=depth,
task_id=task_id,
parent_task_id=parent_task_id
)
self._write(event)
def log_agent_complete(
self,
agent_type: str,
result: str,
duration_ms: float,
task_id: str,
child_calls: int = 0,
usage: Optional[Dict[str, int]] = None
) -> None:
"""
Log an agent completion event.
Args:
agent_type: Type of subagent that completed
result: Result from the agent (preview: first 500 chars)
duration_ms: Total duration in milliseconds
task_id: Task ID for this agent
child_calls: Number of child agent calls made
usage: Token usage dictionary
"""
# Apply secret redaction (P1 MoE recommendation)
redacted_result = redact_secrets(result[:500] if result else "")
event = AgentCompleteEvent(
agent_type=agent_type,
result_preview=redacted_result,
duration_ms=duration_ms,
task_id=task_id,
child_calls=child_calls,
usage=usage or {}
)
self._write(event)
def log_iteration(
self,
iteration: int,
prompt: str,
response: str,
task_id: Optional[str] = None,
thinking: Optional[str] = None,
tool_calls: int = 0
) -> None:
"""
Log a model iteration event.
Args:
iteration: Iteration number
prompt: Prompt (preview: first 500 chars)
response: Response (preview: first 500 chars)
task_id: Optional task ID
thinking: Extended thinking content if captured
tool_calls: Number of tool calls in this response
"""
# Apply secret redaction (P1 MoE recommendation)
redacted_prompt = redact_secrets(prompt[:500] if prompt else "")
redacted_response = redact_secrets(response[:500] if response else "")
redacted_thinking = redact_secrets(thinking[:1000]) if thinking else None
event = IterationEvent(
iteration=iteration,
prompt_preview=redacted_prompt,
response_preview=redacted_response,
task_id=task_id,
thinking=redacted_thinking,
tool_calls_in_response=tool_calls
)
self._write(event)
def log_error(
self,
error_type: str,
error_message: str,
task_id: Optional[str] = None,
traceback: Optional[str] = None
) -> None:
"""
Log an error event.
Args:
error_type: Type of error (e.g., "ValidationError")
error_message: Error message
task_id: Optional task ID
traceback: Optional traceback string
"""
# Apply secret redaction (P1 MoE recommendation)
redacted_message = redact_secrets(error_message)
redacted_traceback = redact_secrets(traceback) if traceback else None
event = ErrorEvent(
error_type=error_type,
error_message=redacted_message,
task_id=task_id,
traceback=redacted_traceback
)
self._write(event)
def end_session(
self,
total_duration_ms: float,
total_tokens: int = 0,
status: str = "completed"
) -> None:
"""
End trajectory logging and write final event.
Args:
total_duration_ms: Total session duration in milliseconds
total_tokens: Total tokens used
status: Final status (e.g., "completed", "failed", "interrupted")
"""
event = FinalEvent(
total_duration_ms=total_duration_ms,
total_tool_calls=self._stats["tool_calls"],
total_skill_invokes=self._stats["skill_invokes"],
total_agent_dispatches=self._stats["agent_dispatches"],
total_tokens=total_tokens,
final_status=status
)
self._write(event)
# Close file
if self._file:
self._file.close()
self._file = None
def _write(self, event: TrajectoryEvent) -> None:
"""Write event to JSONL file (thread-safe)."""
if self._file:
with self._write_lock:
self._file.write(event.to_jsonl() + '\n')
self._file.flush()
def get_trajectory_path(self) -> Optional[Path]:
"""Get path to current trajectory file."""
if self._session_id and self._output_dir:
return self._output_dir / f"trajectory-{self._session_id}.jsonl"
return None
@property
def stats(self) -> Dict[str, int]:
"""Get current stats (read-only copy)."""
return self._stats.copy()
class AsyncTrajectoryWriter:
    """
    Asynchronous trajectory writer for high-volume scenarios (P3 MoE recommendation).

    Uses a background thread and queue to avoid blocking the main thread
    during file I/O operations.

    Usage:
        writer = AsyncTrajectoryWriter(filepath)
        writer.start()
        writer.write(event)  # Non-blocking
        writer.stop()  # Flush and close
    """

    def __init__(self, filepath: Path, queue_size: int = 1000):
        """
        Initialize async writer.

        Args:
            filepath: Path to output file
            queue_size: Maximum queue size (blocks if full)
        """
        self._filepath = filepath
        self._queue: Queue = Queue(maxsize=queue_size)
        self._thread: Optional[Thread] = None
        self._running = False          # Set by start()/stop(); read by the writer loop
        self._file = None              # Opened in start(), closed by the writer loop
        self._write_count = 0          # Events successfully written to disk
        self._drop_count = 0           # Events dropped because the queue was full

    def start(self) -> None:
        """Start the background writer thread. Idempotent while running."""
        if self._running:
            return
        # 'w' truncates: restarting a writer on the same path discards old content.
        self._file = open(self._filepath, 'w')
        self._running = True
        self._thread = Thread(target=self._writer_loop, daemon=True)
        self._thread.start()
        # Register cleanup on exit
        atexit.register(self.stop)

    def _writer_loop(self) -> None:
        """Background thread that processes the write queue.

        Keeps draining after stop() flips _running to False, until the
        queue is empty or the None sentinel arrives.
        """
        while self._running or not self._queue.empty():
            try:
                event = self._queue.get(timeout=0.1)
                if event is None:  # Sentinel to stop
                    break
                if self._file:
                    self._file.write(event.to_jsonl() + '\n')
                    self._write_count += 1
                    # Flush periodically (every 10 writes)
                    if self._write_count % 10 == 0:
                        self._file.flush()
                self._queue.task_done()
            except Empty:
                # Timed out waiting for an event; re-check the loop condition.
                continue
            except Exception:
                pass  # Ignore write errors in background thread (best-effort logging)
        # Final flush and close; the writer thread owns the file handle's lifetime.
        if self._file:
            self._file.flush()
            self._file.close()
            self._file = None

    def write(self, event: "TrajectoryEvent", block: bool = False) -> bool:
        """
        Queue an event for writing.

        Args:
            event: TrajectoryEvent to write
            block: If True, block when queue is full; if False, drop event
        Returns:
            True if queued successfully, False if dropped

        NOTE(review): with block=True the put still times out after 0.1s
        and the event is counted as dropped — "block" is bounded, not
        indefinite. Confirm this is the intended contract.
        """
        if not self._running:
            return False
        try:
            self._queue.put(event, block=block, timeout=0.1 if block else None)
            return True
        except Exception:
            # Queue full (or writer shutting down) — count and drop.
            self._drop_count += 1
            return False

    def stop(self, timeout: float = 5.0) -> None:
        """
        Stop the writer and flush remaining events.

        Args:
            timeout: Maximum time to wait for queue to drain
        """
        if not self._running:
            return
        self._running = False
        # Send sentinel to stop thread (best-effort; the loop also exits
        # once the queue is drained)
        try:
            self._queue.put(None, timeout=0.1)
        except Exception:
            pass
        # Wait for thread to finish
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=timeout)
        # Unregister atexit handler
        try:
            atexit.unregister(self.stop)
        except Exception:
            pass

    @property
    def stats(self) -> Dict[str, int]:
        """Get writer statistics: queued (pending), written, dropped counts."""
        return {
            "queued": self._queue.qsize(),
            "written": self._write_count,
            "dropped": self._drop_count
        }

    @property
    def is_running(self) -> bool:
        """Check if writer is running."""
        return self._running
# Convenience functions
def get_logger() -> TrajectoryLogger:
    """Return the process-wide TrajectoryLogger singleton.

    TrajectoryLogger.__new__ enforces the singleton, so constructing it
    here always yields the same shared instance.
    """
    logger = TrajectoryLogger()
    return logger
def create_session_id() -> str:
    """Create a unique session ID.

    Format: ``YYYYMMDD-HHMMSS-<8 hex chars>`` using UTC wall-clock time
    plus a random uuid4 suffix for uniqueness within the same second.
    """
    # Timezone-aware now() replaces deprecated datetime.utcnow();
    # strftime output is byte-identical (no offset is formatted).
    stamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
    return f"{stamp}-{uuid.uuid4().hex[:8]}"
def load_trajectory(filepath: Path) -> List[Dict[str, Any]]:
    """
    Load a trajectory file and return list of events.

    Blank lines are skipped; every non-empty line must be valid JSON.

    Args:
        filepath: Path to trajectory JSONL file

    Returns:
        List of event dictionaries
    """
    with open(filepath, 'r') as f:
        return [
            json.loads(stripped)
            for raw in f
            if (stripped := raw.strip())
        ]
def get_trajectory_summary(filepath: Path) -> Dict[str, Any]:
    """
    Get summary of a trajectory file.

    Pulls the "metadata" (first-line) and "final" (last-line) events and
    folds their fields into one flat summary dictionary.

    Args:
        filepath: Path to trajectory JSONL file

    Returns:
        Summary dictionary with metadata and stats
    """
    events = load_trajectory(filepath)
    metadata: Optional[Dict[str, Any]] = None
    final: Optional[Dict[str, Any]] = None
    for ev in events:
        kind = ev["type"]
        if kind == "metadata":
            metadata = ev
        elif kind == "final":
            final = ev
    return {
        "session_id": metadata.get("session_id") if metadata else None,
        "model": metadata.get("model") if metadata else None,
        "started_at": metadata.get("started_at") if metadata else None,
        "ended_at": final.get("ended_at") if final else None,
        "duration_ms": final.get("total_duration_ms") if final else None,
        "total_events": len(events),
        "tool_calls": final.get("total_tool_calls") if final else 0,
        "skill_invokes": final.get("total_skill_invokes") if final else 0,
        "agent_dispatches": final.get("total_agent_dispatches") if final else 0,
        "total_tokens": final.get("total_tokens") if final else 0,
        "status": final.get("final_status") if final else "unknown"
    }
def list_trajectories(
    output_dir: Optional[Path] = None,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    List recent trajectory files with summaries.

    Args:
        output_dir: Directory to search (default: ~/.coditect/trajectories/)
        limit: Maximum number to return

    Returns:
        List of trajectory summaries, newest file first
    """
    search_dir = output_dir if output_dir is not None else Path.home() / ".coditect" / "trajectories"
    if not search_dir.exists():
        return []
    # Reverse-sorted filenames put the newest timestamped file first.
    recent = sorted(search_dir.glob("trajectory-*.jsonl"), reverse=True)[:limit]
    summaries: List[Dict[str, Any]] = []
    for filepath in recent:
        try:
            entry = get_trajectory_summary(filepath)
        except Exception:
            continue  # Skip corrupted files
        entry["filepath"] = str(filepath)
        summaries.append(entry)
    return summaries
# File rotation/cleanup configuration (P2 MoE recommendation)
DEFAULT_MAX_TRAJECTORY_FILES = 100     # Keep at most this many files
DEFAULT_MAX_TRAJECTORY_AGE_DAYS = 30   # Delete files older than this
DEFAULT_MAX_TRAJECTORY_SIZE_MB = 500   # Maximum total size
def cleanup_old_trajectories(
    output_dir: Optional[Path] = None,
    max_files: int = DEFAULT_MAX_TRAJECTORY_FILES,
    max_age_days: int = DEFAULT_MAX_TRAJECTORY_AGE_DAYS,
    max_total_size_mb: float = DEFAULT_MAX_TRAJECTORY_SIZE_MB,
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    Clean up old trajectory files based on configurable policies.

    Implements P2 MoE recommendation for file rotation and cleanup.

    Cleanup policies (applied in order):
    1. Delete files older than max_age_days
    2. Delete oldest files if count exceeds max_files
    3. Delete oldest files if total size exceeds max_total_size_mb

    Args:
        output_dir: Directory to clean (default: ~/.coditect/trajectories/)
        max_files: Maximum number of files to keep
        max_age_days: Delete files older than this many days
        max_total_size_mb: Maximum total size in MB
        dry_run: If True, report what would be deleted without deleting

    Returns:
        Dictionary with cleanup statistics
    """
    # Local import: timedelta is not imported at module level.
    # (Removed an unused `import os` that was here.)
    from datetime import datetime, timedelta

    if output_dir is None:
        output_dir = Path.home() / ".coditect" / "trajectories"
    if not output_dir.exists():
        return {"deleted": 0, "deleted_size_mb": 0, "remaining": 0}
    # Get all trajectory files with stats
    files = []
    for filepath in output_dir.glob("trajectory-*.jsonl"):
        try:
            stat = filepath.stat()
            files.append({
                "path": filepath,
                "size": stat.st_size,
                "mtime": datetime.fromtimestamp(stat.st_mtime)
            })
        except OSError:
            pass  # File vanished between glob and stat
    # Sort by modification time (oldest first)
    files.sort(key=lambda f: f["mtime"])
    deleted_files = []
    cutoff_date = datetime.now() - timedelta(days=max_age_days)
    # Policy 1: Delete files older than max_age_days.
    # Sorted oldest-first, so expired entries form a prefix of the list.
    while files and files[0]["mtime"] < cutoff_date:
        deleted_files.append(files.pop(0))
    # Policy 2: Delete oldest files if count exceeds max_files
    while len(files) > max_files:
        deleted_files.append(files.pop(0))
    # Policy 3: Delete oldest files if total size exceeds limit.
    # Keep a running total instead of re-summing every iteration (O(n) not O(n^2)).
    max_total_bytes = max_total_size_mb * 1024 * 1024
    total_size = sum(f["size"] for f in files)
    while files and total_size > max_total_bytes:
        victim = files.pop(0)
        total_size -= victim["size"]
        deleted_files.append(victim)
    # Execute deletions (unless dry_run)
    deleted_size = sum(f["size"] for f in deleted_files)
    if not dry_run:
        for f in deleted_files:
            try:
                f["path"].unlink()
            except OSError:
                pass  # Already gone or not removable; best-effort cleanup
    return {
        "deleted": len(deleted_files),
        "deleted_size_mb": round(deleted_size / (1024 * 1024), 2),
        "remaining": len(files),
        "remaining_size_mb": round(sum(f["size"] for f in files) / (1024 * 1024), 2),
        # File list only reported in dry-run mode (kept from original behavior)
        "deleted_files": [str(f["path"]) for f in deleted_files] if dry_run else [],
        "dry_run": dry_run
    }
def get_trajectory_stats(output_dir: Optional[Path] = None) -> Dict[str, Any]:
    """
    Get statistics about trajectory files.

    Args:
        output_dir: Directory to analyze (default: ~/.coditect/trajectories/)

    Returns:
        Dictionary with trajectory statistics (a minimal 4-key dictionary
        when the directory is missing or holds no trajectory files)
    """
    from datetime import datetime

    empty_stats = {
        "total_files": 0,
        "total_size_mb": 0,
        "oldest_file": None,
        "newest_file": None
    }
    if output_dir is None:
        output_dir = Path.home() / ".coditect" / "trajectories"
    if not output_dir.exists():
        return empty_stats
    entries = []
    for filepath in output_dir.glob("trajectory-*.jsonl"):
        try:
            info = filepath.stat()
        except OSError:
            continue
        entries.append({
            "path": filepath,
            "size": info.st_size,
            "mtime": datetime.fromtimestamp(info.st_mtime)
        })
    if not entries:
        return empty_stats
    entries.sort(key=lambda e: e["mtime"])
    oldest, newest = entries[0], entries[-1]
    total_size = sum(e["size"] for e in entries)
    return {
        "total_files": len(entries),
        "total_size_mb": round(total_size / (1024 * 1024), 2),
        "oldest_file": str(oldest["path"].name),
        "oldest_date": oldest["mtime"].isoformat(),
        "newest_file": str(newest["path"].name),
        "newest_date": newest["mtime"].isoformat(),
        "avg_file_size_kb": round(total_size / len(entries) / 1024, 2)
    }