"""services/metrics/cache.py -- LRU caching layer for metrics data.

Status: Accepted | YYYY-MM-DD

Context: the current situation requires a decision because:
- Requirement 1
- Constraint 2
- Need 3
"""
import asyncio
import hashlib
import json
import logging
import time
import weakref
from collections import OrderedDict, defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class CacheKey:
    """Represents a unique cache key for metrics data.

    Two keys describing the same query (metric type, window,
    granularity and filters) produce the same canonical string and
    the same hash, so they address the same cache slot.
    """
    metric_type: str
    start_time: datetime
    end_time: datetime
    granularity: timedelta
    filters: Optional[Dict[str, Any]] = None

    def key_string(self) -> str:
        """Return the canonical, human-readable string form of this key.

        Filters are serialized with sorted keys so logically equal
        filter dicts always produce the same string.
        """
        key_str = f"{self.metric_type}:{self.start_time}:{self.end_time}:{self.granularity}"
        if self.filters:
            key_str += f":{json.dumps(self.filters, sort_keys=True)}"
        return key_str

    def __hash__(self) -> int:
        """Generate a stable integer hash for the cache key.

        BUG FIX: the previous implementation returned the sha256 hex
        digest (a str); __hash__ must return an int, so hash(key)
        raised TypeError. We derive the int from the digest instead of
        hashing the string directly so the value is stable across
        processes (str hashing is randomized per process).
        """
        digest = hashlib.sha256(self.key_string().encode()).hexdigest()
        return int(digest[:16], 16)
@dataclass
class CacheEntry:
    """A single cached metrics payload plus its bookkeeping metadata."""
    # The cached metrics payload itself.
    data: Any
    # When the entry was inserted into the cache.
    created_at: datetime
    # Point in time after which the entry counts as stale.
    expires_at: datetime
    # Most recent read of this entry (maintained by the cache).
    last_accessed: datetime
    # Number of reads since insertion.
    access_count: int = 0
class MetricsCache:
    """LRU cache for metrics data with time-based invalidation.

    Entries are stored under a canonical, human-readable key string
    built from the CacheKey fields. BUG FIX: the previous code stored
    entries under hash(key), which (a) raised TypeError because
    CacheKey.__hash__ returned a hex-digest str instead of an int, and
    (b) made keys opaque, so invalidate(pattern) substring matching
    (used with a metric type) could never match anything. The readable
    key string fixes both.
    """

    def __init__(
        self,
        max_size: int = 1000,
        default_ttl: timedelta = timedelta(minutes=5)
    ):
        """Create a cache holding at most `max_size` entries.

        Args:
            max_size: Maximum number of entries before LRU eviction.
            default_ttl: TTL applied when set() receives no ttl.
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        # OrderedDict gives O(1) LRU bookkeeping via move_to_end/popitem.
        self._cache: "OrderedDict[str, CacheEntry]" = OrderedDict()
        self.lock = asyncio.Lock()
        self.logger = logging.getLogger(__name__)
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0
        }
        # Start background cleanup task. BUG FIX: asyncio.create_task()
        # raises RuntimeError when no event loop is running (e.g. the
        # cache is built in synchronous start-up code); in that case we
        # skip the background sweep - expired entries are still rejected
        # lazily in get().
        self.cleanup_task: Optional[asyncio.Task] = None
        try:
            self.cleanup_task = asyncio.create_task(self._cleanup_loop())
        except RuntimeError:
            self.logger.debug(
                "No running event loop; background cache cleanup disabled"
            )

    @staticmethod
    def _make_key(key: "CacheKey") -> str:
        """Build the canonical string under which a CacheKey is stored."""
        key_str = f"{key.metric_type}:{key.start_time}:{key.end_time}:{key.granularity}"
        if key.filters:
            # sort_keys makes logically equal filter dicts map to the
            # same cache slot.
            key_str += f":{json.dumps(key.filters, sort_keys=True)}"
        return key_str

    async def get(
        self,
        key: "CacheKey",
        default: Any = None
    ) -> Tuple[Any, bool]:
        """Get value from cache, returning (value, hit).

        Args:
            key: Query descriptor identifying the cached data.
            default: Value to return on a miss.

        Returns:
            (value, True) on a hit; (default, False) on a miss or when
            the entry has expired.
        """
        cache_key = self._make_key(key)
        async with self.lock:
            entry = self._cache.get(cache_key)
            if entry is None:
                self.stats['misses'] += 1
                return default, False
            now = datetime.utcnow()
            # Expired entries are dropped eagerly and count as misses.
            if now >= entry.expires_at:
                del self._cache[cache_key]
                self.stats['misses'] += 1
                return default, False
            # Update access statistics and refresh the LRU position.
            entry.last_accessed = now
            entry.access_count += 1
            self._cache.move_to_end(cache_key)
            self.stats['hits'] += 1
            return entry.data, True

    async def set(
        self,
        key: "CacheKey",
        value: Any,
        ttl: Optional[timedelta] = None
    ) -> None:
        """Set value in cache with optional TTL (defaults to default_ttl)."""
        cache_key = self._make_key(key)
        now = datetime.utcnow()
        ttl = ttl or self.default_ttl
        entry = CacheEntry(
            data=value,
            created_at=now,
            expires_at=now + ttl,
            last_accessed=now
        )
        async with self.lock:
            # Evict least-recently-used entries while the cache is full.
            # BUG FIX: only evict when inserting a NEW key; overwriting
            # an existing key does not grow the cache, so evicting then
            # would needlessly drop live entries.
            if cache_key not in self._cache:
                while len(self._cache) >= self.max_size:
                    self._cache.popitem(last=False)
                    self.stats['evictions'] += 1
            self._cache[cache_key] = entry
            # A (re)write counts as most-recent use.
            self._cache.move_to_end(cache_key)

    async def invalidate(
        self,
        pattern: Optional[str] = None
    ) -> int:
        """Invalidate cache entries whose key contains `pattern`.

        A None pattern clears the whole cache. Returns the number of
        entries removed. Works because keys are readable strings that
        embed the metric type (see _make_key).
        """
        async with self.lock:
            if pattern is None:
                count = len(self._cache)
                self._cache.clear()
                return count
            keys_to_remove = [
                k for k in self._cache.keys()
                if pattern in k
            ]
            for k in keys_to_remove:
                del self._cache[k]
            return len(keys_to_remove)

    async def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics (size, hit rate, hit/miss/eviction counts)."""
        async with self.lock:
            total_entries = len(self._cache)
            total_requests = self.stats['hits'] + self.stats['misses']
            hit_rate = (
                self.stats['hits'] / total_requests
                if total_requests > 0 else 0
            )
            return {
                'size': total_entries,
                'max_size': self.max_size,
                'hit_rate': hit_rate,
                'hits': self.stats['hits'],
                'misses': self.stats['misses'],
                'evictions': self.stats['evictions']
            }

    async def close(self) -> None:
        """Cancel the background cleanup task, if running (idempotent)."""
        if self.cleanup_task is not None:
            self.cleanup_task.cancel()
            try:
                await self.cleanup_task
            except asyncio.CancelledError:
                pass
            self.cleanup_task = None

    async def _cleanup_loop(self):
        """Background task to clean up expired entries."""
        while True:
            try:
                await asyncio.sleep(60)  # Run cleanup every minute
                await self._cleanup_expired()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Error in cache cleanup: {str(e)}")

    async def _cleanup_expired(self):
        """Remove expired cache entries under the lock."""
        now = datetime.utcnow()
        async with self.lock:
            expired_keys = [
                k for k, v in self._cache.items()
                if now >= v.expires_at
            ]
            for k in expired_keys:
                del self._cache[k]
class MetricsCacheManager:
    """Routes metric reads and writes to per-time-window cache tiers."""

    def __init__(self):
        # One cache per time-window tier: small and short-lived for
        # realtime data, larger and longer-lived for historical data.
        self.caches = {
            'realtime': MetricsCache(
                max_size=500,
                default_ttl=timedelta(seconds=30)
            ),
            'short_term': MetricsCache(
                max_size=1000,
                default_ttl=timedelta(minutes=5)
            ),
            'long_term': MetricsCache(
                max_size=200,
                default_ttl=timedelta(hours=1)
            )
        }
        self.logger = logging.getLogger(__name__)

    def _get_cache_for_window(
        self,
        window_size: timedelta
    ) -> "MetricsCache":
        """Pick the cache tier matching the width of the query window."""
        if window_size <= timedelta(minutes=5):
            tier = 'realtime'
        elif window_size <= timedelta(hours=1):
            tier = 'short_term'
        else:
            tier = 'long_term'
        return self.caches[tier]

    def _resolve(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime,
        granularity: timedelta,
        filters: Optional[Dict[str, Any]]
    ):
        """Return the (cache, key) pair for one metrics query."""
        cache = self._get_cache_for_window(end_time - start_time)
        key = CacheKey(
            metric_type=metric_type,
            start_time=start_time,
            end_time=end_time,
            granularity=granularity,
            filters=filters
        )
        return cache, key

    async def get_metrics(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime,
        granularity: timedelta,
        filters: Optional[Dict[str, Any]] = None
    ) -> Tuple[Any, bool]:
        """Get metrics from the cache tier matching the query window."""
        cache, key = self._resolve(
            metric_type, start_time, end_time, granularity, filters
        )
        return await cache.get(key)

    async def set_metrics(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime,
        granularity: timedelta,
        data: Any,
        filters: Optional[Dict[str, Any]] = None
    ) -> None:
        """Store metrics in the cache tier matching the query window."""
        cache, key = self._resolve(
            metric_type, start_time, end_time, granularity, filters
        )
        await cache.set(key, data)

    async def invalidate_metrics(
        self,
        metric_type: Optional[str] = None
    ) -> Dict[str, int]:
        """Invalidate metrics across all tiers; returns per-tier counts."""
        return {
            name: await cache.invalidate(metric_type)
            for name, cache in self.caches.items()
        }

    async def get_cache_stats(self) -> Dict[str, Dict[str, Any]]:
        """Collect statistics from every cache tier."""
        return {
            name: await cache.get_stats()
            for name, cache in self.caches.items()
        }
class CacheWarmer:
    """Preemptively warms up cache for frequently accessed metrics."""

    def __init__(
        self,
        cache_manager: "MetricsCacheManager",
        metrics_service: Any,
        warm_up_interval: timedelta = timedelta(minutes=5)
    ):
        """Create a cache warmer.

        Args:
            cache_manager: Destination for warmed-up metric data.
            metrics_service: Source service; assumed to expose an async
                get_metrics(metric_type, start, end, granularity, filters)
                - TODO confirm against the concrete service.
            warm_up_interval: Delay between warmup passes.
        """
        self.cache_manager = cache_manager
        self.metrics_service = metrics_service
        self.warm_up_interval = warm_up_interval
        self.logger = logging.getLogger(__name__)
        # Access-pattern histogram: canonical JSON pattern -> hit count.
        self.access_patterns: Dict[str, int] = defaultdict(int)
        self.last_warmup = datetime.utcnow()
        # Start background warmup task. BUG FIX: create_task() raises
        # RuntimeError when no event loop is running; degrade to "no
        # background warmup" instead of failing construction.
        self.warmup_task: Optional[asyncio.Task] = None
        try:
            self.warmup_task = asyncio.create_task(self._warmup_loop())
        except RuntimeError:
            self.logger.debug("No running event loop; warmup task not started")

    async def record_access(
        self,
        metric_type: str,
        window_size: timedelta,
        granularity: timedelta,
        filters: Optional[Dict[str, Any]] = None
    ):
        """Record one metric access so hot patterns can be warmed later.

        BUG FIX: durations are stored as float seconds. The previous
        code stored str(timedelta) and later tried to recover seconds
        with eval() on fragments like "0:05:00", which is not valid
        Python (and for multi-day deltas silently picked up only the
        day count); eval on stored strings is also unsafe.
        """
        pattern = json.dumps({
            'metric_type': metric_type,
            'window_size': window_size.total_seconds(),
            'granularity': granularity.total_seconds(),
            'filters': filters
        }, sort_keys=True)
        self.access_patterns[pattern] += 1

    async def _warmup_loop(self):
        """Background task that warms the cache on a fixed interval."""
        while True:
            try:
                await asyncio.sleep(self.warm_up_interval.total_seconds())
                await self._perform_warmup()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Error in cache warmup: {str(e)}")

    async def _perform_warmup(self):
        """Warm the cache for the 10 most frequently accessed patterns."""
        now = datetime.utcnow()
        # Rank patterns by access count, most frequent first.
        top_patterns = sorted(
            self.access_patterns.items(),
            key=lambda item: item[1],
            reverse=True
        )[:10]  # Warm up top 10 patterns
        for pattern, _count in top_patterns:
            try:
                config = json.loads(pattern)
                # Durations were recorded via total_seconds() in
                # record_access; rebuild them directly (no eval).
                window_size = timedelta(seconds=config['window_size'])
                granularity = timedelta(seconds=config['granularity'])
                # Fetch and cache metrics for the trailing window.
                await self._warmup_metric(
                    config['metric_type'],
                    now - window_size,
                    now,
                    granularity,
                    config['filters']
                )
            except Exception as e:
                self.logger.error(f"Error warming up pattern {pattern}: {str(e)}")

    async def _warmup_metric(
        self,
        metric_type: str,
        start_time: datetime,
        end_time: datetime,
        granularity: timedelta,
        filters: Optional[Dict[str, Any]]
    ):
        """Fetch one metric from the service and store it in the cache."""
        try:
            data = await self.metrics_service.get_metrics(
                metric_type,
                start_time,
                end_time,
                granularity,
                filters
            )
            await self.cache_manager.set_metrics(
                metric_type,
                start_time,
                end_time,
                granularity,
                data,
                filters
            )
            self.logger.debug(
                f"Warmed up cache for {metric_type} "
                f"from {start_time} to {end_time}"
            )
        except Exception as e:
            self.logger.error(f"Error warming up metric {metric_type}: {str(e)}")