Skip to main content

services/metrics/query_service.py

Status

Context

The current situation requires a decision because:

  • Requirement 1
  • Constraint 2
  • Need 3

Status: Accepted | Date: YYYY-MM-DD (template placeholder — record the actual decision date)

services/metrics/query_service.py

import asyncio
import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

@dataclass
class QueryWindow:
    """Defines a time window for metric queries.

    The window spans ``start_time``..``end_time`` and is divided into
    buckets of width ``interval`` (passed to the SQL ``time_bucket``
    aggregations).
    """

    start_time: datetime  # window start (inclusive in the SQL BETWEEN)
    end_time: datetime    # window end (inclusive in the SQL BETWEEN)
    interval: timedelta   # bucket width for time_bucket aggregation

    @property
    def duration(self) -> timedelta:
        """Total span covered by the window."""
        return self.end_time - self.start_time

    @property
    def points(self) -> int:
        """Number of whole intervals that fit in the window (truncated)."""
        return int(self.duration / self.interval)

class MetricsQueryService: """Service for querying and processing metrics data"""

def __init__(self, db_connection):
self.db = db_connection
self.logger = logging.getLogger(__name__)
self.cache = {}
self.cache_lock = asyncio.Lock()

async def get_system_metrics(
self,
window: QueryWindow,
metric_names: Optional[List[str]] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""Get system metrics for the specified time window"""
try:
query = """
SELECT
time_bucket($1, timestamp) as bucket,
metric_name,
jsonb_agg(metrics ORDER BY timestamp) as metrics
FROM metrics.system_metrics
WHERE timestamp BETWEEN $2 AND $3
"""

params = [window.interval, window.start_time, window.end_time]

if metric_names:
query += " AND metric_name = ANY($4)"
params.append(metric_names)

query += """
GROUP BY bucket, metric_name
ORDER BY bucket DESC
"""

async with self.db.transaction() as conn:
rows = await conn.fetch(query, *params)

return self._process_system_metrics(rows)
except Exception as e:
self.logger.error(f"Error fetching system metrics: {str(e)}")
raise

async def get_pipeline_metrics(
self,
window: QueryWindow,
pipeline_ids: Optional[List[str]] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""Get pipeline processing metrics"""
try:
query = """
WITH pipeline_stats AS (
SELECT
time_bucket($1, timestamp) as bucket,
pipeline_id,
stage,
AVG(duration_ms) as avg_duration,
SUM(error_count) as errors,
COUNT(*) as total_operations,
jsonb_object_agg(
stage,
jsonb_build_object(
'duration', AVG(duration_ms),
'errors', SUM(error_count),
'input_size', AVG(input_size),
'output_size', AVG(output_size)
)
) as stage_metrics
FROM metrics.pipeline_metrics
WHERE timestamp BETWEEN $2 AND $3
"""

params = [window.interval, window.start_time, window.end_time]

if pipeline_ids:
query += " AND pipeline_id = ANY($4)"
params.append(pipeline_ids)

query += """
GROUP BY bucket, pipeline_id, stage
)
SELECT
bucket,
jsonb_object_agg(
pipeline_id::text,
jsonb_build_object(
'stages', stage_metrics,
'total_duration', avg_duration,
'error_count', errors,
'operations', total_operations
)
) as pipeline_data
FROM pipeline_stats
GROUP BY bucket
ORDER BY bucket DESC
"""

async with self.db.transaction() as conn:
rows = await conn.fetch(query, *params)

return self._process_pipeline_metrics(rows)
except Exception as e:
self.logger.error(f"Error fetching pipeline metrics: {str(e)}")
raise

async def get_vector_metrics(
self,
window: QueryWindow,
operations: Optional[List[str]] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""Get vector store performance metrics"""
try:
query = """
SELECT
time_bucket($1, timestamp) as bucket,
operation,
AVG(duration_ms) as avg_duration,
SUM(vector_count) as total_vectors,
MAX(index_size) as index_size,
jsonb_agg(metadata) as operation_metadata
FROM metrics.vector_metrics
WHERE timestamp BETWEEN $2 AND $3
"""

params = [window.interval, window.start_time, window.end_time]

if operations:
query += " AND operation = ANY($4)"
params.append(operations)

query += """
GROUP BY bucket, operation
ORDER BY bucket DESC, operation
"""

async with self.db.transaction() as conn:
rows = await conn.fetch(query, *params)

return self._process_vector_metrics(rows)
except Exception as e:
self.logger.error(f"Error fetching vector metrics: {str(e)}")
raise

async def get_aggregated_metrics(
self,
metric_names: List[str],
window: QueryWindow,
tags: Optional[Dict[str, str]] = None
) -> Dict[str, List[Dict[str, Any]]]:
"""Get aggregated metrics with optional tag filtering"""
try:
# Determine appropriate aggregation table based on window size
table_name = self._get_aggregation_table(window.duration)

query = f"""
SELECT
metric_name,
bucket,
min_value,
max_value,
avg_value,
sum_value,
count,
p95_value
FROM metrics.{table_name}
WHERE metric_name = ANY($1)
AND bucket BETWEEN $2 AND $3
"""

params = [metric_names, window.start_time, window.end_time]

if tags:
query += " AND tags @> $4"
params.append(json.dumps(tags))

query += " ORDER BY bucket DESC"

async with self.db.transaction() as conn:
rows = await conn.fetch(query, *params)

return self._process_aggregated_metrics(rows)
except Exception as e:
self.logger.error(f"Error fetching aggregated metrics: {str(e)}")
raise

async def get_error_metrics(
self,
window: QueryWindow
) -> List[Dict[str, Any]]:
"""Get error metrics and statistics"""
try:
async with self.db.transaction() as conn:
rows = await conn.fetch("""
SELECT
time_bucket($1, timestamp) as bucket,
COUNT(*) as error_count,
jsonb_agg(DISTINCT metadata) as error_details
FROM metrics.pipeline_metrics
WHERE timestamp BETWEEN $2 AND $3
AND error_count > 0
GROUP BY bucket
ORDER BY bucket DESC
""", window.interval, window.start_time, window.end_time)

return self._process_error_metrics(rows)
except Exception as e:
self.logger.error(f"Error fetching error metrics: {str(e)}")
raise

def _get_aggregation_table(self, duration: timedelta) -> str:
"""Determine appropriate aggregation table based on duration"""
if duration <= timedelta(hours=1):
return "aggregate_5m"
elif duration <= timedelta(days=1):
return "aggregate_1h"
else:
return "aggregate_24h"

def _process_system_metrics(
self,
rows: List[Any]
) -> Dict[str, List[Dict[str, Any]]]:
"""Process system metrics rows into structured data"""
result = defaultdict(list)

for row in rows:
bucket = row['bucket']
metric_name = row['metric_name']
metrics = row['metrics']

# Calculate statistics for the bucket
values = [float(m[metric_name]) for m in metrics]

result[metric_name].append({
'timestamp': bucket,
'min': np.min(values),
'max': np.max(values),
'avg': np.mean(values),
'count': len(values)
})

return dict(result)

def _process_pipeline_metrics(
self,
rows: List[Any]
) -> Dict[str, List[Dict[str, Any]]]:
"""Process pipeline metrics rows into structured data"""
result = defaultdict(list)

for row in rows:
bucket = row['bucket']
pipeline_data = row['pipeline_data']

for pipeline_id, data in pipeline_data.items():
result[pipeline_id].append({
'timestamp': bucket,
'stages': data['stages'],
'total_duration': data['total_duration'],
'error_count': data['error_count'],
'operations': data['operations']
})

return dict(result)

def _process_vector_metrics(
self,
rows: List[Any]
) -> Dict[str, List[Dict[str, Any]]]:
"""Process vector metrics rows into structured data"""
result = defaultdict(list)

for row in rows:
operation = row['operation']
result[operation].append({
'timestamp': row['bucket'],
'duration': row['avg_duration'],
'vectors': row['total_vectors'],
'index_size': row['index_size'],
'metadata': self._aggregate_metadata(row['operation_metadata'])
})

return dict(result)

def _process_aggregated_metrics(
self,
rows: List[Any]
) -> Dict[str, List[Dict[str, Any]]]:
"""Process aggregated metrics rows into structured data"""
result = defaultdict(list)

for row in rows:
metric_name = row['metric_name']
result[metric_name].append({
'timestamp': row['bucket'],
'min': row['min_value'],
'max': row['max_value'],
'avg': row['avg_value'],
'sum': row['sum_value'],
'count': row['count'],
'p95': row['p95_value']
})

return dict(result)

def _process_error_metrics(
self,
rows: List[Any]
) -> List[Dict[str, Any]]:
"""Process error metrics rows into structured data"""
return [{
'timestamp': row['bucket'],
'count': row['error_count'],
'details': self._aggregate_error_details(row['error_details'])
} for row in rows]

def _aggregate_metadata(
self,
metadata_list: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Aggregate metadata from multiple records"""
aggregated = defaultdict(list)

for metadata in metadata_list:
for key, value in metadata.items():
aggregated[key].append(value)

return {
key: self._summarize_values(values)
for key, values in aggregated.items()
}

def _aggregate_error_details(
self,
error_details: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Aggregate error details into summary statistics"""
error_types = defaultdict(int)
error_contexts = defaultdict(int)

for detail in error_details:
error_types[detail.get('error_type', 'unknown')] += 1
error_contexts[detail.get('context', 'unknown')] += 1

return {
'types': dict(error_types),
'contexts': dict(error_contexts),
'total': len(error_details)
}

def _summarize_values(self, values: List[Any]) -> Any:
"""Summarize a list of values based on their type"""
if not values:
return None

if isinstance(values[0], (int, float)):
return {
'min': np.min(values),
'max': np.max(values),
'avg': np.mean(values),
'count': len(values)
}
elif isinstance(values[0], str):
return list(set(values))
else:
return values[0] # Return first value for other types