#!/usr/bin/env python3
"""
CODITECT Token Economics Instrumentation (ADR-111)

Implements comprehensive token tracking, budget enforcement, and cost optimization for multi-agent autonomous development.
Key Insight from Ralph Wiggum Analysis: "Token economics matter—15x multiplier for multi-agent means cost awareness is essential."
Features:
- Per-agent, per-task, per-iteration token tracking
- Hierarchical budget enforcement (org → project → task → agent)
- Real-time cost calculation with model-specific pricing
- Throttling and auto-throttle on budget exceeded
- Efficiency metrics and forecasting

Usage:
    from scripts.core.ralph_wiggum import BudgetContext, TokenEconomicsService

    service = TokenEconomicsService()

    # Record consumption (synchronous; takes keyword arguments and
    # returns the stored TokenRecord)
    record = service.record_consumption(
        model="claude-opus-4-5",
        input_tokens=1000,
        output_tokens=500,
        task_id="task-001",
        agent_id="agent-001",
    )

    # Check budget before call (synchronous; returns a BudgetCheckResult)
    context = BudgetContext(task_id="task-001")
    result = service.check_budget(context, estimated_cost=0.50)
    if result.allowed:
        # proceed with API call
        pass

Author: CODITECT Framework
Version: 1.0.0
Created: January 24, 2026
ADR Reference: ADR-111-token-economics-instrumentation.md
"""

import json
import logging
import time
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ADR-114: Configuration path (framework install)
# Resolved relative to this file: four directory levels up, then
# config/model-pricing.json.
CONFIG_PATH = Path(__file__).parent.parent.parent.parent / "config" / "model-pricing.json"


# =============================================================================
# EXCEPTIONS
# =============================================================================

class TokenEconomicsError(Exception):
    """Base exception for token economics.

    Root of this module's exception hierarchy, so callers can catch every
    token-economics failure with a single ``except`` clause.
    """


class BudgetExceededError(TokenEconomicsError):
    """Budget limit exceeded."""


class PricingError(TokenEconomicsError):
    """Error loading or applying pricing."""


# =============================================================================
# ENUMS
# =============================================================================

class BudgetAction(Enum):
    """Actions when budget is exceeded.

    The string values are what ``Budget.hard_limit_action`` and
    ``BudgetCheckResult.action`` store.
    """

    ALLOW = "allow"  # Proceed normally
    THROTTLE = "throttle"  # Delay before proceeding
    DENY = "deny"  # Block the request
    ALERT_ONLY = "alert_only"  # Log warning but proceed


class BudgetLevel(Enum):
    """Budget hierarchy levels, listed from broadest to most specific."""

    ORGANIZATION = "organization"
    PROJECT = "project"
    TASK = "task"
    AGENT = "agent"


# =============================================================================
# DATA MODELS
# =============================================================================

@dataclass
class ModelPricing:
    """Pricing per million tokens (USD).

    Attributes:
        input_price: Price per million input tokens.
        output_price: Price per million output tokens.
        cached_price: Price per million cache-read tokens.
        cache_write_price: Price per million cache-write tokens.
    """

    input_price: float = 1.0
    output_price: float = 1.0
    cached_price: float = 0.0
    cache_write_price: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)


@dataclass
class TokenConsumption:
    """Token consumption details for a single model call.

    Attributes:
        model: Model identifier the tokens were consumed on.
        input_tokens: Number of input tokens.
        output_tokens: Number of output tokens.
        cache_read_tokens: Number of tokens read from cache.
        cache_write_tokens: Number of tokens written to cache.
    """

    model: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        """Total tokens consumed (input + output + cache read + cache write)."""
        return (
            self.input_tokens
            + self.output_tokens
            + self.cache_read_tokens
            + self.cache_write_tokens
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary, including the derived ``total_tokens``."""
        data = asdict(self)
        # total_tokens is a property, so asdict() does not include it.
        data["total_tokens"] = self.total_tokens
        return data


@dataclass
class CostBreakdown:
    """Cost breakdown by category.

    ``total_cost`` is stored as supplied; it is not recomputed from the
    per-category fields.
    """

    input_cost: float = 0.0
    output_cost: float = 0.0
    cache_read_cost: float = 0.0
    cache_write_cost: float = 0.0
    total_cost: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)


@dataclass
class TokenRecord:
    """
    Complete token consumption record.

    This is the primary data structure for tracking token usage.
    ``record_id`` defaults to a fresh UUID4 string and ``timestamp`` to the
    current UTC time in ISO-8601 format.
    """

    record_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    # Context
    organization_id: str = ""
    project_id: str = ""
    task_id: str = ""
    agent_id: str = ""
    iteration: int = 1
    checkpoint_id: str = ""

    # Consumption
    consumption: TokenConsumption = field(default_factory=TokenConsumption)

    # Cost
    cost: CostBreakdown = field(default_factory=CostBreakdown)

    # Metadata
    tool_calls: List[str] = field(default_factory=list)
    latency_ms: int = 0
    success: bool = True
    error_type: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        The nested ``consumption`` entry comes from
        ``TokenConsumption.to_dict()`` so it carries the derived
        ``total_tokens`` value as well.
        """
        data = asdict(self)
        data["consumption"] = self.consumption.to_dict()
        data["cost"] = self.cost.to_dict()
        return data

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), default=str)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TokenRecord":
        """Create from dictionary.

        Unknown keys are ignored, both at the top level and inside the nested
        ``consumption`` / ``cost`` dictionaries (for example the derived
        ``total_tokens`` emitted by ``to_dict``). The input mapping is not
        modified.
        """
        # Work on a filtered copy so the caller's dict is left untouched.
        values = {k: v for k, v in data.items() if k in cls.__dataclass_fields__}

        consumption = values.get("consumption")
        if isinstance(consumption, dict):
            values["consumption"] = TokenConsumption(**{
                k: v for k, v in consumption.items()
                if k in TokenConsumption.__dataclass_fields__
            })

        cost = values.get("cost")
        if isinstance(cost, dict):
            values["cost"] = CostBreakdown(**{
                k: v for k, v in cost.items()
                if k in CostBreakdown.__dataclass_fields__
            })

        return cls(**values)


@dataclass
class Budget:
    """Budget configuration for a context.

    Attributes:
        limit_usd: Spending limit in USD.
        limit_tokens: Token limit.
        alert_threshold_percent: Utilization percentage at which a budget
            check starts reporting a warning.
        hard_limit_action: A ``BudgetAction`` value (string) applied when a
            request would exceed ``limit_usd``.
        period: Budget period label: task, daily, or monthly.
    """

    limit_usd: float = 100.0
    limit_tokens: int = 1_000_000
    alert_threshold_percent: float = 80.0
    hard_limit_action: str = BudgetAction.THROTTLE.value
    period: str = "task"  # task, daily, monthly

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Budget":
        """Create from dictionary, ignoring keys that are not Budget fields."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})


@dataclass
class BudgetContext:
    """Context for budget checking.

    Any subset of the four identifiers may be set; empty strings mean
    "not specified".
    """

    organization_id: str = ""
    project_id: str = ""
    task_id: str = ""
    agent_id: str = ""

    def to_key(self) -> str:
        """Convert to storage key.

        Returns:
            The set identifiers rendered as ``prefix:value`` segments joined
            with ``/`` in the fixed order org, proj, task, agent (for example
            ``"org:acme/task:t1"``), or ``"global"`` when none is set.
        """
        labelled = (
            ("org", self.organization_id),
            ("proj", self.project_id),
            ("task", self.task_id),
            ("agent", self.agent_id),
        )
        parts = [f"{prefix}:{value}" for prefix, value in labelled if value]
        return "/".join(parts) if parts else "global"


@dataclass
class BudgetCheckResult:
    """Result of budget check.

    Attributes:
        allowed: Whether the request may proceed.
        action: A ``BudgetAction`` value (string) describing the outcome.
        throttle_delay_ms: Delay in milliseconds when the action is throttle.
        reason: Human-readable explanation; empty when nothing to report.
        budget_utilization: Utilization percentage keyed by context key.
        estimated_cost: The estimated cost (USD) that was checked.
        remaining_budget: Remaining budget in USD.
    """

    allowed: bool = True
    action: str = BudgetAction.ALLOW.value
    throttle_delay_ms: int = 0
    reason: str = ""
    budget_utilization: Dict[str, float] = field(default_factory=dict)
    estimated_cost: float = 0.0
    remaining_budget: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)


@dataclass
class EfficiencyMetrics:
    """Efficiency metrics for token usage."""

    tokens_per_tool_call: float = 0.0
    cost_per_iteration: float = 0.0
    cache_hit_rate: float = 0.0
    output_input_ratio: float = 0.0
    cost_per_completed_task: float = 0.0
    # One dict per model; each instance gets its own list.
    model_usage_breakdown: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)


@dataclass
class RunningTotal:
    """Running total for a context.

    ``last_updated`` defaults to the current UTC time in ISO-8601 format.
    """

    context_key: str = ""
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    total_calls: int = 0
    period_start: str = ""
    last_updated: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)


# =============================================================================
# PRICING SERVICE
# =============================================================================

class PricingService:
    """Service for loading and applying token pricing.

    Pricing comes from ``DEFAULT_PRICING``, overlaid with entries from the
    JSON file at ``CONFIG_PATH`` when that file exists. The merged table is
    cached on the class after the first load.
    """

    # Default pricing (January 2026)
    # NOTE(review): confirm these figures against the provider's currently
    # published per-million-token prices.
    DEFAULT_PRICING: Dict[str, ModelPricing] = {
        "claude-opus-4-5": ModelPricing(
            input_price=15.00,
            output_price=75.00,
            cached_price=1.50,
            cache_write_price=18.75,
        ),
        "claude-sonnet-4-5": ModelPricing(
            input_price=3.00,
            output_price=15.00,
            cached_price=0.30,
            cache_write_price=3.75,
        ),
        "claude-haiku-4-5": ModelPricing(
            input_price=0.80,
            output_price=4.00,
            cached_price=0.08,
            cache_write_price=1.00,
        ),
    }

    # Snapshot-date suffixes stripped from model IDs before pricing lookup.
    _SNAPSHOT_SUFFIXES: Tuple[str, ...] = ("-20251101", "-20250929", "-20251001")

    _cache: Optional[Dict[str, ModelPricing]] = None

    @classmethod
    def _normalize_model_id(cls, model_id: str) -> str:
        """Strip the known snapshot-date suffixes from a model ID."""
        for suffix in cls._SNAPSHOT_SUFFIXES:
            model_id = model_id.replace(suffix, "")
        return model_id

    @classmethod
    def load_pricing(cls, refresh: bool = False) -> Dict[str, ModelPricing]:
        """Load pricing from config file or use defaults.

        Args:
            refresh: When true, ignore the cached table and reload.

        Returns:
            Mapping of normalized model ID to ``ModelPricing``. The same dict
            object is cached and returned on later calls.
        """
        if cls._cache is not None and not refresh:
            return cls._cache

        pricing = dict(cls.DEFAULT_PRICING)

        if CONFIG_PATH.exists():
            try:
                data = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
                models = data.get("models", {})
                for model_id, config in models.items():
                    pricing[cls._normalize_model_id(model_id)] = ModelPricing(
                        input_price=config.get("input_price", 1.0),
                        output_price=config.get("output_price", 1.0),
                        cached_price=config.get("cached_price", 0.0),
                        cache_write_price=config.get("cache_write_price", 0.0),
                    )
            except (OSError, ValueError, KeyError, AttributeError, TypeError) as e:
                # An unreadable file, invalid JSON (JSONDecodeError is a
                # ValueError) or an unexpectedly shaped document must not
                # stop cost tracking; keep whatever was loaded so far.
                logger.warning(f"Error loading pricing config: {e}, using defaults")

        cls._cache = pricing
        return pricing

    @classmethod
    def calculate_cost(cls, consumption: TokenConsumption) -> CostBreakdown:
        """Calculate cost for token consumption.

        Models without a pricing entry are charged at ``ModelPricing()``
        defaults and a warning is logged.

        Returns:
            A ``CostBreakdown`` with every component rounded to 6 decimals;
            ``total_cost`` is the rounded sum of the unrounded components.
        """
        pricing = cls.load_pricing()
        model = cls._normalize_model_id(consumption.model)

        model_pricing = pricing.get(model)
        if model_pricing is None:
            # Fall back to default pricing
            model_pricing = ModelPricing()
            logger.warning(f"No pricing found for model {model}, using default")

        # Prices are quoted per million tokens.
        input_cost = (consumption.input_tokens / 1_000_000) * model_pricing.input_price
        output_cost = (consumption.output_tokens / 1_000_000) * model_pricing.output_price
        cache_read_cost = (consumption.cache_read_tokens / 1_000_000) * model_pricing.cached_price
        cache_write_cost = (consumption.cache_write_tokens / 1_000_000) * model_pricing.cache_write_price

        return CostBreakdown(
            input_cost=round(input_cost, 6),
            output_cost=round(output_cost, 6),
            cache_read_cost=round(cache_read_cost, 6),
            cache_write_cost=round(cache_write_cost, 6),
            total_cost=round(input_cost + output_cost + cache_read_cost + cache_write_cost, 6),
        )

    @classmethod
    def estimate_cost(
        cls,
        model: str,
        estimated_input_tokens: int,
        estimated_output_tokens: int,
    ) -> float:
        """Estimate cost before API call.

        Cache tokens are not part of the estimate; only input and output
        tokens are priced.

        Returns:
            Estimated total cost in USD.
        """
        consumption = TokenConsumption(
            model=model,
            input_tokens=estimated_input_tokens,
            output_tokens=estimated_output_tokens,
        )
        return cls.calculate_cost(consumption).total_cost


# =============================================================================
# TOKEN ECONOMICS SERVICE
# =============================================================================

class TokenEconomicsService:
    """
    Service for comprehensive token economics management.

    Features:
    - Token consumption recording
    - Budget management and enforcement
    - Running totals with real-time updates
    - Efficiency metrics calculation
    - Cost forecasting

    Running totals and budgets are persisted as JSON under ``storage_path``;
    individual records are appended to per-day JSONL files. The in-memory
    record list only holds records created by this instance, so
    ``get_consumption`` and ``get_efficiency_metrics`` do not see records
    written by earlier processes.
    """

    # Throttling configuration
    INITIAL_THROTTLE_DELAY_MS = 1000
    MAX_THROTTLE_DELAY_MS = 60000
    THROTTLE_BACKOFF_MULTIPLIER = 2

    def __init__(self, storage_path: Optional[Path] = None):
        """Initialize token economics service.

        Args:
            storage_path: Directory for persisted state. When omitted,
                ``~/PROJECTS/.coditect-data/token-economics`` is used if
                ``~/PROJECTS/.coditect-data`` exists, otherwise
                ``~/.coditect/token-economics``. The directory is created.
        """
        # ADR-114 & ADR-118: Token economics is Tier 3 data (regenerable)
        _user_data = Path.home() / "PROJECTS" / ".coditect-data"
        default_path = (
            _user_data / "token-economics"
            if _user_data.exists()
            else Path.home() / ".coditect" / "token-economics"
        )

        self.storage_path = storage_path or default_path
        self.storage_path.mkdir(parents=True, exist_ok=True)

        self._records: List[TokenRecord] = []
        self._running_totals: Dict[str, RunningTotal] = {}
        self._budgets: Dict[str, Budget] = {}
        self._throttle_states: Dict[str, int] = {}  # context -> delay_ms

        # Load persisted data
        self._load_state()

    @staticmethod
    def _read_json_mapping(path: Path) -> Dict[str, Any]:
        """Read a JSON object from ``path``; return ``{}`` if missing or unusable."""
        if not path.exists():
            return {}
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # JSONDecodeError is a ValueError. State is regenerable, so a
            # damaged file is reported and skipped rather than fatal.
            logger.warning(f"Could not load state from {path}: {e}")
            return {}
        if not isinstance(data, dict):
            logger.warning(f"Ignoring state file {path}: expected a JSON object")
            return {}
        return data

    def _load_state(self) -> None:
        """Load persisted running totals and budgets.

        Malformed entries are logged and skipped; unknown keys inside an
        entry are ignored so files written by other versions still load.
        """
        totals = self._read_json_mapping(self.storage_path / "running_totals.json")
        for key, total_data in totals.items():
            try:
                self._running_totals[key] = RunningTotal(**{
                    k: v for k, v in total_data.items()
                    if k in RunningTotal.__dataclass_fields__
                })
            except (AttributeError, TypeError) as e:
                logger.warning(f"Skipping malformed running total {key!r}: {e}")

        budgets = self._read_json_mapping(self.storage_path / "budgets.json")
        for key, budget_data in budgets.items():
            try:
                self._budgets[key] = Budget.from_dict(budget_data)
            except (AttributeError, TypeError) as e:
                logger.warning(f"Skipping malformed budget {key!r}: {e}")

    def _save_state(self) -> None:
        """Save running totals and budgets to disk."""
        totals_path = self.storage_path / "running_totals.json"
        totals_path.write_text(
            json.dumps(
                {k: v.to_dict() for k, v in self._running_totals.items()},
                indent=2,
            ),
            encoding="utf-8",
        )

        budgets_path = self.storage_path / "budgets.json"
        budgets_path.write_text(
            json.dumps(
                {k: v.to_dict() for k, v in self._budgets.items()},
                indent=2,
            ),
            encoding="utf-8",
        )

    def record_consumption(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        task_id: str = "",
        agent_id: str = "",
        organization_id: str = "",
        project_id: str = "",
        iteration: int = 1,
        cache_read_tokens: int = 0,
        cache_write_tokens: int = 0,
        tool_calls: Optional[List[str]] = None,
        latency_ms: int = 0,
        success: bool = True,
        error_type: str = "",
    ) -> TokenRecord:
        """
        Record token consumption.

        This should be called after every API call to track usage. The cost
        is computed with ``PricingService.calculate_cost``, running totals
        are updated and persisted, and the record is appended to the per-day
        JSONL file.

        Returns:
            The stored ``TokenRecord``.
        """
        consumption = TokenConsumption(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_read_tokens=cache_read_tokens,
            cache_write_tokens=cache_write_tokens,
        )

        cost = PricingService.calculate_cost(consumption)

        record = TokenRecord(
            organization_id=organization_id,
            project_id=project_id,
            task_id=task_id,
            agent_id=agent_id,
            iteration=iteration,
            consumption=consumption,
            cost=cost,
            tool_calls=tool_calls or [],
            latency_ms=latency_ms,
            success=success,
            error_type=error_type,
        )

        self._records.append(record)

        # Update running totals
        self._update_running_totals(record)

        # Save records to file
        self._save_record(record)

        logger.debug(
            f"Recorded consumption: {consumption.total_tokens} tokens, "
            f"${cost.total_cost:.4f} for task {task_id}"
        )

        return record

    def _save_record(self, record: TokenRecord) -> None:
        """Append the record to ``records/<YYYY-MM-DD>.jsonl``."""
        records_dir = self.storage_path / "records"
        records_dir.mkdir(parents=True, exist_ok=True)

        # Save by date: the first 10 characters of the ISO timestamp.
        date_str = record.timestamp[:10]
        records_file = records_dir / f"{date_str}.jsonl"

        with open(records_file, "a", encoding="utf-8") as f:
            f.write(record.to_json() + "\n")

    @staticmethod
    def _context_keys_for(record: TokenRecord) -> List[str]:
        """Return every context key whose set fields all match the record.

        Budgets and running totals are looked up by
        ``BudgetContext.to_key()``, which joins all set fields into one key.
        Totals are therefore kept for every combination of the record's set
        identifiers, including the empty combination (``"global"``), so a
        lookup for any matching context finds its total.
        """
        set_fields = [
            (name, value)
            for name, value in (
                ("organization_id", record.organization_id),
                ("project_id", record.project_id),
                ("task_id", record.task_id),
                ("agent_id", record.agent_id),
            )
            if value
        ]
        # Build all subsets; at most 2**4 = 16 per record.
        subsets: List[Dict[str, str]] = [{}]
        for name, value in set_fields:
            subsets += [{**subset, name: value} for subset in subsets]
        return [BudgetContext(**subset).to_key() for subset in subsets]

    def _update_running_totals(self, record: TokenRecord) -> None:
        """Update running totals for all applicable contexts and persist them."""
        now = datetime.now(timezone.utc).isoformat()
        for ctx in self._context_keys_for(record):
            total = self._running_totals.get(ctx)
            if total is None:
                total = RunningTotal(context_key=ctx, period_start=now)
                self._running_totals[ctx] = total

            total.total_tokens += record.consumption.total_tokens
            total.total_cost_usd += record.cost.total_cost
            total.total_calls += 1
            total.last_updated = now

        self._save_state()

    def set_budget(self, context: BudgetContext, budget: Budget) -> None:
        """Set budget for a context and persist it."""
        key = context.to_key()
        self._budgets[key] = budget
        self._save_state()
        logger.info(f"Set budget for {key}: ${budget.limit_usd}")

    def get_budget(self, context: BudgetContext) -> Optional[Budget]:
        """Get budget for a context, or ``None`` when none is set."""
        return self._budgets.get(context.to_key())

    def check_budget(
        self,
        context: BudgetContext,
        estimated_cost: float,
    ) -> BudgetCheckResult:
        """
        Check if budget allows a request.

        Should be called before making API calls. Only the budget stored
        under exactly ``context.to_key()`` is consulted, and only its USD
        limit; ``Budget.limit_tokens`` is not checked here.

        Args:
            context: Context whose budget and running total are checked.
            estimated_cost: Estimated cost (USD) of the upcoming request.

        Returns:
            A ``BudgetCheckResult``. With no budget set the request is
            allowed and ``remaining_budget`` is infinite.
        """
        key = context.to_key()
        budget = self._budgets.get(key)
        total = self._running_totals.get(key, RunningTotal(context_key=key))

        # No budget set - allow
        if not budget:
            return BudgetCheckResult(
                allowed=True,
                action=BudgetAction.ALLOW.value,
                estimated_cost=estimated_cost,
                remaining_budget=float("inf"),
            )

        current_cost = total.total_cost_usd
        projected_cost = current_cost + estimated_cost
        utilization = (current_cost / budget.limit_usd) * 100 if budget.limit_usd > 0 else 0

        result = BudgetCheckResult(
            estimated_cost=estimated_cost,
            remaining_budget=max(0, budget.limit_usd - current_cost),
            budget_utilization={key: utilization},
        )

        # Check if would exceed budget
        if projected_cost > budget.limit_usd:
            # "allow" and "alert_only" both mean the request proceeds.
            result.allowed = budget.hard_limit_action in (
                BudgetAction.ALERT_ONLY.value,
                BudgetAction.ALLOW.value,
            )
            result.action = budget.hard_limit_action
            result.reason = f"Would exceed budget: ${projected_cost:.2f} > ${budget.limit_usd:.2f}"

            if budget.hard_limit_action == BudgetAction.THROTTLE.value:
                result.throttle_delay_ms = self._get_throttle_delay(key)

            logger.warning(f"Budget check failed for {key}: {result.reason}")

        # Check if approaching threshold
        elif utilization >= budget.alert_threshold_percent:
            result.allowed = True
            result.action = BudgetAction.ALERT_ONLY.value
            result.reason = f"Approaching budget threshold: {utilization:.1f}%"
            logger.info(f"Budget warning for {key}: {result.reason}")

        else:
            result.allowed = True
            result.action = BudgetAction.ALLOW.value

        return result

    def _get_throttle_delay(self, context_key: str) -> int:
        """Get throttle delay with exponential backoff.

        The first call for a context returns ``INITIAL_THROTTLE_DELAY_MS``;
        each later call multiplies the previous delay by
        ``THROTTLE_BACKOFF_MULTIPLIER``, capped at ``MAX_THROTTLE_DELAY_MS``.
        The new delay is remembered until ``reset_throttle`` is called.
        """
        current_delay = self._throttle_states.get(context_key, 0)

        if current_delay == 0:
            new_delay = self.INITIAL_THROTTLE_DELAY_MS
        else:
            new_delay = min(
                current_delay * self.THROTTLE_BACKOFF_MULTIPLIER,
                self.MAX_THROTTLE_DELAY_MS,
            )

        self._throttle_states[context_key] = new_delay
        return new_delay

    def reset_throttle(self, context: BudgetContext) -> None:
        """Reset throttle state for a context."""
        self._throttle_states.pop(context.to_key(), None)

    def get_running_total(self, context: BudgetContext) -> RunningTotal:
        """Get running total for a context (an empty total if none recorded)."""
        key = context.to_key()
        return self._running_totals.get(key, RunningTotal(context_key=key))

    def _filter_records(
        self,
        context: BudgetContext,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
    ) -> List[TokenRecord]:
        """Return in-memory records matching the context and time range.

        A context field only constrains the match when it is set. Timestamps
        are compared as strings, inclusive at both ends.
        """
        matched = []
        for record in self._records:
            # Check context match
            if context.task_id and record.task_id != context.task_id:
                continue
            if context.agent_id and record.agent_id != context.agent_id:
                continue
            if context.project_id and record.project_id != context.project_id:
                continue
            if context.organization_id and record.organization_id != context.organization_id:
                continue

            # Check time range
            if start_time and record.timestamp < start_time:
                continue
            if end_time and record.timestamp > end_time:
                continue

            matched.append(record)
        return matched

    def get_consumption(
        self,
        context: BudgetContext,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get consumption summary for a context.

        Returns:
            Dict with ``context``, ``total_tokens``, ``total_cost_usd``
            (rounded to 4 decimals), ``total_calls`` and ``by_model`` (per
            model ``tokens``, ``cost`` and ``calls``).
        """
        filtered = self._filter_records(context, start_time, end_time)

        # Aggregate
        total_tokens = sum(r.consumption.total_tokens for r in filtered)
        total_cost = sum(r.cost.total_cost for r in filtered)
        total_calls = len(filtered)

        by_model: Dict[str, Dict[str, Any]] = {}
        for record in filtered:
            stats = by_model.setdefault(
                record.consumption.model,
                {"tokens": 0, "cost": 0.0, "calls": 0},
            )
            stats["tokens"] += record.consumption.total_tokens
            stats["cost"] += record.cost.total_cost
            stats["calls"] += 1

        return {
            "context": context.to_key(),
            "total_tokens": total_tokens,
            "total_cost_usd": round(total_cost, 4),
            "total_calls": total_calls,
            "by_model": by_model,
        }

    def get_efficiency_metrics(self, context: BudgetContext) -> EfficiencyMetrics:
        """Calculate efficiency metrics for a context.

        All figures are derived from the same record set that
        ``get_consumption`` uses for this context.
        ``tokens_per_tool_call`` and ``cost_per_iteration`` are both averaged
        over the number of recorded calls.
        """
        consumption = self.get_consumption(context)

        total_tokens = consumption["total_tokens"]
        total_cost = consumption["total_cost_usd"]
        total_calls = consumption["total_calls"]

        # Calculate metrics
        tokens_per_call = total_tokens / max(total_calls, 1)

        # Cache and input/output statistics over the matching records.
        records = self._filter_records(context)
        cache_reads = sum(r.consumption.cache_read_tokens for r in records)
        input_tokens = sum(r.consumption.input_tokens for r in records)
        output_tokens = sum(r.consumption.output_tokens for r in records)

        # max(..., 1) guards against division by zero when nothing matched.
        cache_hit_rate = cache_reads / max(cache_reads + input_tokens, 1)
        output_input_ratio = output_tokens / max(input_tokens, 1)

        # Model breakdown
        model_breakdown = [
            {
                "model": model,
                "percentage": (stats["tokens"] / max(total_tokens, 1)) * 100,
                "cost": stats["cost"],
            }
            for model, stats in consumption.get("by_model", {}).items()
        ]

        return EfficiencyMetrics(
            tokens_per_tool_call=round(tokens_per_call, 2),
            cost_per_iteration=round(total_cost / max(total_calls, 1), 4),
            cache_hit_rate=round(cache_hit_rate, 4),
            output_input_ratio=round(output_input_ratio, 4),
            model_usage_breakdown=model_breakdown,
        )


# =============================================================================
# EXPORTS
# =============================================================================

# Public API of this module; controls what `from <module> import *` exports.
__all__ = [
    # Exceptions
    "TokenEconomicsError",
    "BudgetExceededError",
    "PricingError",
    # Enums
    "BudgetAction",
    "BudgetLevel",
    # Data Models
    "ModelPricing",
    "TokenConsumption",
    "CostBreakdown",
    "TokenRecord",
    "Budget",
    "BudgetContext",
    "BudgetCheckResult",
    "EfficiencyMetrics",
    "RunningTotal",
    # Services
    "PricingService",
    "TokenEconomicsService",
]