Skip to main content

Observability and Monitoring Guide

Comprehensive Monitoring for Agentic AI Systems

Document ID: C2-OBSERVABILITY
Version: 1.0
Category: P3 - Technical Deep Dives
Audience: DevOps, SRE, Platform Engineers


Executive Summary

Agentic systems introduce unique observability challenges: multi-step reasoning traces, tool call chains, memory operations, and emergent behaviors. This guide covers metrics, logging, tracing, and alerting patterns for production agentic deployments.


Part 1: Metrics Framework

Core Metrics

from dataclasses import dataclass
from enum import Enum
from typing import Dict, List
import time

class MetricType(Enum):
    """Prometheus-style metric families referenced by this guide.

    NOTE(review): not consumed by the classes below in this view;
    presumably used by exporter code elsewhere.
    """

    COUNTER = "counter"      # monotonically increasing count
    GAUGE = "gauge"          # point-in-time value, may go up or down
    HISTOGRAM = "histogram"  # distribution of observations
    SUMMARY = "summary"      # distribution with precomputed quantiles

@dataclass
class AgentMetrics:
    """Core metrics for agentic systems.

    A plain snapshot container; values are presumably populated by a
    collector/aggregator elsewhere — this class holds no logic.
    """

    # Request metrics
    requests_total: int = 0
    requests_success: int = 0
    requests_failed: int = 0

    # Latency metrics (ms)
    latency_p50: float = 0.0
    latency_p95: float = 0.0
    latency_p99: float = 0.0

    # Token metrics
    tokens_input_total: int = 0
    tokens_output_total: int = 0
    tokens_per_request_avg: float = 0.0

    # Tool metrics
    tool_calls_total: int = 0
    tool_calls_success: int = 0
    tool_calls_failed: int = 0

    # Cost metrics (dollars, per the cost alert rules in this guide)
    cost_total: float = 0.0
    cost_per_request_avg: float = 0.0

    # Agent-specific metrics
    iterations_per_task_avg: float = 0.0
    reflexion_triggers: int = 0   # times self-reflection was triggered
    memory_retrievals: int = 0    # memory lookups performed


class MetricsCollector:
    """Collect counters, gauges, and observation series and export them
    in a Prometheus-style text format.

    Observations are kept as bounded in-memory lists (last 1000 values
    per series) so percentile queries stay cheap.

    Fixes vs. the previous revision:
    - `_make_key` was called but never defined (NameError on first use);
      it is implemented here with deterministic, sorted label ordering.
    - `export_prometheus` re-derived percentile keys without labels
      (`key.split('{')[0]`), so every labeled series exported 0.0
      percentiles; percentiles are now computed from the stored values.
    """

    # Cap per series to bound memory use.
    MAX_OBSERVATIONS = 1000

    def __init__(self, namespace: str = "agentic"):
        self.namespace = namespace
        # series key -> recent observations (histogram/summary metrics)
        self.metrics: Dict[str, List[float]] = {}
        # series key -> current value (counters and gauges share this map)
        self.counters: Dict[str, float] = {}

    def _make_key(self, name: str, labels: Dict = None) -> str:
        """Build a stable series key: name plus sorted label pairs.

        Sorting makes the key independent of the caller's dict order,
        so {"a": 1, "b": 2} and {"b": 2, "a": 1} address one series.
        """
        if not labels:
            return name
        label_str = ",".join(f'{k}="{labels[k]}"' for k in sorted(labels))
        return f"{name}{{{label_str}}}"

    def increment(self, name: str, value: int = 1, labels: Dict = None):
        """Increment a counter by *value* (default 1)."""
        key = self._make_key(name, labels)
        self.counters[key] = self.counters.get(key, 0) + value

    def observe(self, name: str, value: float, labels: Dict = None):
        """Record an observation for a histogram/summary series."""
        key = self._make_key(name, labels)
        series = self.metrics.setdefault(key, [])
        series.append(value)
        # Keep only the most recent observations.
        if len(series) > self.MAX_OBSERVATIONS:
            del series[:-self.MAX_OBSERVATIONS]

    def gauge(self, name: str, value: float, labels: Dict = None):
        """Set a gauge to an absolute value."""
        key = self._make_key(name, labels)
        self.counters[key] = value

    @staticmethod
    def _percentile(values: List[float], percentile: float) -> float:
        """Nearest-rank percentile of *values*; 0.0 for an empty list."""
        if not values:
            return 0.0
        ordered = sorted(values)
        index = int(len(ordered) * percentile / 100)
        return ordered[min(index, len(ordered) - 1)]

    def get_percentile(self, name: str, percentile: float, labels: Dict = None) -> float:
        """Get a percentile for the series identified by name + labels."""
        key = self._make_key(name, labels)
        return self._percentile(self.metrics.get(key, []), percentile)

    def export_prometheus(self) -> str:
        """Export all metrics as Prometheus-style text lines.

        Returns one `<namespace>_<key> <value>` line per counter/gauge,
        plus `_count`, `_sum`, `_p50`, `_p95`, `_p99` lines per series.
        """
        lines = []

        for key, value in self.counters.items():
            lines.append(f"{self.namespace}_{key} {value}")

        for key, values in self.metrics.items():
            if not values:
                continue
            lines.append(f"{self.namespace}_{key}_count {len(values)}")
            lines.append(f"{self.namespace}_{key}_sum {sum(values)}")
            # Percentiles computed from the series' own values, so
            # labeled series report correctly.
            for p in (50, 95, 99):
                lines.append(f"{self.namespace}_{key}_p{p} {self._percentile(values, p)}")

        return "\n".join(lines)

Prometheus Integration

# prometheus.yml
scrape_configs:
- job_name: 'agentic-platform'
scrape_interval: 15s
static_configs:
- targets: ['localhost:8000']
metrics_path: '/metrics'

# Key metrics to track
metrics:
# Request metrics
- agentic_requests_total{status, paradigm, task_type}
- agentic_request_duration_seconds{paradigm, task_type}

# Token metrics
- agentic_tokens_total{direction, model, provider}
- agentic_tokens_per_request{model}

# Tool metrics
- agentic_tool_calls_total{tool_name, status}
- agentic_tool_duration_seconds{tool_name}

# Cost metrics
- agentic_cost_dollars_total{model, provider}

# Agent metrics
- agentic_iterations_total{paradigm, task_type}
- agentic_reflexion_triggers_total{reason}
- agentic_memory_operations_total{operation_type}

Part 2: Distributed Tracing

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.trace import Status, StatusCode
from contextlib import contextmanager

# Initialize tracing: install a global TracerProvider and obtain a
# named tracer for the platform.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("agentic-platform")

# Add exporter: ship finished spans in batches over OTLP/gRPC to a
# collector listening on localhost:4317 (the standard OTLP gRPC port).
otlp_exporter = OTLPSpanExporter(endpoint="localhost:4317")
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(otlp_exporter)
)


class AgentTracer:
    """Context-manager helpers that wrap each stage of an agentic
    workflow (task, iteration, LLM call, tool call, memory operation)
    in an OpenTelemetry span.
    """

    def __init__(self):
        self.tracer = trace.get_tracer("agent-tracer")

    def _span(self, name, attributes):
        # Open a span as the current span with the given attributes;
        # shared by every trace_* helper below.
        return self.tracer.start_as_current_span(name, attributes=attributes)

    @contextmanager
    def trace_task(self, task_id: str, task_type: str):
        """Trace an entire task; marks the span OK or ERROR (recording
        the exception) and re-raises on failure."""
        attrs = {"task.id": task_id, "task.type": task_type}
        with self._span("agent_task", attrs) as current:
            try:
                yield current
            except Exception as exc:
                current.set_status(Status(StatusCode.ERROR, str(exc)))
                current.record_exception(exc)
                raise
            else:
                current.set_status(Status(StatusCode.OK))

    @contextmanager
    def trace_iteration(self, iteration_num: int, paradigm: str):
        """Trace a single agent iteration."""
        attrs = {"iteration.number": iteration_num, "iteration.paradigm": paradigm}
        with self._span("agent_iteration", attrs) as current:
            yield current

    @contextmanager
    def trace_llm_call(self, provider: str, model: str):
        """Trace an LLM API call; records wall-clock duration (ms) on
        the span even when the call raises."""
        attrs = {"llm.provider": provider, "llm.model": model}
        with self._span("llm_call", attrs) as current:
            started = time.time()
            try:
                yield current
            finally:
                current.set_attribute("llm.duration_ms", (time.time() - started) * 1000)

    @contextmanager
    def trace_tool_call(self, tool_name: str, tool_category: str):
        """Trace a tool execution."""
        attrs = {"tool.name": tool_name, "tool.category": tool_category}
        with self._span("tool_call", attrs) as current:
            yield current

    @contextmanager
    def trace_memory_operation(self, operation: str, memory_type: str):
        """Trace a memory-system operation."""
        attrs = {"memory.operation": operation, "memory.type": memory_type}
        with self._span("memory_operation", attrs) as current:
            yield current

Trace Visualization

@dataclass
class TraceVisualization:
    """Generate textual/structured visualizations of a recorded trace.

    NOTE(review): relies on a `_get_spans(trace_id)` helper not defined
    in this view; spans are assumed to expose `name`, `start_time`,
    `duration`, `span_id`, `parent_id`, and `attributes` — confirm
    against the span store implementation.
    """

    def render_trace_timeline(self, trace_id: str) -> str:
        """Render the trace as an ASCII timeline, one bar per span."""
        spans = self._get_spans(trace_id)
        spans.sort(key=lambda s: s.start_time)

        # Offsets are relative to the earliest span.
        base = spans[0].start_time if spans else 0

        rows = []
        for s in spans:
            # 100 columns per time unit; every span gets at least one cell.
            lead = int((s.start_time - base) * 100)
            width = max(int(s.duration * 100), 1)
            bar = " " * lead + "█" * width
            rows.append(bar + f" {s.name} ({s.duration:.2f}s)")

        return "\n".join(rows)

    def generate_flame_graph(self, trace_id: str) -> Dict:
        """Build a nested flame-graph dict; `value` is duration in ms."""
        spans = self._get_spans(trace_id)

        def subtree(parent_id=None):
            # All spans whose parent is *parent_id*, each carrying its
            # own recursively-built children.
            return [
                {
                    "name": s.name,
                    "value": s.duration * 1000,  # ms
                    "attributes": s.attributes,
                    "children": subtree(s.span_id),
                }
                for s in spans
                if s.parent_id == parent_id
            ]

        return {"name": "root", "children": subtree()}

Part 3: Structured Logging

Log Schema

import json
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any, Dict, Optional

@dataclass
class AgentLogRecord:
    """Structured log record for agentic systems.

    Only timestamp/level/message are required; every other field is an
    optional context attribute serialized into the JSON log line.
    """

    # Standard fields
    timestamp: str   # ISO-8601 timestamp string
    level: str       # e.g. "INFO" / "WARN" / "ERROR" / "DEBUG"
    message: str

    # Request context
    request_id: Optional[str] = None
    session_id: Optional[str] = None
    user_id: Optional[str] = None

    # Agent context
    agent_id: Optional[str] = None
    paradigm: Optional[str] = None
    iteration: Optional[int] = None

    # Operation details
    operation: Optional[str] = None      # e.g. "llm_call", "tool_call"
    duration_ms: Optional[float] = None

    # LLM details
    llm_provider: Optional[str] = None
    llm_model: Optional[str] = None
    tokens_input: Optional[int] = None
    tokens_output: Optional[int] = None

    # Tool details
    tool_name: Optional[str] = None
    tool_input: Optional[Dict] = None
    tool_output: Optional[Dict] = None

    # Error details
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    stack_trace: Optional[str] = None

    # Custom attributes (free-form, caller-defined)
    attributes: Optional[Dict[str, Any]] = None


class StructuredLogger:
    """Emit JSON-structured log lines built from AgentLogRecord.

    Persistent context set via `set_context` (request_id, session_id,
    ...) is merged into every record; per-call kwargs win on key
    collisions.

    Fixes vs. the previous revision:
    - `datetime` was used but never imported in this file (NameError on
      first `log` call); now imported, and the naive, deprecated
      `datetime.utcnow()` is replaced by timezone-aware UTC.
    - `AgentLogRecord(**self._context, **kwargs)` raised TypeError
      whenever a key appeared in both context and kwargs; the dicts are
      now merged first, with kwargs taking precedence.
    """

    # Map level names to stdlib logger method names; anything else
    # falls back to "info" (matches the previous if/elif behavior).
    _LEVEL_METHODS = {"ERROR": "error", "WARN": "warning", "DEBUG": "debug"}

    def __init__(self, name: str, output: str = "stdout"):
        self.logger = logging.getLogger(name)
        # NOTE(review): `output` is stored but not consulted here —
        # presumably handler wiring happens elsewhere; confirm.
        self.output = output
        self._context: Dict[str, Any] = {}

    def set_context(self, **kwargs):
        """Set persistent context merged into all subsequent logs."""
        self._context.update(kwargs)

    def clear_context(self):
        """Clear persistent context."""
        self._context.clear()

    def log(self, level: str, message: str, **kwargs):
        """Log a structured message at *level* as one JSON line."""
        # kwargs override persistent context on collision.
        fields = {**self._context, **kwargs}
        record = AgentLogRecord(
            timestamp=datetime.now(timezone.utc).isoformat(),
            level=level,
            message=message,
            **fields,
        )

        # default=str keeps serialization from raising on exotic values.
        log_line = json.dumps(asdict(record), default=str)
        emit = getattr(self.logger, self._LEVEL_METHODS.get(level, "info"))
        emit(log_line)

    def log_llm_call(
        self,
        provider: str,
        model: str,
        tokens_input: int,
        tokens_output: int,
        duration_ms: float,
        success: bool
    ):
        """Log one LLM API call (INFO on success, ERROR on failure)."""
        self.log(
            "INFO" if success else "ERROR",
            "LLM call completed",
            operation="llm_call",
            llm_provider=provider,
            llm_model=model,
            tokens_input=tokens_input,
            tokens_output=tokens_output,
            duration_ms=duration_ms
        )

    def log_tool_call(
        self,
        tool_name: str,
        tool_input: Dict,
        tool_output: Dict,
        duration_ms: float,
        success: bool
    ):
        """Log one tool execution; output is omitted on failure."""
        self.log(
            "INFO" if success else "ERROR",
            f"Tool call: {tool_name}",
            operation="tool_call",
            tool_name=tool_name,
            tool_input=tool_input,
            tool_output=tool_output if success else None,
            duration_ms=duration_ms
        )

    def log_iteration(
        self,
        iteration: int,
        paradigm: str,
        action_taken: str,
        outcome: str
    ):
        """Log one agent iteration with its action and outcome."""
        self.log(
            "INFO",
            f"Agent iteration {iteration}",
            iteration=iteration,
            paradigm=paradigm,
            attributes={
                "action": action_taken,
                "outcome": outcome
            }
        )

Part 4: Alerting

Alert Rules

# alerting_rules.yml
groups:
- name: agentic_alerts
rules:
# High error rate
- alert: HighAgentErrorRate
expr: |
sum(rate(agentic_requests_total{status="error"}[5m]))
/ sum(rate(agentic_requests_total[5m])) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "High agent error rate ({{ $value | humanizePercentage }})"

# High latency
- alert: HighAgentLatency
expr: |
histogram_quantile(0.95, rate(agentic_request_duration_seconds_bucket[5m])) > 30
for: 5m
labels:
severity: warning
annotations:
summary: "P95 latency above 30s ({{ $value | humanizeDuration }})"

# Cost anomaly
- alert: CostSpike
expr: |
sum(increase(agentic_cost_dollars_total[1h]))
> 2 * sum(increase(agentic_cost_dollars_total[1h] offset 1d))
for: 15m
labels:
severity: warning
annotations:
summary: "Cost spike detected - 2x normal hourly rate"

# Tool failures
- alert: ToolFailureRate
expr: |
sum(rate(agentic_tool_calls_total{status="error"}[5m])) by (tool_name)
/ sum(rate(agentic_tool_calls_total[5m])) by (tool_name) > 0.2
for: 5m
labels:
severity: warning
annotations:
summary: "Tool {{ $labels.tool_name }} failing at {{ $value | humanizePercentage }}"

# LLM provider issues
- alert: LLMProviderDown
expr: |
sum(rate(agentic_requests_total{status="error", error_type="provider_error"}[5m])) by (provider)
/ sum(rate(agentic_requests_total[5m])) by (provider) > 0.5
for: 2m
labels:
severity: critical
annotations:
summary: "LLM provider {{ $labels.provider }} experiencing errors"

# Memory system issues
- alert: MemoryRetrievalSlow
expr: |
histogram_quantile(0.95, rate(agentic_memory_operation_seconds_bucket{operation="retrieve"}[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "Memory retrieval P95 above 2s"

Alert Handler

class AlertHandler:
    """Handle incoming alerts and route them to notification channels.

    NOTE(review): `_setup_channels` and `_get_channels_for_severity`
    are not defined in this view — presumably implemented elsewhere.
    Channels are assumed to expose an async `send(message)` method;
    confirm against the channel implementations.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.notification_channels = self._setup_channels()

    async def handle_alert(self, alert: Dict):
        """Process one incoming alert (Alertmanager-style dict with
        `labels`, `annotations`, and `status` keys)."""
        # Default to "info" when the alert carries no severity label.
        severity = alert.get("labels", {}).get("severity", "info")

        # Determine notification channels for this severity
        channels = self._get_channels_for_severity(severity)

        # Format message
        message = self._format_alert_message(alert)

        # Send to all matching channels
        for channel in channels:
            await channel.send(message)

    def _format_alert_message(self, alert: Dict) -> str:
        # Markdown-formatted summary. The f-string body is left-aligned
        # on purpose: its bytes are part of the message, so no code
        # indentation may leak into it.
        return f"""
🚨 **{alert['labels'].get('alertname', 'Unknown Alert')}**

**Severity:** {alert['labels'].get('severity', 'unknown')}
**Status:** {alert.get('status', 'unknown')}

{alert['annotations'].get('summary', 'No summary')}

**Details:**
{json.dumps(alert.get('labels', {}), indent=2)}
"""

Part 5: Dashboards

Grafana Dashboard Configuration

{
"dashboard": {
"title": "Agentic AI Platform",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "sum(rate(agentic_requests_total[5m])) by (status)",
"legendFormat": "{{status}}"
}
]
},
{
"title": "Latency Distribution",
"type": "heatmap",
"targets": [
{
"expr": "sum(rate(agentic_request_duration_seconds_bucket[5m])) by (le)"
}
]
},
{
"title": "Token Usage",
"type": "graph",
"targets": [
{
"expr": "sum(rate(agentic_tokens_total[5m])) by (direction, model)",
"legendFormat": "{{model}} - {{direction}}"
}
]
},
{
"title": "Cost by Provider",
"type": "piechart",
"targets": [
{
"expr": "sum(increase(agentic_cost_dollars_total[24h])) by (provider)"
}
]
},
{
"title": "Tool Call Success Rate",
"type": "gauge",
"targets": [
{
"expr": "sum(rate(agentic_tool_calls_total{status='success'}[5m])) / sum(rate(agentic_tool_calls_total[5m]))"
}
]
},
{
"title": "Agent Iterations per Task",
"type": "histogram",
"targets": [
{
"expr": "histogram_quantile(0.5, rate(agentic_iterations_bucket[5m]))"
}
]
}
]
}
}

Quick Reference

Key Metrics to Monitor

| Metric | Type | Alert Threshold | Dashboard |
|---|---|---|---|
| Error rate | Counter | >10% | Yes |
| P95 latency | Histogram | >30s | Yes |
| Cost per hour | Counter | 2x normal | Yes |
| Tool failure rate | Counter | >20% | Yes |
| Iterations per task | Histogram | >10 avg | Yes |
| Memory retrieval time | Histogram | >2s P95 | Yes |

Observability Stack

| Component | Tool | Purpose |
|---|---|---|
| Metrics | Prometheus | Time-series metrics |
| Tracing | Jaeger/Tempo | Distributed tracing |
| Logging | Loki | Log aggregation |
| Dashboards | Grafana | Visualization |
| Alerting | Alertmanager | Alert routing |

Document maintained by CODITECT SRE Team. Feedback: sre@coditect.com