services/metrics/aggregator.py
Status
Context
The current situation requires a decision because:
- Requirement 1
- Constraint 2
- Need 3
Status: Accepted | YYYY-MM-DD (template placeholder — replace with the actual decision date)
services/metrics/aggregator.py
import asyncio
import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import numpy as np
@dataclass
class MetricWindow:
    """A time window over which metric samples are aggregated.

    Attributes:
        window_size: total span of time covered by the window.
        granularity: width of each aggregation bin within the window.
        max_bins: maximum number of bins the window may be divided into.
    """
    window_size: timedelta
    granularity: timedelta
    max_bins: int

    @property
    def bin_size(self) -> timedelta:
        """Width of one bin when the window is split into ``max_bins`` parts."""
        return self.window_size / self.max_bins
class MetricsAggregator: """Handles collection and aggregation of system metrics"""
def __init__(self, db_connection):
self.db = db_connection
self.logger = logging.getLogger(__name__)
self.windows = {
'5m': MetricWindow(
window_size=timedelta(minutes=5),
granularity=timedelta(seconds=10),
max_bins=30
),
'1h': MetricWindow(
window_size=timedelta(hours=1),
granularity=timedelta(minutes=1),
max_bins=60
),
'24h': MetricWindow(
window_size=timedelta(hours=24),
granularity=timedelta(minutes=15),
max_bins=96
)
}
self.metric_buffers = defaultdict(list)
self.buffer_lock = asyncio.Lock()
async def start(self):
"""Start the metrics aggregation service"""
self.logger.info("Starting metrics aggregation service")
self.aggregation_task = asyncio.create_task(self._run_aggregation())
self.cleanup_task = asyncio.create_task(self._run_cleanup())
async def stop(self):
"""Stop the metrics aggregation service"""
self.logger.info("Stopping metrics aggregation service")
self.aggregation_task.cancel()
self.cleanup_task.cancel()
try:
await self.aggregation_task
await self.cleanup_task
except asyncio.CancelledError:
pass
async def collect_metric(
self,
metric_name: str,
value: float,
tags: Dict[str, str] = None,
timestamp: Optional[datetime] = None
):
"""Collect a new metric value"""
timestamp = timestamp or datetime.utcnow()
async with self.buffer_lock:
self.metric_buffers[metric_name].append({
'timestamp': timestamp,
'value': value,
'tags': tags or {}
})
async def _run_aggregation(self):
"""Run continuous metric aggregation"""
while True:
try:
await self._aggregate_metrics()
await asyncio.sleep(10) # Aggregate every 10 seconds
except Exception as e:
self.logger.error(f"Error in metric aggregation: {str(e)}")
await asyncio.sleep(1)
async def _run_cleanup(self):
"""Run periodic cleanup of old metrics"""
while True:
try:
await self._cleanup_old_metrics()
await asyncio.sleep(3600) # Clean up every hour
except Exception as e:
self.logger.error(f"Error in metric cleanup: {str(e)}")
await asyncio.sleep(60)
async def _aggregate_metrics(self):
"""Aggregate metrics across different time windows"""
now = datetime.utcnow()
async with self.buffer_lock:
# Process each metric type
for metric_name, values in self.metric_buffers.items():
if not values:
continue
# Group by tags
tag_groups = defaultdict(list)
for value in values:
tag_key = frozenset(value['tags'].items())
tag_groups[tag_key].append(value)
# Process each tag group
for tag_key, group_values in tag_groups.items():
tags = dict(tag_key)
# Aggregate for each time window
for window_name, window in self.windows.items():
await self._aggregate_window(
metric_name,
group_values,
window,
tags,
now
)
# Clear processed values
self.metric_buffers[metric_name] = []
async def _aggregate_window(
self,
metric_name: str,
values: List[Dict[str, Any]],
window: MetricWindow,
tags: Dict[str, str],
now: datetime
):
"""Aggregate metrics for a specific time window"""
window_start = now - window.window_size
# Filter values within window
window_values = [
v for v in values
if v['timestamp'] >= window_start
]
if not window_values:
return
# Create time bins
bins = []
current_time = window_start
while current_time <= now:
bin_end = current_time + window.granularity
bin_values = [
v['value'] for v in window_values
if current_time <= v['timestamp'] < bin_end
]
if bin_values:
bins.append({
'timestamp': current_time,
'min': np.min(bin_values),
'max': np.max(bin_values),
'avg': np.mean(bin_values),
'count': len(bin_values)
})
current_time = bin_end
# Store aggregated values
async with self.db.transaction() as conn:
await conn.execute("""
INSERT INTO metric_aggregates (
metric_name,
window_size,
timestamp,
tags,
min_value,
max_value,
avg_value,
count
)
SELECT
$1, $2, timestamp, $3,
min_value, max_value, avg_value, count
FROM unnest($4::timestamptz[], $5::float[], $6::float[],
$7::float[], $8::int[])
AS t(timestamp, min_value, max_value, avg_value, count)
""", metric_name, window_name, json.dumps(tags),
[b['timestamp'] for b in bins],
[b['min'] for b in bins],
[b['max'] for b in bins],
[b['avg'] for b in bins],
[b['count'] for b in bins])
async def _cleanup_old_metrics(self):
"""Clean up old metrics from the database"""
async with self.db.transaction() as conn:
# Define retention periods for different windows
retention = {
'5m': timedelta(hours=1),
'1h': timedelta(days=1),
'24h': timedelta(days=7)
}
for window_name, retention_period in retention.items():
cutoff = datetime.utcnow() - retention_period
await conn.execute("""
DELETE FROM metric_aggregates
WHERE window_size = $1
AND timestamp < $2
""", window_name, cutoff)
async def get_metrics(
self,
metric_name: str,
window_size: str,
tags: Dict[str, str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None
) -> List[Dict[str, Any]]:
"""Retrieve aggregated metrics"""
if window_size not in self.windows:
raise ValueError(f"Invalid window size: {window_size}")
end_time = end_time or datetime.utcnow()
start_time = start_time or (end_time - self.windows[window_size].window_size)
# Build query conditions
conditions = ["metric_name = $1", "window_size = $2"]
params = [metric_name, window_size]
if tags:
conditions.append("tags @> $3")
params.append(json.dumps(tags))
conditions.extend([
"timestamp >= $" + str(len(params) + 1),
"timestamp <= $" + str(len(params) + 2)
])
params.extend([start_time, end_time])
# Execute query
async with self.db.transaction() as conn:
rows = await conn.fetch(f"""
SELECT
timestamp,
min_value,
max_value,
avg_value,
count,
tags
FROM metric_aggregates
WHERE {' AND '.join(conditions)}
ORDER BY timestamp ASC
""", *params)
return [dict(row) for row in rows]
services/metrics/collector.py
class MetricCollector:
    """Convenience front-end that forwards named metrics, with standardized
    tag sets, to a MetricsAggregator."""

    def __init__(self, aggregator: MetricsAggregator):
        self.aggregator = aggregator
        self.logger = logging.getLogger(__name__)

    async def record_processing_time(
        self,
        operation: str,
        duration: float,
        status: str = 'success',
        additional_tags: Dict[str, str] = None
    ):
        """Record how long *operation* took, tagged with its status."""
        merged = {'operation': operation, 'status': status,
                  **(additional_tags or {})}
        await self.aggregator.collect_metric(
            'processing_time',
            duration,
            merged
        )

    async def record_queue_size(
        self,
        queue_name: str,
        size: int,
        additional_tags: Dict[str, str] = None
    ):
        """Record the current size of the named queue."""
        merged = {'queue': queue_name, **(additional_tags or {})}
        await self.aggregator.collect_metric(
            'queue_size',
            size,
            merged
        )

    async def record_error(
        self,
        error_type: str,
        context: str,
        additional_tags: Dict[str, str] = None
    ):
        """Record one occurrence of an error of *error_type* in *context*."""
        merged = {'error_type': error_type, 'context': context,
                  **(additional_tags or {})}
        await self.aggregator.collect_metric(
            'error_count',
            1,
            merged
        )

    async def record_vector_operation(
        self,
        operation: str,
        duration: float,
        vector_count: int,
        additional_tags: Dict[str, str] = None
    ):
        """Record both the duration and the vector count of an operation."""
        merged = {'operation': operation, **(additional_tags or {})}
        # Duration and count are emitted as two separate metrics sharing
        # the same tag set.
        await self.aggregator.collect_metric(
            'vector_operation_time',
            duration,
            merged
        )
        await self.aggregator.collect_metric(
            'vector_count',
            vector_count,
            merged
        )

    async def record_system_metrics(
        self,
        metrics: Dict[str, float],
        additional_tags: Dict[str, str] = None
    ):
        """Record a batch of system-level metrics, prefixed 'system_'."""
        shared_tags = additional_tags or {}
        for name, reading in metrics.items():
            await self.aggregator.collect_metric(
                f'system_{name}',
                reading,
                shared_tags
            )