
Specialized Implementations and Guides - Continued

5. Metrics Collection System

A. Prometheus Integration

1. Prometheus Client Implementation
Description: Custom metrics collection with Prometheus
URL: https://prometheus.io/docs/practices/instrumentation/
Support: Metrics collection

Implementation Example:
```python
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps
import time

class MetricsCollector:
    def __init__(self):
        # Define metrics
        self.document_counter = Counter(
            'document_processed_total',
            'Number of documents processed',
            ['status']
        )

        self.processing_time = Histogram(
            'document_processing_seconds',
            'Time spent processing documents',
            ['operation'],
            buckets=(0.1, 0.5, 1, 2.5, 5, 10, 30)
        )

        self.active_processes = Gauge(
            'active_processing_jobs',
            'Number of active processing jobs'
        )

    def track_processing_time(self, operation: str):
        """Decorator for tracking operation duration"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    self.active_processes.inc()
                    result = await func(*args, **kwargs)
                    self.document_counter.labels(status='success').inc()
                    return result
                except Exception:
                    self.document_counter.labels(status='error').inc()
                    raise
                finally:
                    duration = time.time() - start_time
                    self.processing_time.labels(operation=operation).observe(duration)
                    self.active_processes.dec()
            return wrapper
        return decorator
```
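To see what the decorator's bookkeeping does without a running Prometheus server, here is a hypothetical in-memory stand-in that performs the same try/except/finally accounting with plain dicts; the `InMemoryMetrics` class and `parse_document` coroutine are illustrative names, not part of the collector above:

```python
import asyncio
import time
from functools import wraps

class InMemoryMetrics:
    """Stand-in for the Prometheus collector: same bookkeeping, plain dicts."""
    def __init__(self):
        self.counts = {"success": 0, "error": 0}
        self.durations = []  # (operation, seconds) pairs
        self.active = 0

    def track(self, operation):
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                start = time.time()
                self.active += 1
                try:
                    result = await func(*args, **kwargs)
                    self.counts["success"] += 1
                    return result
                except Exception:
                    self.counts["error"] += 1
                    raise
                finally:
                    # Always record duration and release the active slot
                    self.durations.append((operation, time.time() - start))
                    self.active -= 1
            return wrapper
        return decorator

metrics = InMemoryMetrics()

@metrics.track("parse")
async def parse_document(text):
    return len(text)

result = asyncio.run(parse_document("hello"))
```

The `finally` block is the important part: duration is observed and the active gauge decremented whether the wrapped coroutine succeeds or raises.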

B. Time Series Management

1. TimescaleDB Integration Guide
Description: Efficient time series data management
URL: https://docs.timescale.com/latest/getting-started/compression/
Support: Time series storage

Implementation Example:
```sql
-- Create hypertable for metrics
CREATE TABLE metrics (
    time TIMESTAMPTZ NOT NULL,
    metric_name TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    labels JSONB DEFAULT '{}'
);

SELECT create_hypertable('metrics', 'time');

-- Enable compression
ALTER TABLE metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'metric_name'
);

-- Create continuous aggregate (hourly rollup)
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    metric_name,
    labels,
    avg(value) AS avg_value,
    min(value) AS min_value,
    max(value) AS max_value,
    count(*) AS sample_count
FROM metrics
GROUP BY bucket, metric_name, labels;
```
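The rollup that the continuous aggregate computes can be mirrored in plain Python, which is handy for testing downstream code without a database; this sketch assumes samples arrive as `(timestamp, metric_name, value)` tuples (the function and sample names are hypothetical):

```python
from collections import defaultdict
from datetime import datetime

def bucket_hourly(samples):
    """Group (timestamp, metric_name, value) samples into hourly buckets,
    computing the same avg/min/max/count as the continuous aggregate."""
    buckets = defaultdict(list)
    for ts, name, value in samples:
        # Equivalent of time_bucket('1 hour', time): floor to the hour
        bucket = ts.replace(minute=0, second=0, microsecond=0)
        buckets[(bucket, name)].append(value)
    return {
        key: {
            "avg_value": sum(vals) / len(vals),
            "min_value": min(vals),
            "max_value": max(vals),
            "sample_count": len(vals),
        }
        for key, vals in buckets.items()
    }

samples = [
    (datetime(2024, 1, 1, 10, 5), "latency", 0.2),
    (datetime(2024, 1, 1, 10, 40), "latency", 0.4),
    (datetime(2024, 1, 1, 11, 2), "latency", 1.0),
]
hourly = bucket_hourly(samples)
```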


6. Background Task Processing

A. Celery Implementation

1. Celery Task Management
Description: Distributed task processing patterns
URL: https://docs.celeryq.dev/en/stable/userguide/tasks.html
Support: Background processing

Implementation Example:
```python
from celery import Celery
from celery.schedules import crontab

class TaskProcessor:
    def __init__(self, app_name: str, broker_url: str):
        self.celery = Celery(
            app_name,
            broker=broker_url,
            backend=broker_url
        )
        self.configure()

    def configure(self):
        self.celery.conf.update(
            task_serializer='json',
            accept_content=['json'],
            result_serializer='json',
            timezone='UTC',
            enable_utc=True,
            task_track_started=True,
            task_time_limit=3600,
            worker_prefetch_multiplier=1,
            task_acks_late=True
        )

        # Configure periodic tasks
        self.celery.conf.beat_schedule = {
            'cleanup-old-documents': {
                'task': 'tasks.cleanup_documents',
                'schedule': crontab(hour=3, minute=0)
            },
            'update-search-index': {
                'task': 'tasks.update_search_index',
                'schedule': crontab(minute='*/15')
            }
        }

    def register_task(
        self,
        retry_limit: int = 3,
        retry_delay: int = 300
    ):
        """Task registration decorator"""
        def decorator(func):
            @self.celery.task(
                bind=True,
                max_retries=retry_limit,
                default_retry_delay=retry_delay
            )
            def wrapped_task(task, *args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # retry() raises Retry, rescheduling instead of failing
                    raise task.retry(exc=exc)
            return wrapped_task
        return decorator
```
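The retry semantics that `register_task` configures (`max_retries` plus a fixed delay between attempts) can be sketched in plain Python without a broker, which makes the control flow easy to verify; `retrying_task` and `flaky_index_update` are illustrative names, not Celery APIs:

```python
import time
from functools import wraps

def retrying_task(retry_limit=3, retry_delay=0.0):
    """Broker-free sketch of max_retries / default_retry_delay behaviour."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retry_limit + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retry_limit:
                        raise  # retries exhausted: propagate the failure
                    time.sleep(retry_delay)
        return wrapper
    return decorator

calls = []

@retrying_task(retry_limit=3)
def flaky_index_update():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = flaky_index_update()
```

Here the task fails twice and succeeds on the third call, so the decorator absorbs the transient errors and returns the final result.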

B. Error Handling and Recovery

1. Task Recovery Patterns
Description: Robust error handling in distributed systems
URL: https://microservices.io/patterns/reliability/circuit-breaker.html
Support: Error management

Implementation Example:
```python
import asyncio
from typing import Any, Callable, Optional

class TaskRecovery:
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries

    async def execute_with_recovery(
        self,
        task_func: Callable,
        fallback_func: Optional[Callable] = None,
        *args,
        **kwargs
    ) -> Any:
        retries = 0
        last_exception = None

        while retries < self.max_retries:
            try:
                return await task_func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                retries += 1
                # Exponential backoff before the next attempt
                await asyncio.sleep(2 ** retries)

        # All retries failed; use the fallback if one was provided
        if fallback_func:
            return await fallback_func(*args, **kwargs)

        raise last_exception
```
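Usage might look like the following. This is a self-contained sketch that inlines a trimmed copy of the class with a configurable backoff base so it runs quickly; the service and fallback names are hypothetical:

```python
import asyncio

class TaskRecovery:
    """Trimmed copy of the class above, with an adjustable backoff base."""
    def __init__(self, max_retries=3, base_delay=0.01):
        self.max_retries = max_retries
        self.base_delay = base_delay

    async def execute_with_recovery(self, task_func, fallback_func=None,
                                    *args, **kwargs):
        last_exception = None
        for attempt in range(1, self.max_retries + 1):
            try:
                return await task_func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                # Exponential backoff: base * 2, base * 4, ...
                await asyncio.sleep(self.base_delay * (2 ** attempt))
        if fallback_func:
            return await fallback_func(*args, **kwargs)
        raise last_exception

attempts = []

async def unreliable_fetch():
    attempts.append(1)
    raise ConnectionError("service down")

async def cached_fallback():
    return {"source": "cache"}

result = asyncio.run(
    TaskRecovery(max_retries=2).execute_with_recovery(
        unreliable_fetch, cached_fallback
    )
)
```

After both attempts fail, the fallback supplies a degraded-but-usable response instead of surfacing the exception to the caller.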

7. API Rate Limiting

A. Rate Limiter Implementation

1. Token Bucket Algorithm
Description: Rate limiting implementation guide
URL: https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm
Support: API protection

Implementation Example:
```python
import time
from typing import Tuple

from redis.asyncio import Redis

class TokenBucketLimiter:
    def __init__(
        self,
        redis_client: Redis,
        rate: int,
        capacity: int
    ):
        self.redis = redis_client
        self.rate = rate  # tokens per second
        self.capacity = capacity

    async def check_rate_limit(
        self,
        key: str
    ) -> Tuple[bool, float]:
        """
        Check if a request can be processed.
        Returns: (allowed, wait_time)
        Note: this read-modify-write sequence is not atomic; a production
        limiter would move the logic into a Lua script.
        """
        now = time.time()
        key = f"ratelimit:{key}"

        # Get current bucket state
        tokens, last_update = await self.redis.hmget(
            key,
            ['tokens', 'last_update']
        )

        if tokens is None:
            # Initialize the bucket, consuming one token for this request
            await self.redis.hset(key, mapping={
                'tokens': self.capacity - 1,
                'last_update': now
            })
            return True, 0

        # Calculate token refill since the last update
        tokens = float(tokens)
        last_update = float(last_update)
        time_passed = now - last_update
        refill = time_passed * self.rate
        current_tokens = min(self.capacity, tokens + refill)

        if current_tokens >= 1:
            # Request allowed: consume one token
            await self.redis.hset(key, mapping={
                'tokens': current_tokens - 1,
                'last_update': now
            })
            return True, 0
        else:
            # Not enough tokens: report how long until one refills
            wait_time = (1 - current_tokens) / self.rate
            return False, wait_time
```
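The same bucket arithmetic can be exercised without Redis by keeping state in a dict and injecting a fake clock, which makes the refill behaviour deterministic to test; `InMemoryTokenBucket` is an illustrative name, not part of the limiter above:

```python
import time

class InMemoryTokenBucket:
    """Redis-free sketch of the token-bucket math, with an injectable clock."""
    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate          # tokens per second
        self.capacity = capacity  # burst size
        self.clock = clock
        self.buckets = {}         # key -> (tokens, last_update)

    def check_rate_limit(self, key):
        now = self.clock()
        tokens, last_update = self.buckets.get(key, (self.capacity, now))
        # Refill based on elapsed time, capped at capacity
        tokens = min(self.capacity, tokens + (now - last_update) * self.rate)
        if tokens >= 1:
            self.buckets[key] = (tokens - 1, now)
            return True, 0.0
        self.buckets[key] = (tokens, now)
        return False, (1 - tokens) / self.rate

# Deterministic demo: a bucket holding 2 tokens, refilled at 1 token/second
fake_now = [0.0]
bucket = InMemoryTokenBucket(rate=1, capacity=2, clock=lambda: fake_now[0])
first = bucket.check_rate_limit("client-a")   # consumes token 1
second = bucket.check_rate_limit("client-a")  # consumes token 2
third = bucket.check_rate_limit("client-a")   # bucket empty: denied
fake_now[0] = 1.0                             # one second later, 1 token refilled
fourth = bucket.check_rate_limit("client-a")  # allowed again
```

Injecting the clock is what makes the third call's reported wait time (one full second at 1 token/second) verifiable without real sleeps.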
