Monitoring Implementation Patterns
1. Core Monitoring Framework
Context
The platform needs a dedicated monitoring layer because:
- Operators require visibility into system resource usage (CPU, memory, disk, network)
- API request latency, throughput, and error rates must be measurable over time
- Document-processing performance and overall service health need continuous tracking
Status
Accepted | YYYY-MM-DD
A. Metrics Collection Service
import asyncio
import logging
import statistics
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional
@dataclass
class MetricPoint:
    """Single metric measurement"""
    # Measured value of the metric at `timestamp`.
    value: float
    # When the measurement was taken. NOTE(review): producers in this file
    # use naive datetime.utcnow(); comparisons elsewhere assume the same.
    timestamp: datetime
    # Prometheus-style label key/value pairs identifying the series.
    labels: Dict[str, str]
    # Optional free-form extra data; buffered with the point but not
    # flushed to the TSDB by MetricsCollector._store_metrics.
    metadata: Optional[Dict[str, Any]] = None
class MetricsCollector:
    """Centralized metrics collection service.

    Buffers raw metric points in memory, mirrors them to lazily-created
    Prometheus metric objects, and periodically flushes label-grouped
    averages to a time-series database from a background task.
    """

    def __init__(
        self,
        prometheus_client,
        time_series_db,
        collection_interval: int = 10  # seconds between flush/cleanup passes
    ):
        self.prom = prometheus_client
        self.tsdb = time_series_db
        self.collection_interval = collection_interval
        # Raw points buffered per metric name (trimmed by _cleanup_metrics).
        self.metrics: Dict[str, List[MetricPoint]] = {}
        # Prometheus metric objects, created on first use per name.
        self.gauges: Dict[str, Any] = {}
        self.histograms: Dict[str, Any] = {}
        self.counters: Dict[str, Any] = {}
        # Reference to the background task (set by initialize()).
        self._collection_task: Optional["asyncio.Task"] = None

    async def initialize(self):
        """Start the background collection loop.

        Must be awaited from within a running event loop.
        """
        # Keep a strong reference: asyncio holds only weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._collection_task = asyncio.create_task(self._collection_loop())

    async def record_metric(
        self,
        name: str,
        value: float,
        labels: Optional[Dict[str, str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Record a gauge-style metric value.

        Buffers the point for the TSDB flush and mirrors it to a
        Prometheus gauge.
        """
        point = MetricPoint(
            value=value,
            # Naive UTC, kept consistent with the comparisons below.
            # (datetime.utcnow() is deprecated in 3.12; migrating would
            # require switching the whole class to aware datetimes.)
            timestamp=datetime.utcnow(),
            labels=labels or {},
            metadata=metadata
        )
        self.metrics.setdefault(name, []).append(point)
        # NOTE(review): the gauge's label names are frozen from the first
        # point recorded under this name; later points must use the same
        # label keys or prometheus_client's labels() will raise.
        if name not in self.gauges:
            self.gauges[name] = self.prom.Gauge(
                name,
                documentation=f"Gauge for {name}",
                labelnames=list(point.labels.keys())
            )
        gauge = self.gauges[name]
        if point.labels:
            gauge.labels(**point.labels).set(value)
        else:
            # prometheus_client rejects labels() on a metric created with
            # no labelnames, so set the bare gauge directly.
            gauge.set(value)

    async def record_histogram(
        self,
        name: str,
        value: float,
        labels: Optional[Dict[str, str]] = None,
        buckets: Optional[List[float]] = None
    ):
        """Record one observation on a (lazily created) histogram."""
        if name not in self.histograms:
            # Only pass buckets when explicitly provided. The previous
            # code referenced self.prom.DefBuckets, which is the Go
            # client's name; the Python prometheus_client has no such
            # attribute (its implicit default is Histogram.DEFAULT_BUCKETS).
            extra = {'buckets': buckets} if buckets else {}
            self.histograms[name] = self.prom.Histogram(
                name,
                documentation=f"Histogram for {name}",
                labelnames=list(labels.keys()) if labels else [],
                **extra
            )
        hist = self.histograms[name]
        if labels:
            hist.labels(**labels).observe(value)
        else:
            hist.observe(value)

    async def increment_counter(
        self,
        name: str,
        labels: Optional[Dict[str, str]] = None,
        value: float = 1.0
    ):
        """Increment a (lazily created) counter by `value`."""
        if name not in self.counters:
            self.counters[name] = self.prom.Counter(
                name,
                documentation=f"Counter for {name}",
                labelnames=list(labels.keys()) if labels else []
            )
        counter = self.counters[name]
        if labels:
            counter.labels(**labels).inc(value)
        else:
            counter.inc(value)

    async def get_metric_statistics(
        self,
        name: str,
        window: timedelta,
        labels: Optional[Dict[str, str]] = None
    ) -> Dict[str, float]:
        """Summarize buffered points for `name` over the trailing window.

        When `labels` is given, every key/value must match a point's
        labels for it to be included. Returns {} when nothing matches.
        """
        if name not in self.metrics:
            return {}
        cutoff = datetime.utcnow() - window
        points = [
            p.value for p in self.metrics[name]
            if p.timestamp >= cutoff
            and all(
                p.labels.get(k) == v
                for k, v in (labels or {}).items()
            )
        ]
        if not points:
            return {}
        return {
            'min': min(points),
            'max': max(points),
            'avg': statistics.mean(points),
            'median': statistics.median(points),
            # stdev requires >= 2 samples; report 0 for a single point.
            'std_dev': statistics.stdev(points) if len(points) > 1 else 0,
            'count': len(points)
        }

    async def _collection_loop(self):
        """Background loop: flush to TSDB, trim old points, sleep, repeat."""
        while True:
            try:
                await self._store_metrics()
                await self._cleanup_metrics()
            except Exception:
                # Log with traceback and keep looping: one failed flush
                # must not kill the background task. (The original called
                # logging without importing it — NameError on first error.)
                logging.getLogger(__name__).exception(
                    "Error in metrics collection"
                )
            await asyncio.sleep(self.collection_interval)

    async def _store_metrics(self):
        """Flush label-grouped averages of buffered points to the TSDB."""
        timestamp = datetime.utcnow()
        for name, points in self.metrics.items():
            if not points:
                continue
            # Group values by their (sorted) label set so each distinct
            # series is stored separately.
            grouped_points: Dict[tuple, List[float]] = {}
            for point in points:
                label_key = tuple(sorted(point.labels.items()))
                grouped_points.setdefault(label_key, []).append(point.value)
            for label_items, values in grouped_points.items():
                await self.tsdb.store_metric(
                    name=name,
                    timestamp=timestamp,
                    value=statistics.mean(values),
                    labels=dict(label_items)
                )

    async def _cleanup_metrics(self):
        """Drop buffered points older than one hour to bound memory use."""
        cutoff = datetime.utcnow() - timedelta(hours=1)
        for name in self.metrics:
            self.metrics[name] = [
                p for p in self.metrics[name]
                if p.timestamp >= cutoff
            ]
B. Performance Monitoring Service
from typing import Optional, Dict, Any
import time
import psutil
import os
class PerformanceMonitor:
    """System performance monitoring service"""

    def __init__(
        self,
        metrics_collector: MetricsCollector
    ):
        self.metrics = metrics_collector
        self.process = psutil.Process(os.getpid())

    async def collect_system_metrics(self):
        """Sample CPU, memory and disk usage and record each reading."""
        # (metric name, sampler, labels) — each sampler runs immediately
        # before its reading is recorded, preserving read/record order.
        samples = (
            ('system_cpu_percent',
             lambda: psutil.cpu_percent(interval=None),
             {'type': 'system'}),
            ('process_cpu_percent',
             lambda: self.process.cpu_percent(interval=None),
             {'type': 'process'}),
            ('system_memory_usage',
             lambda: psutil.virtual_memory().percent,
             {'type': 'system'}),
            ('process_memory_usage',
             lambda: self.process.memory_info().rss / 1024 / 1024,  # MB
             {'type': 'process'}),
            ('disk_usage_percent',
             lambda: psutil.disk_usage('/').percent,
             {'path': '/'}),
        )
        for metric_name, sample, labels in samples:
            await self.metrics.record_metric(metric_name, sample(), labels)
        # Network counters are read once and reported as two metrics.
        counters = psutil.net_io_counters()
        await self.metrics.record_metric(
            'network_bytes_sent',
            counters.bytes_sent,
            {'type': 'sent'}
        )
        await self.metrics.record_metric(
            'network_bytes_recv',
            counters.bytes_recv,
            {'type': 'received'}
        )
C. Application Monitoring Service
from typing import Optional, Dict, Any, List
from datetime import datetime
class ApplicationMonitor:
    """Application-specific monitoring service.

    Translates application events (HTTP requests, document processing)
    into histogram/counter/gauge updates on the shared metrics collector.
    """

    def __init__(
        self,
        metrics_collector: "MetricsCollector"
    ):
        # Forward-reference annotation so the class can be defined even
        # when MetricsCollector is not yet in scope. The original also
        # kept a `request_times` list that was never written or read;
        # that dead attribute has been removed.
        self.metrics = metrics_collector

    async def record_request(
        self,
        method: str,
        path: str,
        status_code: int,
        duration: float,
        user_id: Optional[str] = None
    ):
        """Record API request metrics.

        NOTE(review): user_id (and doc_id below) become Prometheus label
        values — unbounded label cardinality is a known Prometheus
        anti-pattern; confirm this is intentional before production use.
        """
        labels = {
            'method': method,
            'path': path,
            'status': str(status_code)
        }
        if user_id:
            labels['user_id'] = user_id
        # Request latency distribution.
        await self.metrics.record_histogram(
            'http_request_duration_seconds',
            duration,
            labels
        )
        # Total request count with the full label set.
        await self.metrics.increment_counter(
            'http_requests_total',
            labels
        )
        # Status-code-only counter for cheap error-rate queries.
        await self.metrics.increment_counter(
            'http_status_codes',
            {'status': str(status_code)}
        )

    async def record_document_processing(
        self,
        doc_id: str,
        processing_time: float,
        status: str,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Record document processing metrics.

        processing_time is in seconds; metadata may carry a 'size' entry
        (bytes), which is gauged when metadata is supplied.
        """
        labels = {
            'status': status,
            'doc_id': doc_id
        }
        await self.metrics.record_histogram(
            'document_processing_seconds',
            processing_time,
            labels
        )
        await self.metrics.increment_counter(
            'documents_processed_total',
            labels
        )
        # Missing 'size' defaults to 0; no metric at all when metadata
        # was omitted entirely.
        if metadata:
            await self.metrics.record_metric(
                'document_size_bytes',
                metadata.get('size', 0),
                labels
            )
D. Health Check Service
from typing import Dict, Any, List
from enum import Enum
import asyncio
class HealthStatus(Enum):
    """Tri-state health classification for a check or the whole system."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


class HealthCheck:
    """System health monitoring service.

    Runs registered async check functions, aggregates their results into
    an overall status, and records check outcomes as metrics.
    """

    def __init__(
        self,
        metrics_collector: "MetricsCollector"
    ):
        # Forward-reference annotation so the class can be defined even
        # when MetricsCollector is not yet in scope.
        self.metrics = metrics_collector
        # name -> async callable returning a HealthStatus. (The original
        # annotated this with the builtin `callable`, which is a
        # predicate function, not a type — typing.Callable is correct.)
        self.checks: Dict[str, Callable] = {}

    def register_check(
        self,
        name: str,
        check_func: Callable
    ):
        """Register a health check function.

        check_func must be awaitable and return a HealthStatus.
        """
        self.checks[name] = check_func

    async def run_checks(self) -> Dict[str, Any]:
        """Run all registered checks and return an aggregate report.

        The overall status is the worst individual result (UNHEALTHY >
        DEGRADED > HEALTHY); a check that raises counts as UNHEALTHY.
        """
        results = {}
        overall_status = HealthStatus.HEALTHY
        for name, check_func in self.checks.items():
            try:
                status = await check_func()
                results[name] = {
                    'status': status.value,
                    'timestamp': datetime.utcnow().isoformat()
                }
                # Worst-result-wins aggregation.
                if status == HealthStatus.UNHEALTHY:
                    overall_status = HealthStatus.UNHEALTHY
                elif status == HealthStatus.DEGRADED and overall_status != HealthStatus.UNHEALTHY:
                    overall_status = HealthStatus.DEGRADED
                # Gauge is binary: DEGRADED records 0, same as UNHEALTHY.
                await self.metrics.record_metric(
                    'health_check_status',
                    1 if status == HealthStatus.HEALTHY else 0,
                    {'check': name}
                )
            except Exception as e:
                # A throwing check is reported (not propagated) so the
                # remaining checks still run.
                results[name] = {
                    'status': HealthStatus.UNHEALTHY.value,
                    'error': str(e),
                    'timestamp': datetime.utcnow().isoformat()
                }
                overall_status = HealthStatus.UNHEALTHY
                await self.metrics.increment_counter(
                    'health_check_errors',
                    {'check': name}
                )
        return {
            'status': overall_status.value,
            'timestamp': datetime.utcnow().isoformat(),
            'checks': results
        }
E. Monitoring Dashboard Data Service
from typing import Dict, Any, List
from datetime import datetime, timedelta
class DashboardDataService:
    """Service for preparing monitoring dashboard data"""

    def __init__(
        self,
        metrics_collector: MetricsCollector,
        health_check: HealthCheck
    ):
        self.metrics = metrics_collector
        self.health_check = health_check

    async def get_dashboard_data(
        self,
        time_window: timedelta = timedelta(hours=1)
    ) -> Dict[str, Any]:
        """Assemble the aggregate payload rendered by the dashboard."""
        # Overall health comes straight from the health-check service.
        health_status = await self.health_check.run_checks()
        stats = self.metrics.get_metric_statistics

        # (dashboard key, metric name, label filter) per system panel.
        system_spec = (
            ('cpu', 'system_cpu_percent', {'type': 'system'}),
            ('memory', 'system_memory_usage', {'type': 'system'}),
            ('disk', 'disk_usage_percent', {'path': '/'}),
        )
        system_metrics = {
            key: await stats(metric, time_window, labels)
            for key, metric, labels in system_spec
        }

        # HTTP traffic panels (no label filter).
        request_metrics = {
            'total': await stats('http_requests_total', time_window),
            'duration': await stats('http_request_duration_seconds', time_window),
        }

        # Document pipeline panels (no label filter).
        doc_metrics = {
            'processed': await stats('documents_processed_total', time_window),
            'processing_time': await stats('document_processing_seconds', time_window),
        }

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'health_status': health_status,
            'system_metrics': system_metrics,
            'request_metrics': request_metrics,
            'document_metrics': doc_metrics
        }
Possible follow-up work:
- Add more monitoring components
- Create detailed alert configurations
- Add visualization components
- Create specific monitoring scenarios