Skip to main content

monitoring/metrics_collector.py

Status

Context

The current situation requires a decision because:

  • Requirement 1
  • Constraint 2
  • Need 3

Accepted | YYYY-MM-DD

monitoring/metrics_collector.py

import asyncio
import json
import logging
import statistics
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

import prometheus_client as prom

class MetricType(Enum):
    """Prometheus metric families supported by this module."""

    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"

@dataclass
class MetricDefinition:
    """Declarative description of a metric to be registered.

    Attributes:
        name: Exported Prometheus metric name.
        type: Which metric family this is (counter/gauge/histogram/summary).
        description: Human-readable help text.
        labels: Label names attached to every sample.
        buckets: Histogram bucket upper bounds; None for non-histograms.
    """

    name: str
    type: "MetricType"  # quoted: defined earlier in this module
    description: str
    labels: List[str]
    buckets: Optional[List[float]] = None  # For histograms

class MetricsCollector:
    """Creates and owns the process-wide Prometheus metric objects.

    All metrics are registered once at construction and exposed via the
    ``metrics`` dict, keyed by a short internal name.
    """

    def __init__(self, db_connection):
        self.db = db_connection
        # Internal name -> prometheus_client metric object.
        self.metrics: Dict[str, Any] = {}
        self._setup_metrics()

    def _setup_metrics(self):
        """Initialize Prometheus metrics."""
        # Document processing metrics
        self.metrics['doc_processing_duration'] = prom.Histogram(
            'doc_processing_duration_seconds',
            'Time spent processing documents',
            ['status'],
            # Wide range: document processing can take ms to minutes.
            buckets=[.01, .05, .1, .5, 1, 5, 10, 30, 60, 120]
        )

        self.metrics['chunk_count'] = prom.Counter(
            'chunks_processed_total',
            'Total number of chunks processed',
            ['status']
        )

        self.metrics['embedding_generation_duration'] = prom.Histogram(
            'embedding_generation_duration_seconds',
            'Time spent generating embeddings',
            ['status'],
            buckets=[.01, .05, .1, .5, 1, 5, 10]
        )

        self.metrics['active_tasks'] = prom.Gauge(
            'active_tasks',
            'Number of currently active tasks',
            ['task_type']
        )

        self.metrics['queue_size'] = prom.Gauge(
            'task_queue_size',
            'Number of tasks in queue',
            ['task_type']
        )

        self.metrics['vector_search_duration'] = prom.Histogram(
            'vector_search_duration_seconds',
            'Time spent on vector search operations',
            ['status'],
            # Vector search is expected to be fast: sub-millisecond to ~1s.
            buckets=[.001, .005, .01, .05, .1, .5, 1]
        )

        self.metrics['api_requests'] = prom.Counter(
            'api_requests_total',
            'Total number of API requests',
            ['endpoint', 'method', 'status']
        )

        self.metrics['db_connection_pool'] = prom.Gauge(
            'db_pool_connections',
            'Database connection pool statistics',
            ['state']
        )

class PerformanceMonitor:
    """Records operation timings into the shared Prometheus histograms."""

    # Operation name -> key of its duration histogram in the collector.
    _DURATION_METRICS = {
        'doc_processing': 'doc_processing_duration',
        'embedding_generation': 'embedding_generation_duration',
        'vector_search': 'vector_search_duration',
    }

    def __init__(self, metrics_collector: "MetricsCollector"):
        self.metrics = metrics_collector
        self.performance_data = {}

    async def record_operation_duration(
        self,
        operation: str,
        duration: float,
        status: str = 'success',
        labels: Optional[Dict[str, str]] = None
    ):
        """Record duration of an operation.

        Args:
            operation: One of 'doc_processing', 'embedding_generation' or
                'vector_search'; unknown operations are silently ignored
                (matches the original if/elif behavior).
            duration: Elapsed time in seconds to observe.
            status: Label value recorded on the histogram.
            labels: Currently unused; reserved for future per-call labels.
        """
        metric_key = self._DURATION_METRICS.get(operation)
        if metric_key is not None:
            self.metrics.metrics[metric_key].labels(
                status=status
            ).observe(duration)

class SystemHealthMonitor:
    """Runs read-only health probes against the database and vector store."""

    def __init__(self, db_connection, metrics_collector: "MetricsCollector"):
        self.db = db_connection
        self.metrics = metrics_collector

    async def check_system_health(self) -> Dict[str, Any]:
        """Perform comprehensive system health check."""
        return {
            'database': await self._check_database_health(),
            'task_queue': await self._check_task_queue_health(),
            'vector_store': await self._check_vector_store_health(),
            'processing_pipeline': await self._check_processing_pipeline_health()
        }

    async def _check_database_health(self) -> Dict[str, Any]:
        """Check database health and performance.

        Returns connection-pool, table and index statistics plus the time
        taken to gather them (a rough proxy for DB responsiveness).
        """
        async with self.db.transaction() as conn:
            start_time = time.time()

            # Check connection pool
            pool_stats = await conn.fetchrow("""
                SELECT count(*) as active_connections
                FROM pg_stat_activity
                WHERE state = 'active'
            """)

            # Check table statistics (dead tuples signal vacuum pressure)
            table_stats = await conn.fetch("""
                SELECT
                    relname as table_name,
                    n_live_tup as row_count,
                    n_dead_tup as dead_tuples
                FROM pg_stat_user_tables
            """)

            # Check index usage
            index_stats = await conn.fetch("""
                SELECT
                    schemaname,
                    relname,
                    indexrelname,
                    idx_scan,
                    idx_tup_read,
                    idx_tup_fetch
                FROM pg_stat_user_indexes
            """)

            query_duration = time.time() - start_time

            return {
                'connection_pool': dict(pool_stats),
                'table_statistics': [dict(stat) for stat in table_stats],
                'index_statistics': [dict(stat) for stat in index_stats],
                'query_duration': query_duration
            }

    async def _check_task_queue_health(self) -> Dict[str, Any]:
        """Check task queue health over the last hour.

        Note: avg_processing_time is NULL (Python None) when no jobs
        completed in the window; callers must handle that.
        """
        async with self.db.transaction() as conn:
            queue_stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_tasks,
                    COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
                    COUNT(CASE WHEN status = 'processing' THEN 1 END) as processing,
                    COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed,
                    AVG(CASE
                        WHEN status = 'completed'
                        THEN EXTRACT(EPOCH FROM (completed_at - created_at))
                    END) as avg_processing_time
                FROM processing_jobs
                WHERE created_at > NOW() - INTERVAL '1 hour'
            """)

            return dict(queue_stats)

    async def _check_vector_store_health(self) -> Dict[str, Any]:
        """Check vector store health (counts, size, probe-query latency)."""
        async with self.db.transaction() as conn:
            vector_stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_vectors,
                    COUNT(CASE WHEN embedding IS NOT NULL THEN 1 END) as vectors_with_embedding,
                    pg_size_pretty(pg_total_relation_size('chunks')) as total_size
                FROM chunks
            """)

            # Time a representative nearest-neighbor probe against the
            # zero vector to measure vector index performance.
            start_time = time.time()
            await conn.fetch("""
                SELECT chunk_uuid
                FROM chunks
                WHERE embedding IS NOT NULL
                ORDER BY embedding <=> array_fill(0, ARRAY[1536])::vector
                LIMIT 1
            """)
            query_duration = time.time() - start_time

            return {
                **dict(vector_stats),
                'vector_query_duration': query_duration
            }

    async def _check_processing_pipeline_health(self) -> Dict[str, Any]:
        """Summarize recent pipeline throughput.

        NOTE(review): this method was referenced by check_system_health but
        missing in the original (would raise AttributeError). Implemented
        against processing_jobs, the same table used by
        _check_task_queue_health -- confirm the intended data source.
        """
        async with self.db.transaction() as conn:
            pipeline_stats = await conn.fetchrow("""
                SELECT
                    COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed,
                    COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed
                FROM processing_jobs
                WHERE created_at > NOW() - INTERVAL '1 hour'
            """)

            return dict(pipeline_stats)

class AlertManager:
    """Evaluates health snapshots against static alert thresholds."""

    def __init__(self):
        self.alert_thresholds = {
            'processing_duration': 300,   # 5 minutes
            'queue_size': 1000,
            'error_rate': 0.1,            # 10% (not yet evaluated below)
            'vector_query_duration': 1.0  # 1 second
        }

    async def check_alerts(self, health_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Check for alert conditions.

        Missing or NULL metrics (e.g. avg_processing_time is None when no
        jobs completed in the window) are treated as healthy instead of
        raising TypeError on a None comparison.
        """
        alerts = []
        task_queue = health_data.get('task_queue') or {}
        vector_store = health_data.get('vector_store') or {}

        # Check processing duration
        avg_time = task_queue.get('avg_processing_time')
        if avg_time is not None and avg_time > self.alert_thresholds['processing_duration']:
            alerts.append({
                'level': 'warning',
                'message': f"High average processing time: {avg_time}s",
                'timestamp': datetime.utcnow()
            })

        # Check queue size
        pending = task_queue.get('pending')
        if pending is not None and pending > self.alert_thresholds['queue_size']:
            alerts.append({
                'level': 'warning',
                'message': f"Large queue size: {pending} pending tasks",
                'timestamp': datetime.utcnow()
            })

        # Check vector query performance
        query_duration = vector_store.get('vector_query_duration')
        if query_duration is not None and query_duration > self.alert_thresholds['vector_query_duration']:
            alerts.append({
                'level': 'warning',
                'message': f"Slow vector queries: {query_duration}s",
                'timestamp': datetime.utcnow()
            })

        return alerts

class MonitoringService:
    """Background loop that collects health data, persists it and raises alerts."""

    def __init__(
        self,
        db_connection,
        metrics_collector: "MetricsCollector",
        check_interval: int = 60
    ):
        self.db = db_connection
        self.metrics = metrics_collector
        self.health_monitor = SystemHealthMonitor(db_connection, metrics_collector)
        self.performance_monitor = PerformanceMonitor(metrics_collector)
        self.alert_manager = AlertManager()
        self.check_interval = check_interval  # seconds between checks

    async def start_monitoring(self):
        """Start the monitoring service (runs forever; never propagates errors)."""
        while True:
            try:
                # Collect health metrics
                health_data = await self.health_monitor.check_system_health()

                # Check for alerts
                alerts = await self.alert_manager.check_alerts(health_data)

                # Store metrics. The alerts are passed explicitly: the
                # original read health_data.get('alerts'), which was never
                # set, so system_state was always 'healthy'.
                await self._store_metrics(health_data, alerts)

                # Handle alerts
                if alerts:
                    await self._handle_alerts(alerts)

            except Exception:
                # Keep the loop alive; log the full traceback for diagnosis.
                logging.exception("Error in monitoring service")

            await asyncio.sleep(self.check_interval)

    async def _store_metrics(
        self,
        health_data: Dict[str, Any],
        alerts: Optional[List[Dict[str, Any]]] = None
    ):
        """Store a health snapshot in the system_metrics table."""
        async with self.db.transaction() as conn:
            await conn.execute("""
                INSERT INTO system_metrics (
                    timestamp,
                    metric_data,
                    system_state
                ) VALUES ($1, $2, $3)
            """, datetime.utcnow(),
                # default=str: DB rows may contain Decimal/datetime values
                # that json cannot serialize natively.
                json.dumps(health_data, default=str),
                'healthy' if not alerts else 'warning')

    async def _handle_alerts(self, alerts: List[Dict[str, Any]]):
        """Log each alert and persist it to the system_alerts table."""
        for alert in alerts:
            # Log alert
            logging.warning(f"System alert: {alert['message']}")

            # Store alert in database
            async with self.db.transaction() as conn:
                await conn.execute("""
                    INSERT INTO system_alerts (
                        level,
                        message,
                        timestamp
                    ) VALUES ($1, $2, $3)
                """, alert['level'], alert['message'], alert['timestamp'])