services/metrics/config.py
Status
Context
The current situation requires a decision because:
- Requirement 1
- Constraint 2
- Need 3
Accepted | YYYY-MM-DD
services/metrics/config.py
import json
import logging
import os
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, TypedDict

import yaml
class MetricType(Enum):
    """Closed set of metric kinds a metric definition may declare.

    Each member's value is the lowercase string used to look the member up,
    e.g. ``MetricType("histogram")``.
    """

    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"
class CacheStrategy(Enum):
    """Closed set of cache strategy names a metric may be assigned.

    Each member's value is the lowercase string used to look the member up,
    e.g. ``CacheStrategy("adaptive")``.
    """

    NONE = "none"
    MINIMAL = "minimal"
    AGGRESSIVE = "aggressive"
    ADAPTIVE = "adaptive"
@dataclass
class MetricConfig:
    """Configuration for individual metrics"""

    # Identifier of the metric.
    name: str
    # Kind of metric (counter, gauge, histogram or summary).
    type: MetricType
    # Free-text description of the metric.
    description: str
    # Cache strategy assigned to this metric.
    cache_strategy: CacheStrategy
    # Retention period for this metric, as a timedelta.
    retention_period: timedelta
    # Names of the aggregation rules declared for this metric.
    aggregation_rules: List[str]
    # Optional mapping of threshold name to numeric threshold value.
    alert_thresholds: Optional[Dict[str, float]] = None
class TimeWindowConfig(TypedDict):
    """Typed dictionary describing one named time window.

    All four entries are ``timedelta`` values.
    """

    window_size: timedelta
    granularity: timedelta
    retention_period: timedelta
    cache_ttl: timedelta
class MetricsConfiguration:
    """Manages configuration for metrics collection and caching.

    The configuration is read once, at construction time, from a YAML file
    with the top-level sections ``time_windows``, ``metrics``, ``cache`` and
    ``alerts``.  Durations in the file are written as ``<integer><unit>``
    where the unit is one of ``s``, ``m``, ``h``, ``d`` or ``w``.
    """

    # Number of seconds represented by each supported duration suffix.
    _UNIT_SECONDS = {
        's': 1,
        'm': 60,
        'h': 3600,
        'd': 86400,
        'w': 604800,
    }

    def __init__(self, config_path: Optional[str] = None):
        """Resolve the configuration path and load the file.

        Args:
            config_path: Path to the YAML file.  When omitted or empty, the
                ``METRICS_CONFIG_PATH`` environment variable is used, and
                failing that ``config/metrics.yaml``.

        Raises:
            Exception: Whatever ``_load_configuration`` raises (missing file,
                malformed YAML, missing keys, invalid durations or enum
                values).
        """
        self.logger = logging.getLogger(__name__)
        self.config_path = config_path or os.getenv(
            'METRICS_CONFIG_PATH',
            'config/metrics.yaml'
        )
        self.metrics: Dict[str, MetricConfig] = {}
        self.time_windows: Dict[str, TimeWindowConfig] = {}
        self.cache_config: Dict[str, Any] = {}
        self.alert_config: Dict[str, Any] = {}
        self._load_configuration()

    def _load_configuration(self):
        """Load configuration from YAML file.

        Populates ``time_windows``, ``metrics``, ``cache_config`` and
        ``alert_config``.  Any failure is logged and re-raised unchanged.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                # An empty document parses to None; treat it as "no settings"
                # instead of failing later on ``None.get``.
                config = yaml.safe_load(f) or {}
            if not isinstance(config, dict):
                raise ValueError(
                    "Metrics configuration must be a mapping at the top level"
                )

            # ``or {}`` also covers a section that is present but null.
            self.time_windows = {
                name: TimeWindowConfig(
                    window_size=self._parse_duration(window['window_size']),
                    granularity=self._parse_duration(window['granularity']),
                    retention_period=self._parse_duration(
                        window['retention_period']
                    ),
                    cache_ttl=self._parse_duration(window['cache_ttl'])
                )
                for name, window in (config.get('time_windows') or {}).items()
            }

            self.metrics = {
                name: MetricConfig(
                    name=name,
                    type=MetricType(metric['type']),
                    description=metric.get('description', ''),
                    cache_strategy=CacheStrategy(
                        metric.get('cache_strategy', 'minimal')
                    ),
                    retention_period=self._parse_duration(
                        metric['retention_period']
                    ),
                    aggregation_rules=metric.get('aggregation_rules', []),
                    alert_thresholds=metric.get('alert_thresholds')
                )
                for name, metric in (config.get('metrics') or {}).items()
            }

            self.cache_config = config.get('cache') or {}
            self.alert_config = config.get('alerts') or {}
        except Exception as e:
            # Boundary handler: record the failure, then let the caller decide.
            self.logger.error("Error loading metrics configuration: %s", e)
            raise

    def _parse_duration(self, duration_str: str) -> timedelta:
        """Parse duration string into timedelta.

        Args:
            duration_str: Text of the form ``<integer><unit>``, e.g. ``"30s"``
                or ``"7d"``.  The unit is case-insensitive and surrounding
                whitespace is ignored.

        Returns:
            The corresponding ``timedelta``.

        Raises:
            ValueError: If the input is not a string, the numeric part is not
                an integer, the unit is unknown, or the result does not fit in
                a ``timedelta``.
        """
        if not isinstance(duration_str, str):
            raise ValueError(f"Invalid duration format: {duration_str}")

        text = duration_str.strip()
        try:
            value = int(text[:-1])
        except ValueError as exc:
            raise ValueError(
                f"Invalid duration format: {duration_str}"
            ) from exc

        # ``text`` is non-empty here: int('') above would already have failed.
        unit = text[-1].lower()
        if unit not in self._UNIT_SECONDS:
            raise ValueError(f"Invalid duration unit: {unit}")

        try:
            return timedelta(seconds=value * self._UNIT_SECONDS[unit])
        except OverflowError as exc:
            raise ValueError(
                f"Invalid duration format: {duration_str}"
            ) from exc

    def get_metric_config(self, metric_name: str) -> "Optional[MetricConfig]":
        """Get configuration for specific metric, or None if it is unknown."""
        return self.metrics.get(metric_name)

    def get_time_window(self, window_name: str) -> "Optional[TimeWindowConfig]":
        """Get configuration for specific time window, or None if unknown."""
        return self.time_windows.get(window_name)

    def get_cache_config(self, metric_name: str) -> Dict[str, Any]:
        """Get cache configuration for specific metric.

        Unknown metrics get the ``default`` cache section.  Known metrics get
        the section named after their cache strategy value, or an empty dict
        when the file defines no such section.
        """
        metric_config = self.get_metric_config(metric_name)
        if not metric_config:
            return self.cache_config.get('default', {})
        strategy = metric_config.cache_strategy
        return self.cache_config.get(strategy.value, {})

    def get_alert_config(self, metric_name: str) -> Dict[str, Any]:
        """Get alert configuration for specific metric.

        Returns the metric's own alert thresholds when it has any; otherwise
        (unknown metric, or no thresholds) the ``default`` alerts section.
        """
        metric_config = self.get_metric_config(metric_name)
        if not metric_config or not metric_config.alert_thresholds:
            return self.alert_config.get('default', {})
        return metric_config.alert_thresholds
class DynamicConfiguration:
    """Handles dynamic configuration updates.

    Overrides are held in memory in ``overrides`` under keys of the form
    ``"<kind>.<name>"`` and are layered over the base configuration by
    ``get_effective_config``.  The base configuration is never modified.
    """

    def __init__(self, base_config: "MetricsConfiguration"):
        """Wrap *base_config* with an initially empty set of overrides."""
        self.base_config = base_config
        self.overrides: Dict[str, Any] = {}
        self.logger = logging.getLogger(__name__)

    async def update_cache_strategy(
        self,
        metric_name: str,
        strategy: "CacheStrategy"
    ):
        """Update cache strategy for specific metric.

        Raises:
            ValueError: If *metric_name* is not defined in the base config.
        """
        if metric_name not in self.base_config.metrics:
            raise ValueError(f"Unknown metric: {metric_name}")
        self.overrides[f"cache_strategy.{metric_name}"] = strategy.value
        self.logger.info(
            f"Updated cache strategy for {metric_name} to {strategy.value}"
        )

    async def update_time_window(
        self,
        window_name: str,
        config: "TimeWindowConfig"
    ):
        """Update time window configuration.

        The window name is not checked against the base configuration.
        """
        self.overrides[f"time_window.{window_name}"] = config
        self.logger.info(f"Updated time window configuration for {window_name}")

    async def update_alert_thresholds(
        self,
        metric_name: str,
        thresholds: Dict[str, float]
    ):
        """Update alert thresholds for specific metric.

        An empty mapping is a valid override and clears the thresholds.

        Raises:
            ValueError: If *metric_name* is not defined in the base config.
        """
        if metric_name not in self.base_config.metrics:
            raise ValueError(f"Unknown metric: {metric_name}")
        # Store a copy so later mutation of the caller's dict cannot silently
        # change the override.
        self.overrides[f"alert_thresholds.{metric_name}"] = (
            dict(thresholds) if thresholds is not None else None
        )
        self.logger.info(
            f"Updated alert thresholds for {metric_name}"
        )

    def get_effective_config(
        self,
        metric_name: str
    ) -> Dict[str, Any]:
        """Get effective configuration including overrides.

        Returns:
            A plain dict view of the metric's base configuration with any
            cache-strategy and alert-threshold overrides applied, or an empty
            dict when the metric is unknown.
        """
        base_metric = self.base_config.get_metric_config(metric_name)
        if not base_metric:
            return {}

        config = {
            'name': base_metric.name,
            'type': base_metric.type.value,
            'description': base_metric.description,
            'cache_strategy': base_metric.cache_strategy.value,
            'retention_period': str(base_metric.retention_period),
            'aggregation_rules': base_metric.aggregation_rules,
            'alert_thresholds': base_metric.alert_thresholds
        }

        # Compare against None rather than truthiness so that an empty
        # thresholds override (used to clear thresholds) is honoured.
        cache_override = self.overrides.get(f"cache_strategy.{metric_name}")
        if cache_override is not None:
            config['cache_strategy'] = cache_override

        alert_override = self.overrides.get(f"alert_thresholds.{metric_name}")
        if alert_override is not None:
            config['alert_thresholds'] = alert_override

        return config
# Example configuration file (metrics.yaml)
"""
time_windows:
  realtime:
    window_size: 5m
    granularity: 10s
    retention_period: 1h
    cache_ttl: 30s
  short_term:
    window_size: 1h
    granularity: 1m
    retention_period: 1d
    cache_ttl: 5m
  long_term:
    window_size: 24h
    granularity: 15m
    retention_period: 7d
    cache_ttl: 1h

metrics:
  document_processing_time:
    type: histogram
    description: Document processing duration
    cache_strategy: adaptive
    retention_period: 30d
    aggregation_rules:
      - avg
      - p95
      - count
    alert_thresholds:
      warning: 5.0
      critical: 10.0

  vector_search_latency:
    type: histogram
    description: Vector search latency
    cache_strategy: aggressive
    retention_period: 7d
    aggregation_rules:
      - avg
      - p99
    alert_thresholds:
      warning: 0.1
      critical: 0.5

cache:
  default:
    max_size: 1000
    ttl: 5m
  aggressive:
    max_size: 5000
    ttl: 1m
  adaptive:
    initial_size: 1000
    max_size: 10000
    min_ttl: 30s
    max_ttl: 10m

alerts:
  default:
    warning_threshold: 0.8
    critical_threshold: 0.9
    notification_interval: 5m
    channels:
      - email
      - slack
"""