Skip to main content

Comprehensive Implementation Patterns

8. Security Implementations

A. JWT Authentication Service

import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from jose import JWTError, jwt

class JWTAuthService:
    """JWT-based authentication service with refresh token support.

    Issues a short-lived access token (carrying the caller's scopes) and a
    longer-lived refresh token (carrying a random ``jti``), both signed with
    the same key and algorithm. Nothing in this class stores or checks the
    ``jti``; revocation, if needed, has to be implemented by the caller.
    """

    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        access_token_expire: int = 15,  # minutes
        refresh_token_expire: int = 7,  # days
    ):
        """Configure signing parameters and token lifetimes.

        Args:
            secret_key: Key used to sign and verify tokens. Must be non-empty.
            algorithm: JWT signing algorithm name passed to ``jwt.encode`` /
                ``jwt.decode``.
            access_token_expire: Access token lifetime in minutes.
            refresh_token_expire: Refresh token lifetime in days.

        Raises:
            ValueError: If ``secret_key`` is empty.
        """
        # An empty key would produce tokens anyone can forge; fail fast.
        if not secret_key:
            raise ValueError("secret_key must be a non-empty string")

        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire = timedelta(minutes=access_token_expire)
        self.refresh_token_expire = timedelta(days=refresh_token_expire)

    async def create_tokens(
        self,
        user_id: str,
        scopes: List[str]
    ) -> Dict[str, str]:
        """Create a signed access/refresh token pair for ``user_id``.

        Args:
            user_id: Value stored in the ``sub`` claim of both tokens.
            scopes: Scopes stored in the access token's ``scopes`` claim.

        Returns:
            A dict with the keys ``"access_token"`` and ``"refresh_token"``.
        """
        # Take one timezone-aware timestamp so both expiries share the same
        # base instant and do not depend on the deprecated naive utcnow().
        now = datetime.now(timezone.utc)

        access_claims = {
            "sub": user_id,
            "scopes": scopes,
            "exp": now + self.access_token_expire,
            "type": "access",
        }
        refresh_claims = {
            "sub": user_id,
            "exp": now + self.refresh_token_expire,
            "type": "refresh",
            # Random identifier so each refresh token is unique.
            "jti": secrets.token_urlsafe(32),
        }

        return {
            "access_token": jwt.encode(
                access_claims,
                self.secret_key,
                algorithm=self.algorithm,
            ),
            "refresh_token": jwt.encode(
                refresh_claims,
                self.secret_key,
                algorithm=self.algorithm,
            ),
        }

    async def verify_token(
        self,
        token: str,
        token_type: str = "access"
    ) -> Optional[Dict[str, Any]]:
        """Verify and decode a token.

        Args:
            token: Encoded JWT string.
            token_type: Expected value of the token's ``type`` claim.

        Returns:
            The decoded payload, or ``None`` when ``jwt.decode`` raises
            ``JWTError`` or the ``type`` claim does not match ``token_type``.
        """
        # Only the decode call can raise JWTError; keep the try body minimal.
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
            )
        except JWTError:
            return None

        # Reject e.g. a refresh token presented where an access token is expected.
        if payload.get("type") != token_type:
            return None

        return payload

B. API Security Middleware

from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer
from typing import Optional, List

class SecurityMiddleware:
    """API security middleware combining rate limiting and bearer-token auth.

    Instances are callables with the ``(request, call_next)`` signature.
    For every request whose path is not excluded, the middleware first asks
    the rate limiter whether the client may proceed, then validates the
    bearer token and stores the decoded payload on ``request.state.user``.
    """

    def __init__(
        self,
        auth_service: JWTAuthService,
        rate_limiter: TokenBucketLimiter,
        excluded_paths: Optional[List[str]] = None
    ):
        """Store collaborators and the list of paths that bypass all checks.

        Args:
            auth_service: Service whose ``verify_token`` validates tokens.
            rate_limiter: Object whose ``check_rate_limit(key)`` coroutine
                returns an ``(allowed, wait_time)`` pair.
            excluded_paths: Exact request paths that skip rate limiting and
                authentication. Defaults to no exclusions.
        """
        self.auth_service = auth_service
        self.rate_limiter = rate_limiter
        self.excluded_paths = excluded_paths or []
        self.bearer_scheme = HTTPBearer()

    async def __call__(
        self,
        request: Request,
        call_next
    ):
        """Apply rate limiting and authentication, then forward the request.

        Raises:
            HTTPException: 429 when the rate limiter rejects the client;
                401 when the token is invalid or authentication fails; any
                HTTPException raised by the bearer scheme is propagated.
        """
        # NOTE(review): depending on how this middleware is registered,
        # HTTPException raised here may not reach FastAPI's exception
        # handlers — confirm against the deployment.

        # Excluded paths bypass both rate limiting and authentication.
        if request.url.path in self.excluded_paths:
            return await call_next(request)

        # request.client can be None (no peer address available); fall back
        # to a shared bucket instead of crashing with AttributeError.
        client_ip = request.client.host if request.client else "unknown"
        allowed, wait_time = await self.rate_limiter.check_rate_limit(
            client_ip
        )
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Try again in {wait_time:.1f} seconds"
            )

        try:
            auth = await self.bearer_scheme(request)
            payload = await self.auth_service.verify_token(auth.credentials)
        except HTTPException:
            # Keep status codes chosen by the bearer scheme untouched.
            raise
        except Exception as exc:
            # Hide internal details from the client but keep the cause
            # chained for logs and debugging.
            raise HTTPException(
                status_code=401,
                detail="Authentication failed"
            ) from exc

        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid token"
            )

        # Expose the decoded token payload to downstream handlers.
        request.state.user = payload

        return await call_next(request)

Context

The current situation requires a decision because:

  • Requirement 1
  • Constraint 2
  • Need 3

Status

Accepted | YYYY-MM-DD

9. Advanced Monitoring Implementations

A. Distributed Tracing

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.span import Span
from functools import wraps
import contextvars

# Context-local slot for the span of the operation currently being traced;
# it holds None whenever no traced operation is active.
current_span_ctx = contextvars.ContextVar(
    "current_span",
    default=None,
)

class TracingService:
    """Distributed tracing helper built on an OpenTelemetry tracer."""

    def __init__(self, service_name: str):
        """Obtain a tracer named after ``service_name``."""
        self.tracer = trace.get_tracer(service_name)

    def trace_operation(
        self,
        operation_name: str,
        attributes: Optional[Dict] = None
    ):
        """Return a decorator that runs an async function inside a span.

        Args:
            operation_name: Name given to the span.
            attributes: Optional attributes attached to the span at creation.

        Returns:
            A decorator. The wrapped function must be awaitable when called;
            the wrapper awaits it, marks the span OK on success, and on any
            exception marks the span ERROR, records the exception and
            re-raises it.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                with self.tracer.start_as_current_span(
                    operation_name,
                    attributes=attributes
                ) as span:
                    # Publish the span before entering the try block so the
                    # reset token is always bound when ``finally`` runs.
                    token = current_span_ctx.set(span)
                    try:
                        result = await func(*args, **kwargs)
                    except Exception as e:
                        # The description must be passed to Status itself;
                        # given separately next to a Status instance it is
                        # not part of the recorded status.
                        # NOTE(review): start_as_current_span may also record
                        # the exception by default — confirm whether the
                        # duplicate event matters.
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                        span.record_exception(e)
                        raise
                    else:
                        span.set_status(Status(StatusCode.OK))
                        return result
                    finally:
                        # Restore whatever span (or None) was current before.
                        current_span_ctx.reset(token)
            return wrapper
        return decorator

    def add_event(
        self,
        name: str,
        attributes: Optional[Dict] = None
    ):
        """Add an event to the span of the traced operation in progress.

        Does nothing when called outside a function wrapped by
        ``trace_operation`` (no span is stored in the context).
        """
        span = current_span_ctx.get()
        if span is not None:
            span.add_event(name, attributes=attributes)

B. Custom Metrics Collection

from dataclasses import dataclass
from typing import Optional, Dict, List
import time

@dataclass
class MetricPoint:
    """One recorded sample of a metric."""

    # Measured value of the sample.
    value: float
    # Time of the sample; MetricsAggregator.record_metric fills this from
    # time.time().
    timestamp: float
    # Label name -> label value pairs attached to the sample.
    labels: Dict[str, str]

class MetricsAggregator:
    """In-memory metrics store with simple statistical summaries.

    Points are kept per metric name in insertion order and dropped once
    they are older than ``retention_period`` seconds.
    """

    def __init__(self, retention_period: int = 3600):
        """Create an empty aggregator.

        Args:
            retention_period: Maximum age, in seconds, of points that are
                kept and reported.
        """
        self.metrics: Dict[str, List[MetricPoint]] = {}
        self.retention_period = retention_period

    def record_metric(
        self,
        name: str,
        value: float,
        labels: Optional[Dict[str, str]] = None
    ):
        """Record a new point for ``name``, timestamped with the current time.

        Args:
            name: Metric name.
            value: Sample value.
            labels: Optional labels for the sample; defaults to no labels.
        """
        self.metrics.setdefault(name, []).append(
            MetricPoint(
                value=value,
                timestamp=time.time(),
                labels=labels or {}
            )
        )

        # Drop points of this metric that have aged out.
        self._cleanup(name)

    def get_statistics(
        self,
        name: str,
        lookback: Optional[int] = None,
        filters: Optional[Dict[str, str]] = None
    ) -> Dict[str, float]:
        """Calculate summary statistics for a metric.

        Args:
            name: Metric name.
            lookback: If given, only points from the last ``lookback``
                seconds are considered. ``0`` is honoured as a real window
                rather than treated as "no limit".
            filters: If given, only points whose labels equal every
                key/value pair in ``filters`` are considered.

        Returns:
            A dict with ``count``, ``min``, ``max``, ``avg``, ``sum`` and
            ``last`` (value of the most recently stored matching point), or
            an empty dict when the metric is unknown or no point matches.
        """
        if name not in self.metrics:
            return {}

        # Retention is otherwise only enforced on write; apply it on read
        # too so a metric that stopped receiving points does not keep
        # reporting expired data.
        self._cleanup(name)
        points = self.metrics[name]

        # ``is not None`` so that lookback=0 is not mistaken for "no filter".
        if lookback is not None:
            cutoff = time.time() - lookback
            points = [p for p in points if p.timestamp >= cutoff]

        if filters:
            points = [
                p for p in points
                if all(
                    p.labels.get(k) == v
                    for k, v in filters.items()
                )
            ]

        if not points:
            return {}

        values = [p.value for p in points]
        total = sum(values)
        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": total / len(values),
            "sum": total,
            "last": points[-1].value
        }

    def _cleanup(self, metric_name: str):
        """Remove points of ``metric_name`` older than the retention period."""
        if metric_name not in self.metrics:
            return

        cutoff = time.time() - self.retention_period
        self.metrics[metric_name] = [
            p for p in self.metrics[metric_name]
            if p.timestamp >= cutoff
        ]

10. Integration Patterns

A. Event Bus Implementation

from typing import Callable, Dict, List, Any
import asyncio
import json

class EventBus:
    """Event bus for system-wide event distribution.

    Handlers are registered per event type. Middleware callables run, in
    registration order, on every published event before it is delivered and
    may transform the event or drop it by returning a falsy value. Both
    handlers and middleware may be plain functions or coroutine functions.
    """

    def __init__(self):
        """Create a bus with no subscribers and no middleware."""
        self.subscribers: Dict[str, List[Callable]] = {}
        self.middleware: List[Callable] = []

    @staticmethod
    async def _invoke(callback: Callable, event: Any) -> Any:
        """Call ``callback(event)`` and await the result if it is awaitable."""
        import inspect  # local import: only this helper needs it

        outcome = callback(event)
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def publish(
        self,
        event_type: str,
        data: Any,
        metadata: Optional[Dict] = None
    ):
        """Publish an event to all subscribers of ``event_type``.

        Args:
            event_type: Type used to look up subscribers.
            data: Event payload, stored under the ``"data"`` key.
            metadata: Optional metadata, stored under ``"metadata"``
                (defaults to an empty dict).

        Returns immediately when nobody subscribed to ``event_type`` or when
        a middleware returns a falsy value. All handlers receive the same
        event object and are run concurrently; an exception raised by a
        handler propagates to the caller.
        """
        if event_type not in self.subscribers:
            return

        event = {
            "type": event_type,
            "data": data,
            "metadata": metadata or {}
        }

        # Each middleware sees the output of the previous one.
        for middleware in self.middleware:
            event = await self._invoke(middleware, event)
            if not event:
                return

        # Snapshot the handlers so changes made while middleware ran (or
        # while handlers run) cannot break this delivery.
        handlers = list(self.subscribers.get(event_type, ()))

        # Handlers are called inside the gathered coroutines, so a handler
        # that raises synchronously no longer leaves sibling coroutines
        # created but never awaited.
        await asyncio.gather(
            *(self._invoke(handler, event) for handler in handlers)
        )

    def subscribe(
        self,
        event_type: str,
        handler: Callable
    ):
        """Register ``handler`` to be called with events of ``event_type``."""
        self.subscribers.setdefault(event_type, []).append(handler)

    def add_middleware(self, middleware: Callable):
        """Append ``middleware`` to the chain applied to published events."""
        self.middleware.append(middleware)

B. Circuit Breaker Pattern

from enum import Enum
from datetime import datetime, timedelta
import asyncio

class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"        # calls are executed normally
    OPEN = "open"            # calls are short-circuited to the fallback/error
    HALF_OPEN = "half_open"  # trial calls are executed after the recovery time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Circuit breaker for external service calls.

    The breaker opens after ``failure_threshold`` consecutive failures,
    rejects calls for ``recovery_time`` seconds, then lets trial calls
    through (half-open). ``half_open_calls`` successful trial calls close
    the circuit again; a failed trial call re-opens it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_time: int = 60,
        half_open_calls: int = 3
    ):
        """Configure thresholds and start in the closed state.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_time: Seconds to stay open before allowing trial calls.
            half_open_calls: Successful trial calls required to close again.
        """
        self.failure_threshold = failure_threshold
        self.recovery_time = timedelta(seconds=recovery_time)
        self.half_open_calls = half_open_calls

        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    async def execute(
        self,
        func: Callable,
        fallback: Optional[Callable] = None,
        *args,
        **kwargs
    ) -> Any:
        """Execute ``func`` under circuit breaker protection.

        Args:
            func: Coroutine function to call with ``*args, **kwargs``.
            fallback: Optional coroutine function called with the same
                arguments when ``func`` fails or the circuit is open. Note
                that it is the second positional parameter, so positional
                arguments for ``func`` must come after it.

        Returns:
            The result of ``func``, or of ``fallback`` when it is used.

        Raises:
            CircuitOpenError: The circuit is open and no fallback was given.
            Exception: Whatever ``func`` raised, when no fallback was given.
        """
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time >= self.recovery_time:
                # Recovery time elapsed: allow trial calls.
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
            else:
                return await self._handle_open_circuit(fallback, *args, **kwargs)

        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            await self._handle_failure(e)

            if fallback:
                return await fallback(*args, **kwargs)
            raise

        self._handle_success()
        return result

    def _handle_success(self):
        """Update state after a successful call."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_calls:
                self.state = CircuitState.CLOSED
                self.failures = 0
        else:
            # Count only consecutive failures: without this reset, isolated
            # failures spread over a long time would eventually add up to
            # the threshold and open a perfectly healthy circuit.
            self.failures = 0

    async def _handle_failure(self, exception: Exception):
        """Record a failed call and open the circuit when warranted."""
        self.failures += 1
        self.last_failure_time = datetime.now()

        # A failed trial call re-opens the circuit immediately; otherwise
        # open once the consecutive-failure threshold is reached.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failures >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

    async def _handle_open_circuit(
        self,
        fallback: Optional[Callable],
        *args,
        **kwargs
    ) -> Any:
        """Handle a call made while the circuit is open.

        Returns the fallback's result when a fallback is given, otherwise
        raises ``CircuitOpenError`` (a subclass of ``Exception``).
        """
        if fallback:
            return await fallback(*args, **kwargs)
        raise CircuitOpenError("Circuit breaker is open")

Would you like me to continue with:

  1. Message queue integration patterns?
  2. Cache synchronization patterns?
  3. Error handling patterns?
  4. More monitoring implementations?