#!/usr/bin/env python3
"""
Token Status Script (H.8)

Display current context/token usage status and manage pivot strategies.

Usage:
    python3 token-status.py                  # Show status
    python3 token-status.py --detail         # Detailed status
    python3 token-status.py --reset          # Reset warnings
    python3 token-status.py --pivot export   # Execute pivot

ADR: ADR-096 Token Limit Pivot Management System
"""
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path

# ADR-114: Use centralized path discovery.
# Put the directory above this script's directory on sys.path so that the
# `scripts.core.paths` import below can be attempted.
_script_dir = Path(__file__).resolve().parent
_coditect_root = _script_dir.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))

try:
    from scripts.core.paths import get_context_storage_dir, get_user_data_dir
    CONTEXT_STORAGE = get_context_storage_dir()
except ImportError:
    # Path helpers unavailable: fall back to well-known locations, preferring
    # the newer PROJECTS/.coditect-data layout when it exists on disk.
    get_user_data_dir = None
    _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    CONTEXT_STORAGE = (
        _new_location
        if _new_location.exists()
        else Path.home() / ".coditect" / "context-storage"
    )

# Paths (ADR-114 compliant)
STATE_FILE = CONTEXT_STORAGE / "token-limit-state.json"
CONFIG_FILE = Path.home() / ".coditect" / "config" / "token-pivot-config.json"
WATCHER_FILE = CONTEXT_STORAGE / "watcher-state.json"

# ADR-114: Runtime logs go to user data dir
LOG_FILE = (
    (get_user_data_dir() / "logs" / "token-limit-hook.log")
    if get_user_data_dir
    else (Path.home() / ".coditect" / "logs" / "token-limit-hook.log")
)

def load_state() -> dict:
    """Load the persisted token-limit state from ``STATE_FILE``.

    Returns:
        The JSON object stored in the state file, or an empty dict when the
        file is missing, unreadable, not valid JSON, or does not contain a
        JSON object.
    """
    try:
        state = json.loads(STATE_FILE.read_text())
    except (OSError, ValueError):
        # Best-effort read: a missing or corrupt state file must not crash the
        # status display. JSONDecodeError and UnicodeDecodeError are both
        # ValueError subclasses; a missing file raises an OSError subclass.
        return {}
    # Callers use dict methods on the result, so reject lists/scalars.
    return state if isinstance(state, dict) else {}

def load_config() -> dict:
    """Load the pivot configuration, layered over built-in defaults.

    Top-level keys found in ``CONFIG_FILE`` override the defaults. The
    ``thresholds`` mapping is merged key by key, so a config file that only
    sets some thresholds keeps the default values for the others.

    Returns:
        A dict that always contains ``"thresholds"`` (a dict) and
        ``"max_context_tokens"``. The plain defaults are returned when the
        file is missing, unreadable, not valid JSON, or not a JSON object.
    """
    defaults = {
        "thresholds": {"warning": 0.50, "alert": 0.70, "critical": 0.85, "emergency": 0.95},
        "max_context_tokens": 200000,
    }
    try:
        overrides = json.loads(CONFIG_FILE.read_text())
    except (OSError, ValueError):
        # Best-effort: fall back to defaults on a missing or corrupt file.
        return defaults
    if not isinstance(overrides, dict):
        return defaults

    # Handle thresholds separately so a partial or malformed override cannot
    # wipe out the default threshold table.
    user_thresholds = overrides.pop("thresholds", None)
    defaults.update(overrides)
    if isinstance(user_thresholds, dict):
        defaults["thresholds"].update(user_thresholds)
    return defaults

def get_watcher_context() -> float | None:
    """Get the context usage reported in ``WATCHER_FILE`` as a 0-1 ratio.

    The file's ``context_percent`` value is divided by 100.

    Returns:
        ``context_percent / 100.0``, or ``None`` when the file is missing,
        unreadable, not valid JSON, lacks the key, or holds a value that
        cannot be divided.
    """
    try:
        data = json.loads(WATCHER_FILE.read_text())
    except (OSError, ValueError):
        # Best-effort read; the caller falls back to the saved state.
        return None
    if not isinstance(data, dict) or "context_percent" not in data:
        return None
    try:
        return data["context_percent"] / 100.0
    except TypeError:
        # Non-numeric value (e.g. null or a string) in the watcher file.
        return None

def get_level(percentage: float, thresholds: dict) -> str:
    """Map a usage ratio onto a named threshold level.

    Args:
        percentage: Context usage as a ratio (the defaults are 0-1 values).
        thresholds: Mapping that may contain "warning", "alert", "critical"
            and "emergency" cut-offs; missing keys use the built-in values.

    Returns:
        The highest level whose cut-off is reached, or "NORMAL" when none is.
    """
    # Ordered from most to least severe so the first match wins.
    ladder = (
        ("EMERGENCY", "emergency", 0.95),
        ("CRITICAL", "critical", 0.85),
        ("ALERT", "alert", 0.70),
        ("WARNING", "warning", 0.50),
    )
    for label, key, fallback in ladder:
        if percentage >= thresholds.get(key, fallback):
            return label
    return "NORMAL"

def get_level_icon(level: str) -> str:
    """Return the display glyph for a level name, or "" if it is unknown."""
    glyph_by_level = {
        "NORMAL": "✓",
        "WARNING": "⚪",
        "ALERT": "ℹ️",
        "CRITICAL": "⚠️",
        "EMERGENCY": "🚨",
    }
    try:
        return glyph_by_level[level]
    except KeyError:
        return ""

def format_bar(percentage: float, width: int = 20) -> str:
    """Render a fixed-width text progress bar.

    Args:
        percentage: Fill ratio; values outside 0-1 are clamped so the bar
            never grows past ``width`` characters.
        width: Total number of characters in the bar.

    Returns:
        ``width`` characters: filled cells followed by empty cells.
    """
    # Clamp first: an out-of-range ratio would otherwise give a negative
    # empty-cell count and a bar longer than `width`.
    ratio = min(max(percentage, 0.0), 1.0)
    filled = int(ratio * width)
    return "█" * filled + "░" * (width - filled)

def show_status(detail: bool = False):
    """Print the token status box, optionally followed by raw details.

    Args:
        detail: When true, also dump the saved state as JSON and list the
            configured thresholds.
    """
    state = load_state()
    config = load_config()
    thresholds = config.get("thresholds", {})

    # Prefer the watcher reading; otherwise fall back to the saved state.
    # NOTE(review): the watcher value is converted from percent to a ratio,
    # while the saved "context_percent" is used as-is - presumably it is
    # already a 0-1 ratio; confirm against whatever writes the state file.
    live_ratio = get_watcher_context()
    if live_ratio is None:
        percentage = state.get("context_percent", 0)
        source = state.get("context_source", "estimated")
    else:
        percentage = live_ratio
        source = "codi-watcher"

    tool_calls = state.get("tool_calls", 0)
    max_tokens = config.get("max_context_tokens", 200000)
    est_tokens = int(percentage * max_tokens)
    level = get_level(percentage, thresholds)
    icon = get_level_icon(level)

    def rule(left: str, right: str) -> None:
        # Horizontal border of the box.
        print(left + "═" * 62 + right)

    def row(text: str) -> None:
        # Content line, padded so the closing border lines up.
        print(text.ljust(63) + "║")

    # Header
    rule("╔", "╗")
    print("║" + "TOKEN STATUS (H.8)".center(62) + "║")
    rule("╠", "╣")

    # Usage figures
    row(f"║ Context Usage: [{format_bar(percentage)}] {percentage*100:.0f}%")
    row(f"║ Source: {source}")
    row(f"║ Tool Calls: {tool_calls}")
    row(f"║ Est. Tokens: ~{est_tokens:,} / {max_tokens:,}")
    rule("╠", "╣")
    row(f"║ Current Level: {icon} {level}")

    # Only list warning counters that have actually fired.
    shown = state.get("warnings_shown", {})
    if isinstance(shown, dict):
        fired = [f"{k}: {v}" for k, v in shown.items() if v > 0]
        if fired:
            warn_str = ", ".join(fired)
            row(f"║ Warnings Shown: {warn_str}")

    rule("╠", "╣")

    # Recommendation for the current level
    advice_by_level = {
        "NORMAL": "Continue working normally",
        "WARNING": "Monitor, no action needed",
        "ALERT": "Consider chunking complex tasks",
        "CRITICAL": "Run /export soon to preserve work",
        "EMERGENCY": "Run /export IMMEDIATELY!",
    }
    row(f"║ Recommendation: {advice_by_level.get(level, 'Unknown')}")
    rule("╚", "╝")

    if not detail:
        return

    print("\n=== Detailed State ===")
    print(json.dumps(state, indent=2))
    print("\n=== Thresholds ===")
    for name, value in thresholds.items():
        print(f" {name}: {value*100:.0f}%")

def reset_warnings():
    """Zero the warning counters in the state file and report the outcome.

    Prints a notice and leaves the disk untouched when no state file exists.
    Otherwise the warning counters, the triggered-threshold list and the
    auto-export flag are reset, and the state is written back as JSON.
    """
    if STATE_FILE.exists():
        state = load_state()
        cleared = {
            "warnings_shown": {"warning": 0, "alert": 0, "critical": 0, "emergency": 0},
            "thresholds_triggered": [],
            "auto_export_triggered": False,
        }
        for key, value in cleared.items():
            state[key] = value
        STATE_FILE.write_text(json.dumps(state, indent=2))
        print("✓ Warning counters reset")
    else:
        print("No state file to reset")

def execute_pivot(strategy: str):
    """Print the commands that carry out the given pivot strategy.

    Nothing is executed here; the function only tells the user which
    commands to run.

    Args:
        strategy: Name of the pivot strategy to mention in the instructions.
    """
    lines = (
        f"Executing {strategy} pivot strategy...",
        "",
        "To execute, run:",
        f' /agent token-limit-pivot-orchestrator "Execute {strategy} pivot strategy"',
        "",
        "Or for quick export:",
        " /export",
        " /cx",
    )
    for line in lines:
        print(line)

def main():
    """Parse command-line flags and run the requested action.

    ``--reset`` takes precedence over ``--pivot``; with neither flag the
    status box is shown, in detailed form when ``--detail`` is given.
    """
    parser = argparse.ArgumentParser(description="Token Status & Pivot Management")
    parser.add_argument(
        "--detail", "-d",
        action="store_true",
        help="Show detailed status",
    )
    parser.add_argument(
        "--reset", "-r",
        action="store_true",
        help="Reset warning counters",
    )
    parser.add_argument(
        "--pivot", "-p",
        choices=["chunk", "compress", "export", "spawn"],
        help="Execute pivot strategy",
    )
    options = parser.parse_args()

    if options.reset:
        reset_warnings()
        return
    if options.pivot:
        execute_pivot(options.pivot)
        return
    show_status(options.detail)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()