Skip to main content

Monitoring Implementation Patterns

1. Core Monitoring Framework

Context

The current situation requires a decision because:

  • Services need a consistent, centralized way to record and aggregate metrics.
  • Operators need visibility into system, application, and document-processing health.
  • Dashboards and alerting consumers require pre-aggregated, queryable monitoring data.

Status

Accepted | YYYY-MM-DD (placeholder — fill in the actual decision date)

A. Metrics Collection Service

import asyncio
import logging
import statistics
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

@dataclass
class MetricPoint:
    """One sampled measurement: a value, when it was taken, and its labels."""

    value: float                               # the measured quantity
    timestamp: datetime                        # capture time (naive UTC, via datetime.utcnow)
    labels: Dict[str, str]                     # dimension labels used for grouping/filtering
    metadata: Optional[Dict[str, Any]] = None  # optional free-form extras

class MetricsCollector:
    """Centralized metrics collection service.

    Buffers raw measurements in memory, mirrors them into Prometheus
    gauges/histograms/counters, and periodically flushes label-grouped
    averages to a time-series database.
    """

    def __init__(
        self,
        prometheus_client,
        time_series_db,
        collection_interval: int = 10  # seconds between flush cycles
    ):
        self.prom = prometheus_client
        self.tsdb = time_series_db
        self.collection_interval = collection_interval
        # Raw buffered points per metric name; trimmed by _cleanup_metrics.
        self.metrics: Dict[str, List[MetricPoint]] = {}
        # Lazily created Prometheus instruments, keyed by metric name.
        self.gauges: Dict[str, Any] = {}
        self.histograms: Dict[str, Any] = {}
        self.counters: Dict[str, Any] = {}
        # Strong reference to the background task (set by initialize()).
        self._collection_task: Optional[asyncio.Task] = None

    async def initialize(self):
        """Start the background collection loop.

        Must be called from within a running event loop.
        """
        # FIX: keep a reference to the task — a bare create_task() result can
        # be garbage-collected before it finishes (per asyncio docs).
        self._collection_task = asyncio.create_task(self._collection_loop())

    async def record_metric(
        self,
        name: str,
        value: float,
        labels: Optional[Dict[str, str]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Record a gauge-style metric value.

        The point is buffered (for statistics and TSDB export) and the
        matching Prometheus gauge is updated.
        """
        if name not in self.metrics:
            self.metrics[name] = []

        point = MetricPoint(
            value=value,
            timestamp=datetime.utcnow(),
            labels=labels or {},
            metadata=metadata
        )
        self.metrics[name].append(point)

        # Lazily create the Prometheus gauge on first use.
        # NOTE(review): labelnames are fixed from the FIRST point recorded for
        # this name; a later call with a different label set will fail inside
        # the prometheus client — confirm callers use consistent labels.
        if name not in self.gauges:
            self.gauges[name] = self.prom.Gauge(
                name,
                documentation=f"Gauge for {name}",
                labelnames=list(point.labels.keys())
            )
        self.gauges[name].labels(**point.labels).set(value)

    async def record_histogram(
        self,
        name: str,
        value: float,
        labels: Optional[Dict[str, str]] = None,
        buckets: Optional[List[float]] = None
    ):
        """Record one observation in a histogram metric.

        *buckets* only takes effect when the histogram is first created;
        subsequent calls reuse the existing instrument.
        """
        if name not in self.histograms:
            # NOTE(review): `DefBuckets` is the Go client's name; the Python
            # prometheus_client exposes Histogram.DEFAULT_BUCKETS — confirm the
            # injected client actually provides a `DefBuckets` attribute.
            self.histograms[name] = self.prom.Histogram(
                name,
                documentation=f"Histogram for {name}",
                labelnames=list(labels.keys()) if labels else [],
                buckets=buckets or self.prom.DefBuckets
            )

        # Parenthesized so the `or` fallback is explicit.
        self.histograms[name].labels(**(labels or {})).observe(value)

    async def increment_counter(
        self,
        name: str,
        labels: Optional[Dict[str, str]] = None,
        value: float = 1.0
    ):
        """Increment a counter metric by *value* (default 1.0)."""
        if name not in self.counters:
            self.counters[name] = self.prom.Counter(
                name,
                documentation=f"Counter for {name}",
                labelnames=list(labels.keys()) if labels else []
            )

        self.counters[name].labels(**(labels or {})).inc(value)

    async def get_metric_statistics(
        self,
        name: str,
        window: timedelta,
        labels: Optional[Dict[str, str]] = None
    ) -> Dict[str, float]:
        """Summarize buffered points for *name* recorded within *window*.

        Only points whose labels contain every key/value pair in *labels*
        are considered. Returns {} when the metric is unknown or no points
        match; otherwise min/max/avg/median/std_dev/count.
        """
        if name not in self.metrics:
            return {}

        cutoff = datetime.utcnow() - window
        wanted = (labels or {}).items()
        points = [
            p.value for p in self.metrics[name]
            if p.timestamp >= cutoff
            and all(p.labels.get(k) == v for k, v in wanted)
        ]

        if not points:
            return {}

        return {
            'min': min(points),
            'max': max(points),
            'avg': statistics.mean(points),
            'median': statistics.median(points),
            # stdev needs at least two samples
            'std_dev': statistics.stdev(points) if len(points) > 1 else 0,
            'count': len(points)
        }

    async def _collection_loop(self):
        """Background loop: flush to the TSDB, trim old points, sleep, repeat."""
        while True:
            try:
                await self._store_metrics()
                await self._cleanup_metrics()
            except Exception as e:
                # Never let one failed cycle kill the loop.
                logging.error(f"Error in metrics collection: {str(e)}")
            await asyncio.sleep(self.collection_interval)

    async def _store_metrics(self):
        """Flush one aggregated (mean) value per metric/label-set to the TSDB."""
        timestamp = datetime.utcnow()

        for name, points in self.metrics.items():
            if not points:
                continue

            # Group point values by their sorted label items so each distinct
            # label combination is stored as its own series.
            grouped: Dict[tuple, List[float]] = {}
            for point in points:
                key = tuple(sorted(point.labels.items()))
                grouped.setdefault(key, []).append(point.value)

            for label_items, values in grouped.items():
                await self.tsdb.store_metric(
                    name=name,
                    timestamp=timestamp,
                    value=statistics.mean(values),
                    labels=dict(label_items)
                )

    async def _cleanup_metrics(self):
        """Drop buffered points older than one hour."""
        cutoff = datetime.utcnow() - timedelta(hours=1)

        for name in self.metrics:
            self.metrics[name] = [
                p for p in self.metrics[name]
                if p.timestamp >= cutoff
            ]

B. Performance Monitoring Service

from typing import Optional, Dict, Any
import time
import psutil
import os

class PerformanceMonitor:
    """System performance monitoring service.

    Samples host- and process-level resource usage via psutil and records
    one gauge reading per sample through the shared MetricsCollector.
    """

    def __init__(
        self,
        metrics_collector: MetricsCollector
    ):
        self.metrics = metrics_collector
        # Handle to the current process, for process-scoped readings.
        self.process = psutil.Process(os.getpid())

    async def collect_system_metrics(self):
        """Sample CPU, memory, disk, and network usage and record each reading."""
        record = self.metrics.record_metric

        # CPU (interval=None -> non-blocking, percentage since the last call)
        await record('system_cpu_percent', psutil.cpu_percent(interval=None),
                     {'type': 'system'})
        await record('process_cpu_percent', self.process.cpu_percent(interval=None),
                     {'type': 'process'})

        # Memory: system-wide percent used, process resident set in MB
        await record('system_memory_usage', psutil.virtual_memory().percent,
                     {'type': 'system'})
        await record('process_memory_usage',
                     self.process.memory_info().rss / 1024 / 1024,  # MB
                     {'type': 'process'})

        # Disk usage of the root filesystem
        await record('disk_usage_percent', psutil.disk_usage('/').percent,
                     {'path': '/'})

        # Network: cumulative byte counters since boot
        counters = psutil.net_io_counters()
        await record('network_bytes_sent', counters.bytes_sent, {'type': 'sent'})
        await record('network_bytes_recv', counters.bytes_recv, {'type': 'received'})

C. Application Monitoring Service

from typing import Optional, Dict, Any, List
from datetime import datetime

class ApplicationMonitor:
    """Application-specific monitoring: HTTP requests and document processing."""

    def __init__(
        self,
        metrics_collector: MetricsCollector
    ):
        self.metrics = metrics_collector
        # NOTE(review): appears unused by the methods below — confirm before removing.
        self.request_times: List[float] = []

    async def record_request(
        self,
        method: str,
        path: str,
        status_code: int,
        duration: float,
        user_id: Optional[str] = None
    ):
        """Record one HTTP request: a duration histogram, a request counter,
        and a per-status-code counter."""
        status = str(status_code)
        labels = {
            'method': method,
            'path': path,
            'status': status
        }
        if user_id:
            # NOTE(review): per-user labels can explode Prometheus series
            # cardinality — confirm this is intended.
            labels['user_id'] = user_id

        await self.metrics.record_histogram(
            'http_request_duration_seconds', duration, labels)
        await self.metrics.increment_counter('http_requests_total', labels)
        await self.metrics.increment_counter('http_status_codes', {'status': status})

    async def record_document_processing(
        self,
        doc_id: str,
        processing_time: float,
        status: str,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Record document-processing time, a processed-documents counter, and
        (when size metadata is present) the document size in bytes."""
        # NOTE(review): doc_id as a label is also high-cardinality — verify.
        labels = {
            'status': status,
            'doc_id': doc_id
        }

        await self.metrics.record_histogram(
            'document_processing_seconds', processing_time, labels)
        await self.metrics.increment_counter('documents_processed_total', labels)

        if metadata:
            await self.metrics.record_metric(
                'document_size_bytes', metadata.get('size', 0), labels)

D. Health Check Service

from typing import Dict, Any, List
from enum import Enum
import asyncio

class HealthStatus(Enum):
    """Tri-state health of a single check or of the system overall."""

    HEALTHY = "healthy"      # fully operational
    DEGRADED = "degraded"    # working, but impaired
    UNHEALTHY = "unhealthy"  # failing or unreachable

class HealthCheck:
    """System health monitoring service.

    Runs registered async check functions, aggregates their statuses into a
    single overall status, and records per-check metrics.
    """

    def __init__(
        self,
        metrics_collector: MetricsCollector
    ):
        self.metrics = metrics_collector
        # check name -> async callable returning a HealthStatus
        self.checks: Dict[str, callable] = {}

    def register_check(
        self,
        name: str,
        check_func: callable
    ):
        """Register an async health check under *name* (replacing any existing one)."""
        self.checks[name] = check_func

    async def run_checks(self) -> Dict[str, Any]:
        """Run every registered check and return overall plus per-check results.

        A check that raises is reported as UNHEALTHY with its error message
        and counted in 'health_check_errors'. The overall status is the worst
        of the individual statuses.
        """
        results: Dict[str, Any] = {}
        overall = HealthStatus.HEALTHY

        for check_name, check in self.checks.items():
            try:
                outcome = await check()
                results[check_name] = {
                    'status': outcome.value,
                    'timestamp': datetime.utcnow().isoformat()
                }

                # Overall status is only ever downgraded, never upgraded.
                if outcome == HealthStatus.UNHEALTHY:
                    overall = HealthStatus.UNHEALTHY
                elif outcome == HealthStatus.DEGRADED and overall != HealthStatus.UNHEALTHY:
                    overall = HealthStatus.DEGRADED

                # Per-check gauge: 1 when healthy, 0 otherwise.
                await self.metrics.record_metric(
                    'health_check_status',
                    1 if outcome == HealthStatus.HEALTHY else 0,
                    {'check': check_name}
                )
            except Exception as exc:
                # A failing (raising) check makes the whole system unhealthy.
                results[check_name] = {
                    'status': HealthStatus.UNHEALTHY.value,
                    'error': str(exc),
                    'timestamp': datetime.utcnow().isoformat()
                }
                overall = HealthStatus.UNHEALTHY
                await self.metrics.increment_counter(
                    'health_check_errors',
                    {'check': check_name}
                )

        return {
            'status': overall.value,
            'timestamp': datetime.utcnow().isoformat(),
            'checks': results
        }

E. Monitoring Dashboard Data Service

from typing import Dict, Any, List
from datetime import datetime, timedelta

class DashboardDataService:
    """Prepares aggregated monitoring data for dashboard consumption."""

    def __init__(
        self,
        metrics_collector: MetricsCollector,
        health_check: HealthCheck
    ):
        self.metrics = metrics_collector
        self.health_check = health_check

    async def get_dashboard_data(
        self,
        time_window: timedelta = timedelta(hours=1)
    ) -> Dict[str, Any]:
        """Return health status plus system/request/document statistics
        aggregated over *time_window*."""

        async def stats(metric, labels=None):
            # Shortcut over the collector's windowed statistics lookup.
            return await self.metrics.get_metric_statistics(
                metric, time_window, labels)

        health_status = await self.health_check.run_checks()

        system_metrics = {
            'cpu': await stats('system_cpu_percent', {'type': 'system'}),
            'memory': await stats('system_memory_usage', {'type': 'system'}),
            'disk': await stats('disk_usage_percent', {'path': '/'})
        }

        request_metrics = {
            'total': await stats('http_requests_total'),
            'duration': await stats('http_request_duration_seconds')
        }

        doc_metrics = {
            'processed': await stats('documents_processed_total'),
            'processing_time': await stats('document_processing_seconds')
        }

        return {
            'timestamp': datetime.utcnow().isoformat(),
            'health_status': health_status,
            'system_metrics': system_metrics,
            'request_metrics': request_metrics,
            'document_metrics': doc_metrics
        }

Possible extensions:

  1. Additional monitoring components (e.g. tracing, log aggregation)
  2. Detailed alert configurations and thresholds
  3. Visualization components for the dashboard data
  4. Worked monitoring scenarios (incident walkthroughs)