Observability and Monitoring Guide
Comprehensive Monitoring for Agentic AI Systems
Document ID: C2-OBSERVABILITY
Version: 1.0
Category: P3 - Technical Deep Dives
Audience: DevOps, SRE, Platform Engineers
Executive Summary
Agentic systems introduce unique observability challenges: multi-step reasoning traces, tool call chains, memory operations, and emergent behaviors. This guide covers metrics, logging, tracing, and alerting patterns for production agentic deployments.
Part 1: Metrics Framework
Core Metrics
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List
import time
class MetricType(Enum):
    """Prometheus-style metric families supported by the metrics framework.

    Values are the lowercase names used in the Prometheus exposition format.
    """
    COUNTER = "counter"      # monotonically increasing count
    GAUGE = "gauge"          # point-in-time value that can go up or down
    HISTOGRAM = "histogram"  # bucketed observations (latency, sizes)
    SUMMARY = "summary"      # client-side quantiles over observations
@dataclass
class AgentMetrics:
    """Core metrics for agentic systems.

    A flat snapshot of the key counters/averages an agent deployment should
    track. All fields default to zero so a fresh instance represents an
    empty/initial state.
    """
    # Request metrics
    requests_total: int = 0
    requests_success: int = 0
    requests_failed: int = 0
    # Latency metrics (ms)
    latency_p50: float = 0.0
    latency_p95: float = 0.0
    latency_p99: float = 0.0
    # Token metrics
    tokens_input_total: int = 0
    tokens_output_total: int = 0
    tokens_per_request_avg: float = 0.0
    # Tool metrics
    tool_calls_total: int = 0
    tool_calls_success: int = 0
    tool_calls_failed: int = 0
    # Cost metrics (dollars; presumably per-deployment aggregate — confirm with caller)
    cost_total: float = 0.0
    cost_per_request_avg: float = 0.0
    # Agent-specific metrics
    iterations_per_task_avg: float = 0.0
    reflexion_triggers: int = 0
    memory_retrievals: int = 0
class MetricsCollector:
    """Collect counters, gauges, and observations; export in Prometheus text format.

    Counters and gauges share the ``counters`` mapping; histogram/summary
    source observations are stored as bounded lists in ``metrics``.
    Keys combine metric name and labels (``name{k="v",...}``).
    NOTE(review): not thread-safe — wrap externally if used from multiple threads.
    """

    def __init__(self, namespace: str = "agentic"):
        self.namespace = namespace
        # Observation series (histogram/summary source data), keyed by name+labels.
        self.metrics: Dict[str, List[float]] = {}
        # Counter and gauge values, keyed by name+labels.
        # (annotation widened to float: gauge() stores arbitrary floats here)
        self.counters: Dict[str, float] = {}

    def _make_key(self, name: str, labels: Dict = None) -> str:
        """Build a Prometheus-style series key: ``name`` or ``name{k="v",...}``.

        BUG FIX: this helper was referenced by every public method in the
        original but never defined, so all of them raised AttributeError.
        Labels are sorted so equal label sets always produce the same key.
        """
        if not labels:
            return name
        label_str = ",".join(f'{k}="{labels[k]}"' for k in sorted(labels))
        return f"{name}{{{label_str}}}"

    def increment(self, name: str, value: int = 1, labels: Dict = None):
        """Increment a counter by ``value`` (default 1)."""
        key = self._make_key(name, labels)
        self.counters[key] = self.counters.get(key, 0) + value

    def observe(self, name: str, value: float, labels: Dict = None):
        """Record an observation for a histogram/summary series."""
        key = self._make_key(name, labels)
        self.metrics.setdefault(key, []).append(value)
        # Bound memory: keep only the most recent 1000 observations.
        self.metrics[key] = self.metrics[key][-1000:]

    def gauge(self, name: str, value: float, labels: Dict = None):
        """Set a gauge to an absolute value (stored alongside counters)."""
        key = self._make_key(name, labels)
        self.counters[key] = value

    def get_percentile(self, name: str, percentile: float, labels: Dict = None) -> float:
        """Return the ``percentile`` (0-100) of recorded observations, or 0.0 if none."""
        key = self._make_key(name, labels)
        return self._percentile_of(self.metrics.get(key, []), percentile)

    @staticmethod
    def _percentile_of(values: List[float], percentile: float) -> float:
        """Nearest-rank percentile over a raw observation list (0.0 when empty)."""
        if not values:
            return 0.0
        ordered = sorted(values)
        index = int(len(ordered) * percentile / 100)
        return ordered[min(index, len(ordered) - 1)]

    def export_prometheus(self) -> str:
        """Export all series in Prometheus text exposition format."""
        lines = []
        for key, value in self.counters.items():
            lines.append(f"{self.namespace}_{key} {value}")
        for key, values in self.metrics.items():
            if values:
                lines.append(f"{self.namespace}_{key}_count {len(values)}")
                lines.append(f"{self.namespace}_{key}_sum {sum(values)}")
                # BUG FIX: the original re-derived the key from the base name
                # (key.split('{')[0]), dropping labels, so labelled series
                # exported percentiles of the wrong (usually empty) series.
                # Compute directly from the values already in hand.
                for p in (50, 95, 99):
                    lines.append(f"{self.namespace}_{key}_p{p} {self._percentile_of(values, p)}")
        return "\n".join(lines)
Prometheus Integration
# prometheus.yml
scrape_configs:
- job_name: 'agentic-platform'
scrape_interval: 15s
static_configs:
- targets: ['localhost:8000']
metrics_path: '/metrics'
# Key metrics to track
metrics:
# Request metrics
- agentic_requests_total{status, paradigm, task_type}
- agentic_request_duration_seconds{paradigm, task_type}
# Token metrics
- agentic_tokens_total{direction, model, provider}
- agentic_tokens_per_request{model}
# Tool metrics
- agentic_tool_calls_total{tool_name, status}
- agentic_tool_duration_seconds{tool_name}
# Cost metrics
- agentic_cost_dollars_total{model, provider}
# Agent metrics
- agentic_iterations_total{paradigm, task_type}
- agentic_reflexion_triggers_total{reason}
- agentic_memory_operations_total{operation_type}
Part 2: Distributed Tracing
OpenTelemetry Integration
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.trace import Status, StatusCode
from contextlib import contextmanager

# Initialize tracing: install a global TracerProvider, then obtain a named tracer.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("agentic-platform")

# Add exporter: ship finished spans in batches over OTLP/gRPC to a local
# collector (default OTLP gRPC port 4317).
otlp_exporter = OTLPSpanExporter(endpoint="localhost:4317")
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(otlp_exporter)
)
class AgentTracer:
    """Context-manager helpers that wrap agent operations in OpenTelemetry spans.

    Each method opens a named span with operation-specific attributes and
    yields it to the caller's ``with`` block.
    """

    def __init__(self):
        self.tracer = trace.get_tracer("agent-tracer")

    @contextmanager
    def trace_task(self, task_id: str, task_type: str):
        """Span covering an entire task; sets OK/ERROR status and records exceptions."""
        attrs = {"task.id": task_id, "task.type": task_type}
        with self.tracer.start_as_current_span("agent_task", attributes=attrs) as span:
            try:
                yield span
                span.set_status(Status(StatusCode.OK))
            except Exception as exc:
                span.set_status(Status(StatusCode.ERROR, str(exc)))
                span.record_exception(exc)
                raise

    @contextmanager
    def trace_iteration(self, iteration_num: int, paradigm: str):
        """Span for a single agent iteration."""
        attrs = {"iteration.number": iteration_num, "iteration.paradigm": paradigm}
        with self.tracer.start_as_current_span("agent_iteration", attributes=attrs) as span:
            yield span

    @contextmanager
    def trace_llm_call(self, provider: str, model: str):
        """Span for an LLM API call; always records wall-clock duration in ms."""
        attrs = {"llm.provider": provider, "llm.model": model}
        with self.tracer.start_as_current_span("llm_call", attributes=attrs) as span:
            began = time.time()
            try:
                yield span
            finally:
                # Duration is recorded even when the call raises.
                span.set_attribute("llm.duration_ms", (time.time() - began) * 1000)

    @contextmanager
    def trace_tool_call(self, tool_name: str, tool_category: str):
        """Span for a tool execution."""
        attrs = {"tool.name": tool_name, "tool.category": tool_category}
        with self.tracer.start_as_current_span("tool_call", attributes=attrs) as span:
            yield span

    @contextmanager
    def trace_memory_operation(self, operation: str, memory_type: str):
        """Span for a memory-system operation."""
        attrs = {"memory.operation": operation, "memory.type": memory_type}
        with self.tracer.start_as_current_span("memory_operation", attributes=attrs) as span:
            yield span
Trace Visualization
@dataclass
class TraceVisualization:
    """Generate human-readable visualizations (ASCII timeline, flame graph) for a trace."""

    def _get_spans(self, trace_id: str) -> List:
        """Return the spans of ``trace_id`` from a span store.

        BUG FIX: the original called this method but never defined it, so both
        renderers raised AttributeError. Kept as an explicit extension point —
        supply a backend by overriding in a subclass or assigning an instance
        attribute. Spans are expected to expose ``start_time``, ``duration``,
        ``name``, ``parent_id``, ``span_id`` and ``attributes``.
        """
        raise NotImplementedError("no span storage backend configured")

    def render_trace_timeline(self, trace_id: str) -> str:
        """Render the trace as an ASCII timeline, one bar per span.

        Offsets and bar widths are scaled at 100 characters per second.
        """
        spans = self._get_spans(trace_id)
        # Sort chronologically so bars appear in start order.
        spans.sort(key=lambda s: s.start_time)
        timeline = []
        min_time = spans[0].start_time if spans else 0
        for span in spans:
            offset = int((span.start_time - min_time) * 100)
            duration = int(span.duration * 100)
            line = " " * offset + "█" * max(duration, 1)  # at least 1 char wide
            label = f" {span.name} ({span.duration:.2f}s)"
            timeline.append(line + label)
        return "\n".join(timeline)

    def generate_flame_graph(self, trace_id: str) -> Dict:
        """Generate a nested flame-graph dict: {name, value(ms), attributes, children}."""
        spans = self._get_spans(trace_id)

        def build_tree(parent_id=None) -> List[Dict]:
            # BUG FIX: the original annotated parent_id as Optional[str], but
            # this listing never imports Optional — NameError the first time
            # generate_flame_graph runs. The annotation is dropped.
            children = [s for s in spans if s.parent_id == parent_id]
            return [
                {
                    "name": span.name,
                    "value": span.duration * 1000,  # ms
                    "attributes": span.attributes,
                    "children": build_tree(span.span_id),
                }
                for span in children
            ]

        return {"name": "root", "children": build_tree()}
Part 3: Structured Logging
Log Schema
import json
import logging
from dataclasses import dataclass, asdict
from typing import Optional, Any
@dataclass
class AgentLogRecord:
    """Structured log record for agentic systems.

    Only ``timestamp``, ``level`` and ``message`` are required; everything
    else is optional context filled in per call or from persistent context.
    """
    # Standard fields
    timestamp: str
    level: str
    message: str
    # Request context
    request_id: Optional[str] = None
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    # Agent context
    agent_id: Optional[str] = None
    paradigm: Optional[str] = None
    iteration: Optional[int] = None
    # Operation details
    operation: Optional[str] = None
    duration_ms: Optional[float] = None
    # LLM details
    llm_provider: Optional[str] = None
    llm_model: Optional[str] = None
    tokens_input: Optional[int] = None
    tokens_output: Optional[int] = None
    # Tool details
    tool_name: Optional[str] = None
    tool_input: Optional[Dict] = None
    tool_output: Optional[Dict] = None
    # Error details
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    stack_trace: Optional[str] = None
    # Custom attributes
    attributes: Optional[Dict[str, Any]] = None


class StructuredLogger:
    """Structured (JSON-lines) logging for agentic systems.

    Each call serializes an AgentLogRecord to one JSON line, merging a
    persistent context (``set_context``) with per-call fields.
    """

    def __init__(self, name: str, output: str = "stdout"):
        self.logger = logging.getLogger(name)
        self.output = output  # NOTE(review): stored but not used for routing here
        self._context: Dict[str, Any] = {}

    def set_context(self, **kwargs):
        """Set persistent context merged into every subsequent log record."""
        self._context.update(kwargs)

    def clear_context(self):
        """Clear persistent context."""
        self._context.clear()

    def log(self, level: str, message: str, **kwargs):
        """Log a structured message at ``level`` ("DEBUG"/"INFO"/"WARN"/other→INFO).

        Per-call kwargs override persistent context on key collisions.
        """
        # Local import — this listing never imports datetime, so the original
        # raised NameError on the first call.
        from datetime import datetime, timezone

        # BUG FIX: the original expanded **self._context and **kwargs directly
        # in the constructor call, which raises TypeError whenever a key
        # appears in both. Merge first (kwargs win), and drop keys reserved
        # for the explicitly-passed standard fields.
        fields = {**self._context, **kwargs}
        for reserved in ("timestamp", "level", "message"):
            fields.pop(reserved, None)
        record = AgentLogRecord(
            # tz-aware timestamp; datetime.utcnow() is deprecated.
            timestamp=datetime.now(timezone.utc).isoformat(),
            level=level,
            message=message,
            **fields,
        )
        log_line = json.dumps(asdict(record), default=str)
        if level == "ERROR":
            self.logger.error(log_line)
        elif level == "WARN":
            self.logger.warning(log_line)
        elif level == "DEBUG":
            self.logger.debug(log_line)
        else:
            self.logger.info(log_line)

    def log_llm_call(
        self,
        provider: str,
        model: str,
        tokens_input: int,
        tokens_output: int,
        duration_ms: float,
        success: bool
    ):
        """Log one LLM API call (level ERROR when ``success`` is False)."""
        self.log(
            "INFO" if success else "ERROR",
            "LLM call completed",
            operation="llm_call",
            llm_provider=provider,
            llm_model=model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            duration_ms=duration_ms
        )

    def log_tool_call(
        self,
        tool_name: str,
        tool_input: Dict,
        tool_output: Dict,
        duration_ms: float,
        success: bool
    ):
        """Log one tool execution; output is omitted on failure."""
        self.log(
            "INFO" if success else "ERROR",
            f"Tool call: {tool_name}",
            operation="tool_call",
            tool_name=tool_name,
            tool_input=tool_input,
            tool_output=tool_output if success else None,
            duration_ms=duration_ms
        )

    def log_iteration(
        self,
        iteration: int,
        paradigm: str,
        action_taken: str,
        outcome: str
    ):
        """Log one agent iteration with the action taken and its outcome."""
        self.log(
            "INFO",
            f"Agent iteration {iteration}",
            iteration=iteration,
            paradigm=paradigm,
            attributes={
                "action": action_taken,
                "outcome": outcome
            }
        )
Part 4: Alerting
Alert Rules
# alerting_rules.yml
groups:
- name: agentic_alerts
rules:
# High error rate
- alert: HighAgentErrorRate
expr: |
sum(rate(agentic_requests_total{status="error"}[5m]))
/ sum(rate(agentic_requests_total[5m])) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "High agent error rate ({{ $value | humanizePercentage }})"
# High latency
- alert: HighAgentLatency
expr: |
histogram_quantile(0.95, rate(agentic_request_duration_seconds_bucket[5m])) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "P95 latency above 30s ({{ $value | humanizeDuration }})"
# Cost anomaly
- alert: CostSpike
expr: |
sum(increase(agentic_cost_dollars_total[1h]))
> 2 * sum(increase(agentic_cost_dollars_total[1h] offset 1d))
for: 15m
labels:
severity: warning
annotations:
summary: "Cost spike detected - 2x normal hourly rate"
# Tool failures
- alert: ToolFailureRate
expr: |
sum(rate(agentic_tool_calls_total{status="error"}[5m])) by (tool_name)
/ sum(rate(agentic_tool_calls_total[5m])) by (tool_name) > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} failing at {{ $value | humanizePercentage }}"
# LLM provider issues
- alert: LLMProviderDown
expr: |
sum(rate(agentic_requests_total{status="error", error_type="provider_error"}[5m])) by (provider)
/ sum(rate(agentic_requests_total[5m])) by (provider) > 0.5
for: 2m
labels:
severity: critical
annotations:
summary: "LLM provider {{ $labels.provider }} experiencing errors"
# Memory system issues
- alert: MemoryRetrievalSlow
expr: |
histogram_quantile(0.95, rate(agentic_memory_operation_seconds_bucket{operation="retrieve"}[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "Memory retrieval P95 above 2s"
Alert Handler
class AlertHandler:
    """Route incoming alerts to notification channels by severity."""

    def __init__(self, config: Dict):
        self.config = config
        # Channels are built once from config; routing picks a subset per alert.
        self.notification_channels = self._setup_channels()

    async def handle_alert(self, alert: Dict):
        """Format an incoming alert and send it to all channels for its severity."""
        labels = alert.get("labels", {})
        severity = labels.get("severity", "info")
        targets = self._get_channels_for_severity(severity)
        text = self._format_alert_message(alert)
        for channel in targets:
            await channel.send(text)

    def _format_alert_message(self, alert: Dict) -> str:
        """Render a markdown-style notification message for an alert payload."""
        name = alert['labels'].get('alertname', 'Unknown Alert')
        severity = alert['labels'].get('severity', 'unknown')
        status = alert.get('status', 'unknown')
        summary = alert['annotations'].get('summary', 'No summary')
        details = json.dumps(alert.get('labels', {}), indent=2)
        return f"""
🚨 **{name}**
**Severity:** {severity}
**Status:** {status}
{summary}
**Details:**
{details}
"""
Part 5: Dashboards
Grafana Dashboard Configuration
{
"dashboard": {
"title": "Agentic AI Platform",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "sum(rate(agentic_requests_total[5m])) by (status)",
"legendFormat": "{{status}}"
}
]
},
{
"title": "Latency Distribution",
"type": "heatmap",
"targets": [
{
"expr": "sum(rate(agentic_request_duration_seconds_bucket[5m])) by (le)"
}
]
},
{
"title": "Token Usage",
"type": "graph",
"targets": [
{
"expr": "sum(rate(agentic_tokens_total[5m])) by (direction, model)",
"legendFormat": "{{model}} - {{direction}}"
}
]
},
{
"title": "Cost by Provider",
"type": "piechart",
"targets": [
{
"expr": "sum(increase(agentic_cost_dollars_total[24h])) by (provider)"
}
]
},
{
"title": "Tool Call Success Rate",
"type": "gauge",
"targets": [
{
"expr": "sum(rate(agentic_tool_calls_total{status='success'}[5m])) / sum(rate(agentic_tool_calls_total[5m]))"
}
]
},
{
"title": "Agent Iterations per Task",
"type": "histogram",
"targets": [
{
"expr": "histogram_quantile(0.5, rate(agentic_iterations_bucket[5m]))"
}
]
}
]
}
}
Quick Reference
Key Metrics to Monitor
| Metric | Type | Alert Threshold | Dashboard |
|---|---|---|---|
| Error rate | Counter | >10% | Yes |
| P95 latency | Histogram | >30s | Yes |
| Cost per hour | Counter | 2x normal | Yes |
| Tool failure rate | Counter | >20% | Yes |
| Iterations per task | Histogram | >10 avg | Yes |
| Memory retrieval time | Histogram | >2s P95 | Yes |
Observability Stack
| Component | Tool | Purpose |
|---|---|---|
| Metrics | Prometheus | Time-series metrics |
| Tracing | Jaeger/Tempo | Distributed tracing |
| Logging | Loki | Log aggregation |
| Dashboards | Grafana | Visualization |
| Alerting | Alertmanager | Alert routing |
Document maintained by CODITECT SRE Team. Feedback: sre@coditect.com