Additional Implementation Patterns
11. Message Queue Integration Patterns
A. Advanced Message Queue Handler
import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar

from aio_pika import connect_robust, Message, ExchangeType
T = TypeVar('T')
class MessageQueueHandler(Generic[T]):
    """Robust message queue handler with retry and dead-letter-queue (DLQ) support.

    Topology (declared in :meth:`initialize`):
      * ``<exchange>``      -- main topic exchange
      * ``<exchange>.dlx``  -- dead-letter topic exchange
      * ``<queue>``         -- main work queue (dead-letters to the DLX)
      * ``<queue>.retry``   -- TTL queue whose expired messages are routed
                               back onto the main queue
      * ``<queue>.dlq``     -- terminal dead-letter queue

    Failed messages are re-published to the retry queue until ``max_retries``
    is exhausted, then published to the DLQ with error headers.
    """

    def __init__(
        self,
        amqp_url: str,
        exchange_name: str,
        queue_name: str,
        message_type: type[T],
        max_retries: int = 3,
        retry_delay: int = 5000
    ):
        """
        Args:
            amqp_url: AMQP broker URL (passed to ``connect_robust``).
            exchange_name: Name of the main topic exchange.
            queue_name: Name of the main queue; also the default routing key.
            message_type: Type messages are validated against on publish and
                reconstructed as ``message_type(**content)`` on consume.
            max_retries: Delivery attempts before a message is dead-lettered.
            retry_delay: Retry delay in milliseconds (``x-message-ttl`` of
                the retry queue).
        """
        self.amqp_url = amqp_url
        self.exchange_name = exchange_name
        self.queue_name = queue_name
        self.message_type = message_type
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.handlers: Dict[str, Callable] = {}
        # Connection/topology objects are created in initialize(); declared
        # here so the instance's full attribute set is explicit.
        self.connection = None
        self.channel = None
        self.main_exchange = None
        self.dlx_exchange = None
        self.main_queue = None
        self.retry_queue = None
        self.dlq = None

    async def initialize(self):
        """Open the connection and declare exchanges, queues and bindings."""
        self.connection = await connect_robust(self.amqp_url)
        self.channel = await self.connection.channel()

        # Exchanges: main topic exchange plus its dead-letter exchange (DLX).
        self.main_exchange = await self.channel.declare_exchange(
            self.exchange_name,
            ExchangeType.TOPIC
        )
        self.dlx_exchange = await self.channel.declare_exchange(
            f"{self.exchange_name}.dlx",
            ExchangeType.TOPIC
        )

        # Main queue: rejected messages are dead-lettered to the DLX.
        self.main_queue = await self.channel.declare_queue(
            self.queue_name,
            arguments={
                'x-dead-letter-exchange': f"{self.exchange_name}.dlx",
                'x-dead-letter-routing-key': f"{self.queue_name}.deadletter"
            }
        )
        # Retry queue: messages sit here for `retry_delay` ms, then expire
        # back onto the main exchange with the main queue's routing key.
        self.retry_queue = await self.channel.declare_queue(
            f"{self.queue_name}.retry",
            arguments={
                'x-dead-letter-exchange': self.exchange_name,
                'x-dead-letter-routing-key': self.queue_name,
                'x-message-ttl': self.retry_delay
            }
        )
        # Terminal dead-letter queue.
        self.dlq = await self.channel.declare_queue(
            f"{self.queue_name}.dlq"
        )

        await self.main_queue.bind(self.main_exchange, self.queue_name)
        await self.retry_queue.bind(self.main_exchange, f"{self.queue_name}.retry")
        await self.dlq.bind(self.dlx_exchange, f"{self.queue_name}.deadletter")

    async def publish(
        self,
        message: T,
        routing_key: Optional[str] = None
    ):
        """Publish ``message`` to the main exchange as persistent JSON.

        Args:
            message: Instance of ``message_type``; serialized via ``__dict__``.
            routing_key: Override routing key; defaults to the queue name.

        Raises:
            ValueError: If ``message`` is not an instance of ``message_type``.
        """
        if not isinstance(message, self.message_type):
            raise ValueError(f"Message must be of type {self.message_type}")

        message_data = {
            'content': message.__dict__,
            'metadata': {
                'retry_count': 0,
                'first_attempt': datetime.utcnow().isoformat()
            }
        }
        await self.main_exchange.publish(
            Message(
                body=json.dumps(message_data).encode(),
                content_type='application/json',
                delivery_mode=2  # Persistent
            ),
            routing_key=routing_key or self.queue_name
        )

    async def consume(self, callback: Callable[[T], Awaitable[None]]):
        """Start consuming; ``callback`` failures trigger retry, then DLQ."""
        async def process_message(message: Message):
            async with message.process():
                try:
                    message_data = json.loads(message.body.decode())
                    content = message_data['content']
                    metadata = message_data['metadata']
                    msg_instance = self.message_type(**content)
                    try:
                        await callback(msg_instance)
                    except Exception as e:
                        retry_count = metadata.get('retry_count', 0)
                        if retry_count < self.max_retries:
                            # Re-publish with an incremented retry count; the
                            # retry queue's TTL provides the delay.
                            metadata['retry_count'] = retry_count + 1
                            message_data['metadata'] = metadata
                            await self.main_exchange.publish(
                                Message(
                                    body=json.dumps(message_data).encode(),
                                    content_type='application/json',
                                    delivery_mode=2
                                ),
                                routing_key=f"{self.queue_name}.retry"
                            )
                        else:
                            # Retries exhausted: dead-letter with error headers.
                            await self.dlx_exchange.publish(
                                Message(
                                    body=message.body,
                                    content_type=message.content_type,
                                    delivery_mode=2,
                                    headers={
                                        'x-error': str(e),
                                        'x-failed-at': datetime.utcnow().isoformat()
                                    }
                                ),
                                routing_key=f"{self.queue_name}.deadletter"
                            )
                except json.JSONDecodeError:
                    # Undecodable payload: dead-letter immediately.
                    await self.dlx_exchange.publish(
                        Message(
                            body=message.body,
                            content_type=message.content_type,
                            delivery_mode=2,
                            headers={
                                'x-error': 'Invalid message format',
                                'x-failed-at': datetime.utcnow().isoformat()
                            }
                        ),
                        routing_key=f"{self.queue_name}.deadletter"
                    )

        await self.main_queue.consume(process_message)
Context
The current situation requires a decision because (replace the placeholders with the concrete drivers for your ADR):
- the requirement motivating the change
- the constraint limiting the options
- the need that sets the timeline
Status
Accepted | YYYY-MM-DD (record the actual decision date)
12. Cache Synchronization Patterns
A. Distributed Cache Manager
from typing import Optional, Any, List
import hashlib
import asyncio
import json
class DistributedCacheManager:
    """Distributed cache with local read-through caching, versioning and
    cross-instance synchronization.

    Values live in Redis; each instance also keeps a local in-process cache
    plus a map of content-hash versions. Updates and invalidations are
    broadcast on a pub/sub channel so peer instances drop stale local copies.
    """

    def __init__(
        self,
        redis_client: "Redis",
        pubsub_client: "Redis",
        cache_prefix: str = 'cache',
        sync_channel: str = 'cache_sync'
    ):
        """
        Args:
            redis_client: Async Redis client used for data reads/writes.
            pubsub_client: Async Redis client used for pub/sub messaging.
            cache_prefix: Namespace prefix for all cache keys.
            sync_channel: Pub/sub channel carrying sync notifications.

        Note:
            The ``Redis`` annotations are string forward references so this
            class can be defined even when the redis client type is not
            imported in this module (the original bare annotation raised
            NameError at class-definition time).
        """
        self.redis = redis_client
        self.pubsub = pubsub_client
        self.prefix = cache_prefix
        self.sync_channel = sync_channel
        self.local_cache = {}   # key -> deserialized value
        self.version_map = {}   # key -> version hash known locally

    async def initialize(self):
        """Subscribe to the sync channel and load the current version map."""
        await self._start_sync_listener()
        self.version_map = await self._load_version_map()

    async def get(
        self,
        key: str,
        default: Any = None
    ) -> Optional[Any]:
        """Return the cached value for ``key``, or ``default`` on miss or
        undecodable payload.

        The local copy is served only while its recorded version still
        matches the version stored in Redis; otherwise the value is
        re-fetched and the local cache refreshed.
        """
        cache_key = f"{self.prefix}:{key}"

        # Fast path: local copy whose version still matches Redis.
        local_version = self.version_map.get(key)
        if local_version and key in self.local_cache:
            remote_version = await self.redis.get(f"{cache_key}:version")
            if remote_version == local_version:
                return self.local_cache[key]

        value = await self.redis.get(cache_key)
        if value is None:
            return default
        try:
            data = json.loads(value)
        except json.JSONDecodeError:
            return default
        # Refresh the local cache together with its version stamp.
        self.local_cache[key] = data
        self.version_map[key] = await self.redis.get(f"{cache_key}:version")
        return data

    async def set(
        self,
        key: str,
        value: Any,
        expire: Optional[int] = None
    ):
        """Store ``value``, bump its version, and notify peer instances.

        Args:
            expire: Optional TTL (seconds) applied to both the value key and
                its companion version key.
        """
        cache_key = f"{self.prefix}:{key}"
        version = self._generate_version(value)

        # Value and version are written in one pipeline so readers can
        # validate their local copy against a consistent pair.
        async with self.redis.pipeline() as pipe:
            pipe.set(cache_key, json.dumps(value))
            pipe.set(f"{cache_key}:version", version)
            if expire:
                pipe.expire(cache_key, expire)
                pipe.expire(f"{cache_key}:version", expire)
            await pipe.execute()

        self.local_cache[key] = value
        self.version_map[key] = version
        await self._notify_update(key, version)

    async def invalidate(
        self,
        key: str
    ):
        """Delete ``key`` everywhere and broadcast the invalidation."""
        cache_key = f"{self.prefix}:{key}"

        async with self.redis.pipeline() as pipe:
            pipe.delete(cache_key)
            pipe.delete(f"{cache_key}:version")
            await pipe.execute()

        self.local_cache.pop(key, None)
        self.version_map.pop(key, None)
        await self._notify_invalidation(key)

    async def _start_sync_listener(self):
        """Spawn a background task applying peer update/invalidate events."""
        # NOTE(review): `subscribe()` returning an awaitable pub/sub handle
        # with `get_message(ignore_subscribe_messages=True)` matches
        # aioredis-style clients -- confirm against the client actually used.
        pubsub = await self.pubsub.subscribe(self.sync_channel)

        async def listener():
            while True:
                try:
                    message = await pubsub.get_message(ignore_subscribe_messages=True)
                    if message:
                        data = json.loads(message['data'])
                        action = data.get('action')
                        key = data.get('key')
                        if action == 'update':
                            # A peer wrote a different version: drop our copy
                            # so the next get() re-fetches from Redis.
                            if self.version_map.get(key) != data.get('version'):
                                self.local_cache.pop(key, None)
                                self.version_map.pop(key, None)
                        elif action == 'invalidate':
                            self.local_cache.pop(key, None)
                            self.version_map.pop(key, None)
                except Exception as e:
                    # Original referenced an undefined module-level `logger`
                    # (NameError would kill the listener); use an explicit
                    # named logger instead.
                    logging.getLogger(__name__).error(
                        f"Error in cache sync listener: {e}"
                    )
                await asyncio.sleep(0.1)

        # Fire-and-forget; runs for the lifetime of the event loop.
        asyncio.create_task(listener())

    async def _notify_update(
        self,
        key: str,
        version: str
    ):
        """Broadcast that ``key`` now has ``version``."""
        await self.pubsub.publish(
            self.sync_channel,
            json.dumps({
                'action': 'update',
                'key': key,
                'version': version
            })
        )

    async def _notify_invalidation(
        self,
        key: str
    ):
        """Broadcast that ``key`` was invalidated."""
        await self.pubsub.publish(
            self.sync_channel,
            json.dumps({
                'action': 'invalidate',
                'key': key
            })
        )

    def _generate_version(self, value: Any) -> str:
        """Return a deterministic SHA-256 content hash for ``value``.

        ``sort_keys=True`` makes the hash independent of dict insertion order.
        """
        return hashlib.sha256(
            json.dumps(value, sort_keys=True).encode()
        ).hexdigest()

    async def _load_version_map(self) -> Dict[str, str]:
        """Scan Redis for ``<prefix>:*:version`` keys and build a
        key -> version map."""
        version_map = {}
        pattern = f"{self.prefix}:*:version"
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor,
                match=pattern
            )
            for key in keys or []:
                # NOTE(review): assumes str keys (decode_responses=True);
                # with raw bytes the replace() calls would fail -- confirm.
                base_key = key.replace(f"{self.prefix}:", '').replace(':version', '')
                version = await self.redis.get(key)
                if version:
                    version_map[base_key] = version
            if cursor == 0:
                break
        return version_map
Error handling patterns and monitoring implementations are covered in the following section.