api/routes/monitoring.py
Status
Context
A decision is required in the current situation because of:
- Requirement 1 (template placeholder — replace with the actual requirement)
- Constraint 2 (template placeholder — replace with the actual constraint)
- Need 3 (template placeholder — replace with the actual need)
Accepted | YYYY-MM-DD (template placeholder — record the real decision date)
api/routes/monitoring.py
import asyncio
import os
import secrets
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response
from fastapi.security import APIKeyHeader
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
router = APIRouter(prefix="/monitoring", tags=["monitoring"])

# Security scheme for monitoring endpoints.
# auto_error=False so a missing header reaches verify_api_key, which returns
# a uniform 403 instead of FastAPI's automatic error response.
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)
class MonitoringAPI:
    """Route handlers for system monitoring endpoints.

    Wraps a MonitoringService (health checks, DB access) and a
    MetricsCollector for the /monitoring routes.
    """

    # Fix: original declared `def init(...)` which is a plain method, not the
    # constructor — instances would never get .monitoring/.metrics set.
    # Annotations are quoted because the service types are declared elsewhere.
    def __init__(
        self,
        monitoring_service: "MonitoringService",
        metrics_collector: "MetricsCollector",
    ):
        self.monitoring = monitoring_service
        self.metrics = metrics_collector
async def verify_api_key(self, api_key: str = Depends(API_KEY_HEADER)) -> bool:
    """Validate the X-API-Key header against the MONITORING_API_KEY env var.

    Returns:
        True when the key matches.

    Raises:
        HTTPException: 403 when the key is missing, the expected key is not
            configured, or the values do not match.
    """
    expected = os.getenv("MONITORING_API_KEY")
    # secrets.compare_digest is a constant-time comparison, avoiding a
    # timing side channel on the key check; also reject outright when no
    # expected key is configured.
    if not api_key or not expected or not secrets.compare_digest(api_key, expected):
        raise HTTPException(
            status_code=403,
            detail="Invalid or missing API key"
        )
    return True
@router.get("/health")
async def get_system_health(
    self,
    _: bool = Depends(verify_api_key)
) -> Dict[str, Any]:
    """Return the current system health snapshot with a UTC timestamp."""
    try:
        health = await self.monitoring.health_monitor.check_system_health()
        response = {
            "status": "success",
            "timestamp": datetime.utcnow().isoformat(),
            "data": health,
        }
        return response
    except Exception as exc:
        # Surface any failure from the health check as a 500.
        raise HTTPException(
            status_code=500,
            detail=f"Error checking system health: {str(exc)}"
        )
@router.get("/metrics")
async def get_prometheus_metrics(
    self,
    _: bool = Depends(verify_api_key)
):
    """Serve the process-wide Prometheus registry in text exposition format."""
    payload = generate_latest()
    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
@router.get("/performance")
async def get_performance_metrics(
    self,
    time_range: str = "1h",
    _: bool = Depends(verify_api_key)
) -> Dict[str, Any]:
    """Return raw system_metrics rows recorded within `time_range`.

    Args:
        time_range: lookback window such as "1h", "2d" or "1w"
            (see _parse_time_range).

    Raises:
        HTTPException: 400 for a malformed time_range, 500 on query failure.
    """
    # Fix: a malformed time_range previously surfaced as a 500; it is the
    # caller's mistake, so report it as a 400.
    try:
        duration = self._parse_time_range(time_range)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    start_time = datetime.utcnow() - duration
    try:
        async with self.monitoring.db.transaction() as conn:
            metrics = await conn.fetch("""
                SELECT
                    timestamp,
                    metric_data,
                    system_state
                FROM system_metrics
                WHERE timestamp >= $1
                ORDER BY timestamp DESC
            """, start_time)
        return {
            "status": "success",
            "time_range": time_range,
            "metrics": [dict(m) for m in metrics]
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching performance metrics: {str(e)}"
        )
@router.get("/alerts")
async def get_alerts(
    self,
    time_range: str = "24h",
    min_level: str = "warning",
    _: bool = Depends(verify_api_key)
) -> Dict[str, Any]:
    """Return system alerts at or above `min_level` within `time_range`.

    Args:
        time_range: lookback window such as "24h", "7d" or "1w".
        min_level: minimum alert level to include.

    Raises:
        HTTPException: 400 for a malformed time_range, 500 on query failure.
    """
    # Fix: a malformed time_range previously surfaced as a 500; report the
    # client error as a 400 instead.
    try:
        duration = self._parse_time_range(time_range)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    start_time = datetime.utcnow() - duration
    try:
        async with self.monitoring.db.transaction() as conn:
            # NOTE(review): `level >= $2` compares lexically unless the
            # `level` column is an enum ordered by severity — confirm the
            # schema actually ranks 'warning' < 'error' etc.
            alerts = await conn.fetch("""
                SELECT *
                FROM system_alerts
                WHERE
                    timestamp >= $1
                    AND level >= $2
                ORDER BY timestamp DESC
            """, start_time, min_level)
        return {
            "status": "success",
            "time_range": time_range,
            "alerts": [dict(a) for a in alerts]
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching alerts: {str(e)}"
        )
@router.get("/pipeline/status")
async def get_pipeline_status(
    self,
    _: bool = Depends(verify_api_key)
) -> Dict[str, Any]:
    """Report 24h document counts by status plus processing-job latency and error rate."""
    document_counts_sql = """
        SELECT
            COUNT(*) as total_documents,
            COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
            COUNT(CASE WHEN status = 'processing' THEN 1 END) as processing,
            COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed,
            COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed
        FROM documents
        WHERE created_at >= NOW() - INTERVAL '24 hours'
    """
    job_metrics_sql = """
        SELECT
            AVG(EXTRACT(EPOCH FROM (completed_at - created_at))) as avg_processing_time,
            PERCENTILE_CONT(0.95) WITHIN GROUP (
                ORDER BY EXTRACT(EPOCH FROM (completed_at - created_at))
            ) as p95_processing_time,
            COUNT(*) as total_jobs,
            COUNT(CASE WHEN status = 'failed' THEN 1 END)::float /
                NULLIF(COUNT(*), 0) as error_rate
        FROM processing_jobs
        WHERE created_at >= NOW() - INTERVAL '24 hours'
    """
    try:
        async with self.monitoring.db.transaction() as conn:
            document_stats = await conn.fetchrow(document_counts_sql)
            job_metrics = await conn.fetchrow(job_metrics_sql)
        return {
            "status": "success",
            "document_stats": dict(document_stats),
            "pipeline_metrics": dict(job_metrics),
        }
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching pipeline status: {str(exc)}"
        )
@router.get("/vector/status")
async def get_vector_store_status(
    self,
    _: bool = Depends(verify_api_key)
) -> Dict[str, Any]:
    """Report chunk-table size/index stats and last-hour embedding-job performance."""
    store_stats_sql = """
        SELECT
            COUNT(*) as total_vectors,
            COUNT(CASE WHEN embedding IS NOT NULL THEN 1 END) as indexed_vectors,
            pg_size_pretty(pg_total_relation_size('chunks')) as total_size,
            pg_size_pretty(pg_indexes_size('chunks')) as index_size
        FROM chunks
    """
    embedding_perf_sql = """
        SELECT
            AVG(EXTRACT(EPOCH FROM (completed_at - created_at))) as avg_embedding_time,
            COUNT(*) as total_operations,
            COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed_operations
        FROM processing_jobs
        WHERE
            job_type = 'embedding_generation'
            AND created_at >= NOW() - INTERVAL '1 hour'
    """
    try:
        async with self.monitoring.db.transaction() as conn:
            store_stats = await conn.fetchrow(store_stats_sql)
            embedding_perf = await conn.fetchrow(embedding_perf_sql)
        return {
            "status": "success",
            "store_stats": dict(store_stats),
            "performance_metrics": dict(embedding_perf),
        }
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching vector store status: {str(exc)}"
        )
def _parse_time_range(self, time_range: str) -> timedelta:
"""Parse time range string into timedelta"""
unit = time_range[-1].lower()
value = int(time_range[:-1])
if unit == 'h':
return timedelta(hours=value)
elif unit == 'd':
return timedelta(days=value)
elif unit == 'w':
return timedelta(weeks=value)
else:
raise ValueError(f"Invalid time range format: {time_range}")
api/routes/dashboard.py
@router.get("/dashboard/summary")
async def get_dashboard_summary(
    _: bool = Depends(verify_api_key)
) -> Dict[str, Any]:
    """Aggregate health, recent alerts, processing stats and load for the dashboard."""
    # NOTE(review): this module-level handler references `self.monitoring`
    # but its signature takes no `self` — calling it raises NameError. It
    # presumably belongs on a class such as MonitoringAPI; confirm and move.
    try:
        async with self.monitoring.db.transaction() as conn:
            # Get overall system health
            health = await self.monitoring.health_monitor.check_system_health()
            # Get recent alerts
            recent_alerts = await conn.fetch("""
                SELECT *
                FROM system_alerts
                WHERE timestamp >= NOW() - INTERVAL '24 hours'
                ORDER BY timestamp DESC
                LIMIT 5
            """)
            # Get processing statistics
            processing_stats = await conn.fetchrow("""
                SELECT
                    COUNT(*) as total_documents,
                    COUNT(CASE WHEN status = 'completed' THEN 1 END)::float /
                        NULLIF(COUNT(*), 0) * 100 as success_rate,
                    AVG(EXTRACT(EPOCH FROM (completed_at - created_at)))
                        as avg_processing_time
                FROM documents
                WHERE created_at >= NOW() - INTERVAL '24 hours'
            """)
            # Get system load metrics
            system_load = await conn.fetchrow("""
                SELECT
                    (SELECT count(*) FROM pg_stat_activity) as db_connections,
                    (SELECT count(*) FROM processing_jobs
                     WHERE status = 'processing') as active_jobs,
                    (SELECT count(*) FROM processing_jobs
                     WHERE status = 'pending') as queued_jobs
            """)
        return {
            "status": "success",
            "timestamp": datetime.utcnow().isoformat(),
            "health": health,
            "recent_alerts": [dict(a) for a in recent_alerts],
            "processing_stats": dict(processing_stats),
            "system_load": dict(system_load),
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching dashboard summary: {str(e)}"
        )