Comprehensive Implementation Patterns
8. Security Implementations
A. JWT Authentication Service
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from jose import JWTError, jwt
class JWTAuthService:
    """JWT-based authentication service with refresh token support.

    Issues an access token and a refresh token signed with the same secret
    and algorithm, and verifies a token by decoding it and checking its
    ``type`` claim.
    """

    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        access_token_expire: int = 15,  # minutes
        refresh_token_expire: int = 7,  # days
    ):
        """Store the signing settings and convert lifetimes to ``timedelta``.

        Args:
            secret_key: Key handed to ``jwt.encode`` and ``jwt.decode``.
            algorithm: Signing algorithm name.
            access_token_expire: Access-token lifetime, in minutes.
            refresh_token_expire: Refresh-token lifetime, in days.
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=access_token_expire)
        self.refresh_token_expire = timedelta(days=refresh_token_expire)

    async def create_tokens(
        self,
        user_id: str,
        scopes: List[str]
    ) -> Dict[str, str]:
        """Create an access token and a refresh token for ``user_id``.

        Args:
            user_id: Value stored in the ``sub`` claim of both tokens.
            scopes: Value stored in the ``scopes`` claim of the access token.

        Returns:
            A dict with the encoded ``access_token`` and ``refresh_token``.
        """
        # A single timezone-aware timestamp anchors both expiries.
        # datetime.utcnow() is deprecated and returns a naive value that is
        # easy to misread as local time.
        now = datetime.now(timezone.utc)

        access_data = {
            "sub": user_id,
            "scopes": scopes,
            "exp": now + self.access_token_expire,
            "type": "access",
        }
        refresh_data = {
            "sub": user_id,
            "exp": now + self.refresh_token_expire,
            "type": "refresh",
            # Random identifier so each refresh token is distinguishable.
            "jti": secrets.token_urlsafe(32),
        }

        return {
            "access_token": jwt.encode(
                access_data,
                self.secret_key,
                algorithm=self.algorithm,
            ),
            "refresh_token": jwt.encode(
                refresh_data,
                self.secret_key,
                algorithm=self.algorithm,
            ),
        }

    async def verify_token(
        self,
        token: str,
        token_type: str = "access"
    ) -> Optional[Dict[str, Any]]:
        """Decode ``token`` and check that it is of the expected kind.

        Args:
            token: Encoded JWT.
            token_type: Required value of the ``type`` claim.

        Returns:
            The decoded payload, or ``None`` when decoding raises
            ``JWTError`` or the ``type`` claim differs from ``token_type``.
        """
        # Only the decode call can raise JWTError; keep the try body minimal.
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
            )
        except JWTError:
            return None

        # Reject e.g. a refresh token presented where an access token is
        # expected.
        if payload.get("type") != token_type:
            return None
        return payload
B. API Security Middleware
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer
from typing import Optional, List
class SecurityMiddleware:
    """API security middleware with rate limiting and auth.

    For every request whose path is not excluded, it first applies a
    per-client rate limit and then requires a valid bearer token. The decoded
    token payload is stored on ``request.state.user``.
    """

    def __init__(
        self,
        auth_service: JWTAuthService,
        rate_limiter: TokenBucketLimiter,
        excluded_paths: Optional[List[str]] = None
    ):
        """Store collaborators and the list of paths that bypass all checks.

        Args:
            auth_service: Used to verify bearer tokens.
            rate_limiter: Object exposing an awaitable
                ``check_rate_limit(key)`` returning ``(allowed, wait_time)``.
            excluded_paths: Exact request paths that skip rate limiting and
                authentication. Defaults to no exclusions.
        """
        self.auth_service = auth_service
        self.rate_limiter = rate_limiter
        self.excluded_paths = excluded_paths or []
        self.bearer_scheme = HTTPBearer()

    async def __call__(
        self,
        request: Request,
        call_next
    ):
        """Apply rate limiting and authentication, then forward the request.

        Raises:
            HTTPException: 429 when the rate limiter rejects the client,
                401 when the token is invalid or authentication fails
                unexpectedly. Exceptions raised by the bearer scheme itself
                are passed through unchanged.
        """
        # NOTE(review): depending on how this callable is mounted, an
        # HTTPException raised from middleware may not reach the framework's
        # exception handlers - confirm against the deployment.

        # Excluded paths bypass both rate limiting and authentication.
        if request.url.path in self.excluded_paths:
            return await call_next(request)

        # request.client can be None when the peer address is unknown; fall
        # back to a shared bucket key instead of failing with AttributeError.
        client_ip = request.client.host if request.client else "unknown"
        allowed, wait_time = await self.rate_limiter.check_rate_limit(
            client_ip
        )
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Try again in {wait_time:.1f} seconds"
            )

        try:
            auth = await self.bearer_scheme(request)
            payload = await self.auth_service.verify_token(auth.credentials)
        except HTTPException:
            # Preserve the status and detail chosen by the bearer scheme.
            raise
        except Exception as exc:
            # Any other failure is reported as a generic 401; the original
            # error stays attached as the cause for debugging.
            raise HTTPException(
                status_code=401,
                detail="Authentication failed"
            ) from exc

        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid token"
            )

        # Expose the decoded claims to downstream handlers.
        request.state.user = payload

        return await call_next(request)
Context
The current situation requires a decision because:
- Requirement 1
- Constraint 2
- Need 3
Status
Accepted | YYYY-MM-DD
9. Advanced Monitoring Implementations
A. Distributed Tracing
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.span import Span
from functools import wraps
import contextvars
# Holds the span of the traced operation currently executing in this context,
# or None when no traced operation is active.
current_span_ctx = contextvars.ContextVar('current_span', default=None)


class TracingService:
    """Distributed tracing implementation.

    Wraps coroutine functions in a span and lets code running inside a traced
    operation attach events to that span.
    """

    def __init__(self, service_name: str):
        """Obtain a tracer named ``service_name``."""
        self.tracer = trace.get_tracer(service_name)

    def trace_operation(
        self,
        operation_name: str,
        attributes: Optional[Dict] = None
    ):
        """Return a decorator that traces a coroutine function.

        Args:
            operation_name: Name given to the span.
            attributes: Optional attributes set on the span at creation.

        The wrapped coroutine runs inside a new current span. On success the
        span status is set to OK; on an exception the span status is set to
        ERROR with the exception text, the exception is recorded, and it is
        re-raised.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # NOTE(review): start_as_current_span may itself record
                # exceptions that propagate out of the ``with`` block -
                # confirm whether a duplicate exception event matters here.
                with self.tracer.start_as_current_span(
                    operation_name,
                    attributes=attributes
                ) as span:
                    # Bind the token before entering ``try`` so the
                    # ``finally`` clause can never see it unbound.
                    token = current_span_ctx.set(span)
                    try:
                        result = await func(*args, **kwargs)
                    except Exception as e:
                        # The description must be part of the Status object;
                        # a separate description argument is ignored when a
                        # Status instance is supplied.
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                        span.record_exception(e)
                        raise
                    else:
                        span.set_status(Status(StatusCode.OK))
                        return result
                    finally:
                        # Restore whatever span was active before this call.
                        current_span_ctx.reset(token)
            return wrapper
        return decorator

    def add_event(
        self,
        name: str,
        attributes: Optional[Dict] = None
    ):
        """Add an event to the span stored in ``current_span_ctx``.

        Does nothing when no traced operation is active.
        """
        span = current_span_ctx.get()
        # Compare against None explicitly: "no span" is the only case to skip.
        if span is not None:
            span.add_event(name, attributes=attributes)
B. Custom Metrics Collection
from dataclasses import dataclass
from typing import Optional, Dict, List
import time
@dataclass
class MetricPoint:
    """One recorded sample of a metric."""

    # Sampled value.
    value: float
    # Time of the sample, as a float timestamp.
    timestamp: float
    # Label name -> label value attached to this sample.
    labels: Dict[str, str]
class MetricsAggregator:
    """Custom metrics aggregation with statistical functions.

    Keeps an in-memory list of points per metric name and drops points older
    than ``retention_period`` seconds whenever that metric is recorded.
    """

    def __init__(self, retention_period: int = 3600):
        """Create an empty aggregator.

        Args:
            retention_period: Maximum age, in seconds, of retained points.
        """
        self.metrics: Dict[str, List[MetricPoint]] = {}
        self.retention_period = retention_period

    def record_metric(
        self,
        name: str,
        value: float,
        labels: Optional[Dict[str, str]] = None
    ):
        """Record a new metric point stamped with the current time.

        Args:
            name: Metric name.
            value: Sample value.
            labels: Optional labels; stored as an empty dict when omitted.
        """
        self.metrics.setdefault(name, []).append(
            MetricPoint(
                value=value,
                timestamp=time.time(),
                labels=labels or {}
            )
        )
        # Expire old points for this metric on every write.
        self._cleanup(name)

    def get_statistics(
        self,
        name: str,
        lookback: Optional[int] = None,
        filters: Optional[Dict[str, str]] = None
    ) -> Dict[str, float]:
        """Calculate statistics for a metric.

        Args:
            name: Metric name.
            lookback: When given, only points from the last ``lookback``
                seconds are considered. ``0`` is a valid window, not
                "no filter".
            filters: When given, only points whose labels match every
                key/value pair are considered.

        Returns:
            A dict with ``count``, ``min``, ``max``, ``avg``, ``sum`` and
            ``last``, or an empty dict when the metric is unknown or no point
            survives the filters.
        """
        if name not in self.metrics:
            return {}
        points = self.metrics[name]

        # Test against None, not truthiness, so that lookback=0 still applies
        # a time filter instead of silently returning all-time statistics.
        if lookback is not None:
            cutoff = time.time() - lookback
            points = [p for p in points if p.timestamp >= cutoff]

        if filters:
            points = [
                p for p in points
                if all(p.labels.get(k) == v for k, v in filters.items())
            ]

        if not points:
            return {}

        values = [p.value for p in points]
        total = sum(values)
        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": total / len(values),
            "sum": total,
            # Points are appended in recording order, so the final one is
            # the most recently recorded.
            "last": points[-1].value
        }

    def _cleanup(self, metric_name: str):
        """Remove points of ``metric_name`` older than the retention period."""
        if metric_name not in self.metrics:
            return
        cutoff = time.time() - self.retention_period
        self.metrics[metric_name] = [
            p for p in self.metrics[metric_name]
            if p.timestamp >= cutoff
        ]
10. Integration Patterns
A. Event Bus Implementation
from typing import Callable, Dict, List, Any
import asyncio
import json
class EventBus:
    """Event bus for system-wide event distribution.

    Handlers are registered per event type. Published events pass through
    every middleware in registration order before being delivered to the
    handlers concurrently.
    """

    def __init__(self):
        # event type -> handlers, in subscription order
        self.subscribers: Dict[str, List[Callable]] = {}
        # middleware callables, applied in registration order
        self.middleware: List[Callable] = []

    async def publish(
        self,
        event_type: str,
        data: Any,
        metadata: Optional[Dict] = None
    ):
        """Deliver an event to every handler subscribed to ``event_type``.

        Nothing happens when the event type has no subscriber entry. Each
        middleware is awaited with the event and its return value replaces
        the event; a falsy return value drops the event.
        """
        if event_type not in self.subscribers:
            return

        envelope = dict(
            type=event_type,
            data=data,
            metadata=metadata or {},
        )

        # Run the middleware chain; any stage may veto delivery.
        for stage in self.middleware:
            envelope = await stage(envelope)
            if not envelope:
                return

        # Fan out to all handlers and wait for them together.
        pending = []
        for handler in self.subscribers[event_type]:
            pending.append(handler(envelope))
        await asyncio.gather(*pending)

    def subscribe(
        self,
        event_type: str,
        handler: Callable
    ):
        """Register ``handler`` for events of ``event_type``."""
        self.subscribers.setdefault(event_type, []).append(handler)

    def add_middleware(self, middleware: Callable):
        """Append ``middleware`` to the end of the processing chain."""
        self.middleware.append(middleware)
B. Circuit Breaker Pattern
from enum import Enum
from datetime import datetime, timedelta
import asyncio
class CircuitState(Enum):
    """Operating states of a :class:`CircuitBreaker`."""

    CLOSED = "closed"        # calls are passed through
    OPEN = "open"            # calls are rejected or sent to the fallback
    HALF_OPEN = "half_open"  # trial calls decide whether to close again


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Circuit breaker for external service calls.

    The circuit opens after ``failure_threshold`` consecutive failures. Once
    ``recovery_time`` seconds have passed since the last failure, the next
    call moves it to half-open. ``half_open_calls`` successful trial calls
    close it again, and a failed trial call reopens it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_time: int = 60,
        half_open_calls: int = 3
    ):
        """Create a closed circuit.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_time: Seconds to wait after the last failure before
                allowing trial calls.
            half_open_calls: Successful trial calls required to close the
                circuit again.
        """
        self.failure_threshold = failure_threshold
        self.recovery_time = timedelta(seconds=recovery_time)
        self.half_open_calls = half_open_calls
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    async def execute(
        self,
        func: Callable,
        fallback: Optional[Callable] = None,
        *args,
        **kwargs
    ) -> Any:
        """Execute ``func`` with circuit breaker logic.

        Args:
            func: Coroutine function to call.
            fallback: Optional coroutine function awaited instead of ``func``
                when the circuit is open, or after ``func`` raises. Because
                it is the second positional parameter, positional arguments
                for ``func`` must come after it.
            *args: Positional arguments for ``func`` and ``fallback``.
            **kwargs: Keyword arguments for ``func`` and ``fallback``.

        Returns:
            The result of ``func``, or of ``fallback`` when it is used.

        Raises:
            CircuitBreakerOpenError: The circuit is open and no fallback was
                given.
            Exception: Whatever ``func`` raised, when no fallback was given.
        """
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time >= self.recovery_time:
                # Recovery window elapsed: let trial calls through.
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
            else:
                return await self._handle_open_circuit(fallback, *args, **kwargs)

        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            await self._handle_failure(e)
            if fallback:
                return await fallback(*args, **kwargs)
            raise

        self._handle_success()
        return result

    def _handle_success(self):
        """Update counters after a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_calls:
                self.state = CircuitState.CLOSED
                self.failures = 0
        elif self.state == CircuitState.CLOSED:
            # A success ends the failure streak. Without this reset, isolated
            # failures would add up over the breaker's lifetime and
            # eventually open a healthy circuit.
            self.failures = 0

    async def _handle_failure(self, exception: Exception):
        """Record a failed call and open the circuit when warranted."""
        self.failures += 1
        self.last_failure_time = datetime.now()
        # A failed trial call reopens the circuit immediately; otherwise open
        # once the consecutive-failure threshold is reached.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failures >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

    async def _handle_open_circuit(
        self,
        fallback: Optional[Callable],
        *args,
        **kwargs
    ) -> Any:
        """Handle a call made while the circuit is open.

        Returns the awaited fallback result when a fallback is given;
        otherwise raises ``CircuitBreakerOpenError``.
        """
        if fallback:
            return await fallback(*args, **kwargs)
        raise CircuitBreakerOpenError("Circuit breaker is open")
Would you like me to continue with:
- Message queue integration patterns?
- Cache synchronization patterns?
- Error handling patterns?
- More monitoring implementations?