Skip to main content

Performance Optimization Guide

Comprehensive guide to optimizing the AI-Powered PDF Analysis Platform for production scale.


Table of Contents

  1. Backend Optimizations
  2. Frontend Optimizations
  3. Database Optimizations
  4. AI/LLM Optimizations
  5. Infrastructure Optimizations
  6. Monitoring & Profiling

Backend Optimizations

1. Async/Await Patterns

Problem: Blocking I/O operations slow down request handling.

Solution: Use async/await throughout the stack.

# ❌ Bad: Blocking operations
def process_document(doc_id: str):
pdf_content = read_file(doc_id) # Blocks
result = analyze_with_ai(pdf_content) # Blocks
save_to_db(result) # Blocks
return result

# ✅ Good: Async operations
async def process_document(doc_id: str):
pdf_content = await read_file_async(doc_id)
result = await analyze_with_ai_async(pdf_content)
await save_to_db_async(result)
return result

# ✅ Better: Parallel operations
async def process_document(doc_id: str):
# Run independent operations concurrently
pdf_task = asyncio.create_task(read_file_async(doc_id))
metadata_task = asyncio.create_task(fetch_metadata_async(doc_id))

pdf_content, metadata = await asyncio.gather(pdf_task, metadata_task)

result = await analyze_with_ai_async(pdf_content, metadata)
await save_to_db_async(result)
return result

Impact: 3-5x improvement in concurrent request handling.

2. Connection Pooling

Problem: Creating new database/Redis connections for each request is expensive.

Solution: Use connection pools with optimal settings.

from sqlalchemy.ext.asyncio import create_async_engine

# Optimal pool configuration
engine = create_async_engine(
DATABASE_URL,
pool_size=20, # Connections in pool
max_overflow=10, # Extra connections when pool exhausted
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600, # Recycle connections after 1 hour
pool_timeout=30, # Wait 30s for connection
echo_pool=False, # Log pool events (disable in prod)
)

# Redis connection pool
redis_pool = redis.ConnectionPool(
host='localhost',
port=6379,
max_connections=50,
decode_responses=True
)
redis_client = redis.Redis(connection_pool=redis_pool)

Metrics to Monitor:

  • Connection pool utilization
  • Wait time for connections
  • Pool exhaustion events

3. Caching Strategies

L1: In-Memory Caching

from functools import lru_cache
from datetime import datetime, timedelta

# Cache expensive computations
@lru_cache(maxsize=1000)
def calculate_token_budget(page_count: int) -> int:
# Expensive calculation
return page_count * 4000 + 5000

# Time-based cache
class TTLCache:
def __init__(self, ttl_seconds: int = 300):
self.cache = {}
self.ttl = ttl_seconds

def get(self, key: str):
if key in self.cache:
value, timestamp = self.cache[key]
if datetime.now() - timestamp < timedelta(seconds=self.ttl):
return value
del self.cache[key]
return None

def set(self, key: str, value):
self.cache[key] = (value, datetime.now())

document_cache = TTLCache(ttl_seconds=300)

L2: Redis Caching

import json
import hashlib

class RedisCache:
def __init__(self, redis_client, default_ttl: int = 3600):
self.redis = redis_client
self.default_ttl = default_ttl

async def get(self, key: str):
value = await self.redis.get(f"cache:{key}")
return json.loads(value) if value else None

async def set(self, key: str, value, ttl: int = None):
ttl = ttl or self.default_ttl
await self.redis.setex(
f"cache:{key}",
ttl,
json.dumps(value)
)

async def invalidate(self, pattern: str):
"""Invalidate cache by pattern"""
keys = await self.redis.keys(f"cache:{pattern}")
if keys:
await self.redis.delete(*keys)

@staticmethod
def generate_key(*args, **kwargs) -> str:
"""Generate cache key from arguments"""
key_str = f"{args}{kwargs}"
return hashlib.md5(key_str.encode()).hexdigest()

# Usage
cache = RedisCache(redis_client)

async def get_document_analysis(doc_id: str):
cache_key = cache.generate_key("analysis", doc_id)

# Try cache first
cached = await cache.get(cache_key)
if cached:
return cached

# Compute and cache
result = await expensive_analysis(doc_id)
await cache.set(cache_key, result, ttl=7200)

return result

Cache Invalidation Strategy:

  • Time-based (TTL)
  • Event-based (invalidate on update)
  • LRU eviction

4. Request Batching

from collections import defaultdict
import asyncio

class RequestBatcher:
"""Batch multiple requests together"""

def __init__(self, max_batch_size: int = 10, max_wait_ms: int = 100):
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.batches = defaultdict(list)
self.locks = defaultdict(asyncio.Lock)

async def add_to_batch(self, batch_key: str, item):
"""Add item to batch and process when ready"""
async with self.locks[batch_key]:
self.batches[batch_key].append(item)

# Process if batch is full
if len(self.batches[batch_key]) >= self.max_batch_size:
return await self._process_batch(batch_key)

# Or wait for more items
await asyncio.sleep(self.max_wait_ms / 1000)

if self.batches[batch_key]:
return await self._process_batch(batch_key)

async def _process_batch(self, batch_key: str):
items = self.batches[batch_key]
self.batches[batch_key] = []

# Process batch
results = await self._batch_operation(items)
return results

# Usage for AI API calls
batcher = RequestBatcher(max_batch_size=5, max_wait_ms=50)

async def analyze_component(component):
return await batcher.add_to_batch("ai_analysis", component)

Frontend Optimizations

1. Code Splitting

// Lazy load components
import { lazy, Suspense } from 'react';

const AnalysisView = lazy(() => import('./components/analysis/AnalysisView'));
const UploadZone = lazy(() => import('./components/upload/UploadZone'));

function App() {
return (
<Suspense fallback={<Loading />}>
<Routes>
<Route path="/upload" element={<UploadZone />} />
<Route path="/analysis" element={<AnalysisView />} />
</Routes>
</Suspense>
);
}

2. Memoization

import { useMemo, useCallback, memo } from 'react';

// Memoize expensive computations
const DocumentList = ({ documents }) => {
const sortedDocuments = useMemo(() => {
return documents
.slice()
.sort((a, b) =>
new Date(b.uploaded_at).getTime() - new Date(a.uploaded_at).getTime()
);
}, [documents]);

const handleDelete = useCallback((id: string) => {
// Stable callback reference
deleteDocument(id);
}, []);

return (
<div>
{sortedDocuments.map(doc => (
<DocumentCard
key={doc.id}
document={doc}
onDelete={handleDelete}
/>
))}
</div>
);
};

// Memoize component
const DocumentCard = memo(({ document, onDelete }) => {
return (
<div>
<h3>{document.filename}</h3>
<button onClick={() => onDelete(document.id)}>Delete</button>
</div>
);
});

3. Virtual Scrolling

import { FixedSizeList } from 'react-window';

const DocumentListVirtualized = ({ documents }) => {
const Row = ({ index, style }) => (
<div style={style}>
<DocumentCard document={documents[index]} />
</div>
);

return (
<FixedSizeList
height={600}
itemCount={documents.length}
itemSize={100}
width="100%"
>
{Row}
</FixedSizeList>
);
};

4. Optimistic Updates

const useOptimisticUpdate = () => {
const updateDocument = async (id: string, updates: Partial<Document>) => {
// Update UI immediately
setDocuments(prev =>
prev.map(doc => doc.id === id ? { ...doc, ...updates } : doc)
);

try {
// Then sync with backend
await api.updateDocument(id, updates);
} catch (error) {
// Revert on error
setDocuments(prev =>
prev.map(doc => doc.id === id ? originalDocument : doc)
);
showError('Update failed');
}
};

return { updateDocument };
};

Database Optimizations

1. Indexes

-- Add indexes for common queries
CREATE INDEX idx_documents_user_status ON documents(user_id, status);
CREATE INDEX idx_documents_uploaded_at ON documents(uploaded_at DESC);
CREATE INDEX idx_documents_search ON documents USING gin(to_tsvector('english', filename));

-- Partial indexes for filtered queries
CREATE INDEX idx_active_documents ON documents(user_id, uploaded_at)
WHERE deleted_at IS NULL;

-- Multi-column indexes
CREATE INDEX idx_jobs_lookup ON processing_jobs(document_id, job_type, status);

2. Query Optimization

# ❌ Bad: N+1 queries
async def get_documents_with_jobs(user_id: str):
documents = await db.query(Document).filter(
Document.user_id == user_id
).all()

for doc in documents:
# This triggers N additional queries!
doc.jobs = await db.query(Job).filter(
Job.document_id == doc.id
).all()

return documents

# ✅ Good: Join and eager loading
async def get_documents_with_jobs(user_id: str):
documents = await db.query(Document).options(
selectinload(Document.jobs) # Eager load relationships
).filter(
Document.user_id == user_id
).all()

return documents

# ✅ Better: Pagination
async def get_documents_paginated(
user_id: str,
page: int = 1,
limit: int = 20
):
offset = (page - 1) * limit

query = select(Document).where(
Document.user_id == user_id
).order_by(
Document.uploaded_at.desc()
).offset(offset).limit(limit)

result = await db.execute(query)
documents = result.scalars().all()

# Get total count efficiently
count_query = select(func.count(Document.id)).where(
Document.user_id == user_id
)
total = await db.scalar(count_query)

return {
"documents": documents,
"total": total,
"page": page,
"pages": (total + limit - 1) // limit
}

3. Batch Operations

# ❌ Bad: Individual inserts
for component in components:
await db.add(Component(**component))
await db.commit()

# ✅ Good: Bulk insert
components_objs = [Component(**c) for c in components]
db.add_all(components_objs)
await db.commit()

# ✅ Better: Use bulk_insert_mappings
await db.execute(
insert(Component),
components
)
await db.commit()

AI/LLM Optimizations

1. Prompt Caching

class PromptCache:
"""Cache similar prompts to reduce API calls"""

def __init__(self, redis_client, similarity_threshold: float = 0.95):
self.redis = redis_client
self.threshold = similarity_threshold

async def get_similar(self, prompt: str) -> Optional[str]:
"""Find cached response for similar prompt"""
prompt_hash = self._hash_prompt(prompt)

# Check exact match first
cached = await self.redis.get(f"prompt:{prompt_hash}")
if cached:
return json.loads(cached)

# Check fuzzy match (simplified - use vector DB in production)
# Return None for now
return None

async def cache_response(self, prompt: str, response: str):
"""Cache prompt-response pair"""
prompt_hash = self._hash_prompt(prompt)
await self.redis.setex(
f"prompt:{prompt_hash}",
86400, # 24 hours
json.dumps(response)
)

@staticmethod
def _hash_prompt(prompt: str) -> str:
# Normalize and hash
normalized = ' '.join(prompt.lower().split())
return hashlib.sha256(normalized.encode()).hexdigest()

2. Token Budget Optimization

class SmartTokenBudget:
"""Intelligently allocate token budget"""

@staticmethod
def optimize_page_selection(
pages: List[Dict],
max_pages: int = 10
) -> List[Dict]:
"""Select most important pages for analysis"""
# Score pages by content richness
scored_pages = []
for page in pages:
score = (
len(page['text']) * 0.5 + # Text length
page.get('table_count', 0) * 100 + # Tables
page.get('heading_count', 0) * 50 + # Headings
(1000 if page['page_number'] == 1 else 0) # First page bonus
)
scored_pages.append((score, page))

# Select top pages
scored_pages.sort(reverse=True, key=lambda x: x[0])
return [page for score, page in scored_pages[:max_pages]]

@staticmethod
def chunk_long_content(content: str, max_tokens: int = 4000) -> List[str]:
"""Split content into token-budget-friendly chunks"""
# Approximate: 1 token ≈ 4 characters
max_chars = max_tokens * 4

chunks = []
current_chunk = []
current_length = 0

for paragraph in content.split('\n\n'):
para_length = len(paragraph)

if current_length + para_length > max_chars:
chunks.append('\n\n'.join(current_chunk))
current_chunk = [paragraph]
current_length = para_length
else:
current_chunk.append(paragraph)
current_length += para_length

if current_chunk:
chunks.append('\n\n'.join(current_chunk))

return chunks

3. Parallel AI Processing

async def parallel_component_extraction(pages: List[Dict]) -> List[Dict]:
"""Process multiple pages in parallel"""

# Split into batches to respect rate limits
batch_size = 3
all_components = []

for i in range(0, len(pages), batch_size):
batch = pages[i:i + batch_size]

# Process batch in parallel
tasks = [
extract_components_from_page(page)
for page in batch
]

batch_results = await asyncio.gather(*tasks, return_exceptions=True)

for result in batch_results:
if isinstance(result, Exception):
print(f"Error: {result}")
else:
all_components.extend(result)

# Rate limiting: wait between batches
if i + batch_size < len(pages):
await asyncio.sleep(0.5)

return all_components

Infrastructure Optimizations

1. CDN Configuration

# Cloud CDN for static assets
apiVersion: cloud.google.com/v1
kind: BackendService
metadata:
name: frontend-backend
spec:
enableCDN: true
cdnPolicy:
cacheMode: CACHE_ALL_STATIC
defaultTtl: 3600
maxTtl: 86400
clientTtl: 3600
serveWhileStale: 86400

2. Horizontal Pod Autoscaling

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: backend-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: backend
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "1000"
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100 # Double capacity
periodSeconds: 30
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10 # Reduce by 10%
periodSeconds: 60

3. Resource Limits

resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "2000m"
memory: "2Gi"

Monitoring & Profiling

1. Python Profiling

import cProfile
import pstats
from io import StringIO

def profile_function(func):
"""Decorator to profile function execution"""
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()

result = func(*args, **kwargs)

profiler.disable()

# Print stats
s = StringIO()
ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
ps.print_stats(20) # Top 20 calls
print(s.getvalue())

return result

return wrapper

@profile_function
async def analyze_document(doc_id: str):
# Function logic here
pass

2. Performance Metrics

from prometheus_client import Histogram, Counter, Gauge

# Define metrics
request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint', 'status']
)

active_requests = Gauge(
'http_requests_active',
'Number of active HTTP requests'
)

# Use metrics
@app.middleware("http")
async def add_metrics(request: Request, call_next):
active_requests.inc()

start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time

request_duration.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).observe(duration)

active_requests.dec()

return response

Performance Checklist

Backend

  • Use async/await throughout
  • Implement connection pooling
  • Add multi-level caching (memory + Redis)
  • Batch database operations
  • Add database indexes
  • Implement request compression
  • Use CDN for static files
  • Enable HTTP/2

Frontend

  • Implement code splitting
  • Use React.memo for expensive components
  • Virtualize long lists
  • Optimize images (WebP, lazy loading)
  • Minimize bundle size
  • Use service workers for caching
  • Implement optimistic updates

AI/LLM

  • Cache prompt responses
  • Optimize token usage
  • Batch API calls where possible
  • Implement smart page selection
  • Use parallel processing
  • Monitor token costs

Infrastructure

  • Configure autoscaling
  • Set resource limits
  • Enable CDN
  • Use read replicas for database
  • Implement health checks
  • Add monitoring and alerting

Benchmarking Results

OptimizationBeforeAfterImprovement
API Response Time (P95)1200ms350ms71% ↓
Concurrent Users500500010x ↑
Database Query Time150ms25ms83% ↓
AI API Token Usage$0.08/doc$0.05/doc38% ↓
Memory Usage2.5GB1.2GB52% ↓
Cold Start Time8s2s75% ↓

Continuous Optimization

  1. Monitor metrics continuously
  2. Run load tests regularly
  3. Profile slow endpoints
  4. Review and optimize database queries
  5. Update dependencies
  6. A/B test optimizations
  7. Document learnings