Master Prompt: AI-Powered PDF Analysis Platform

Purpose: Generate a complete, production-ready, cloud-native PDF analysis platform with AI integration deployed on Google Kubernetes Engine.


System Overview

Create a full-stack web application that enables users to upload PDF documents, processes them using AI (Claude), extracts structured components, performs cross-validation, and displays results in real-time through WebSocket connections.

Core Requirements

  1. Frontend: React 18 + TypeScript SPA with Material-UI
  2. Backend: FastAPI (Python 3.11) with WebSocket support
  3. AI Integration: Anthropic Claude Sonnet 4 for analysis
  4. Infrastructure: Google Kubernetes Engine (GKE) deployment
  5. Storage: PostgreSQL (metadata), Redis (cache/pub-sub), GCS (files)
  6. Real-time: WebSocket for progress updates
  7. Architecture: Event-driven, microservices-ready

Detailed Technical Specifications

1. Frontend Application

Technology Stack:

{
  "framework": "React 18.2+",
  "language": "TypeScript 5.2+",
  "build_tool": "Vite 5.0+",
  "ui_library": "Material-UI 5.14+",
  "state_management": "Zustand 4.4+",
  "http_client": "Axios 1.5+",
  "websocket": "Native WebSocket API"
}

Features to Implement:

  1. File Upload Interface

    • Drag-and-drop zone using react-dropzone
    • Multi-file selection support
    • File type validation (PDF only)
    • Size limit validation (50MB max)
    • Upload progress indicator
    • Preview thumbnail generation
  2. Document Management Dashboard

    • Grid/List view toggle
    • Document cards with metadata
    • Status badges (uploaded, processing, completed, failed)
    • Real-time progress bars during processing
    • Search and filter capabilities
    • Sorting (by date, name, size)
  3. Analysis Results Viewer

    • Document statistics panel
    • Extracted components list
    • Component type filtering
    • Confidence score visualization
    • Page-by-page navigation
    • Export results (JSON, CSV)
  4. Real-time Updates

    • WebSocket connection management
    • Automatic reconnection with exponential backoff
    • Connection status indicator
    • Toast notifications for events
  5. Error Handling

    • User-friendly error messages
    • Retry mechanisms
    • Offline detection
    • Network error recovery
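The reconnection behavior above (exponential backoff with a connection status indicator) can be sketched language-agnostically. A minimal delay schedule, where the base delay, cap, and jitter values are illustrative assumptions rather than part of the spec:

```python
import random

def reconnect_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                    jitter: float = 0.0) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-indexed).

    Doubles after each failure, capped at `cap`, with optional random
    jitter to avoid thundering-herd reconnects across many clients.
    """
    delay = min(cap, base * (2 ** attempt))
    return delay + random.uniform(0, jitter)

# First five attempts with the defaults: 1, 2, 4, 8, 16 seconds
schedule = [reconnect_delay(i) for i in range(5)]
```

The same schedule translates directly to the frontend's `useWebSocket` hook, resetting the attempt counter once a connection succeeds.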

Component Structure:

src/
├── components/
│   ├── layout/
│   │   ├── AppShell.tsx
│   │   ├── Header.tsx
│   │   └── Navigation.tsx
│   ├── upload/
│   │   ├── UploadZone.tsx
│   │   ├── FileList.tsx
│   │   └── UploadProgress.tsx
│   ├── documents/
│   │   ├── DocumentGrid.tsx
│   │   ├── DocumentCard.tsx
│   │   └── DocumentFilters.tsx
│   └── analysis/
│       ├── AnalysisPanel.tsx
│       ├── ComponentList.tsx
│       └── StatisticsView.tsx
├── hooks/
│   ├── useWebSocket.ts
│   ├── useDocuments.ts
│   ├── useUpload.ts
│   └── useAnalysis.ts
├── services/
│   ├── api.ts
│   ├── websocket.ts
│   └── storage.ts
├── store/
│   ├── documentStore.ts
│   └── uiStore.ts
├── types/
│   ├── document.ts
│   └── analysis.ts
└── utils/
    ├── formatters.ts
    └── validators.ts

2. Backend Application

Technology Stack:

fastapi==0.104.1
uvicorn[standard]==0.24.0
pdfplumber==0.10.3
anthropic==0.7.0
redis==5.0.1
sqlalchemy[asyncio]==2.0.23
asyncpg==0.29.0
google-cloud-storage==2.10.0
pydantic==2.5.0
python-multipart==0.0.6
prometheus-client==0.19.0

Architecture Patterns:

  1. Dependency Injection

    from fastapi import Depends

    async def get_db():
        async with AsyncSessionLocal() as session:
            yield session

    async def get_redis():
        return redis_client

    @app.post("/documents/upload")
    async def upload(
        db: AsyncSession = Depends(get_db),
        cache: Redis = Depends(get_redis)
    ):
        pass
  2. Event-Driven Processing

    class EventBus:
        async def publish(self, event: Event):
            await redis_client.publish(
                f"events.{event.type}",
                event.json()
            )

        async def subscribe(self, pattern: str, handler):
            pubsub = redis_client.pubsub()
            await pubsub.psubscribe(pattern)
            async for message in pubsub.listen():
                await handler(Event.parse_raw(message['data']))
  3. Background Processing

    from fastapi import BackgroundTasks

    @app.post("/documents/upload")
    async def upload(
        file: UploadFile,
        background_tasks: BackgroundTasks
    ):
        # Save file
        doc_id = await save_file(file)

        # Queue processing
        background_tasks.add_task(process_pdf, doc_id)

        return {"document_id": doc_id}

API Endpoints to Implement:

  1. Document Management

    • POST /api/v1/documents/upload - Upload PDF
    • GET /api/v1/documents - List documents
    • GET /api/v1/documents/{id} - Get document details
    • DELETE /api/v1/documents/{id} - Delete document
    • GET /api/v1/documents/{id}/download - Download PDF
  2. Analysis

    • GET /api/v1/documents/{id}/analysis - Get analysis results
    • POST /api/v1/documents/{id}/analyze - Trigger re-analysis
    • GET /api/v1/documents/{id}/components - Get extracted components
  3. WebSocket

    • WS /ws - WebSocket connection
    • Message types: connection_ack, subscribe, document.*, analysis.*
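A possible envelope shared by all of these message types, sketched here with stdlib dataclasses (the field names are assumptions; the real service might use Pydantic models, which are already in the stack):

```python
import json
from dataclasses import dataclass, field, asdict
from typing import Any, Dict

@dataclass
class WsMessage:
    """Envelope for every WebSocket message, client- and server-side."""
    type: str                                   # e.g. "connection_ack", "subscribe"
    payload: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "WsMessage":
        data = json.loads(raw)
        return cls(type=data["type"], payload=data.get("payload", {}))

# A client subscribing to updates for one document
msg = WsMessage("subscribe", {"document_id": "doc-123"})
roundtrip = WsMessage.from_json(msg.to_json())
```

Keeping a single envelope lets the frontend dispatch on `type` alone, and lets the backend fan events out over the same Redis pub/sub channels described earlier.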

AI Integration Pattern:

import json
from typing import Dict, List

class AIAnalysisService:
    """Hierarchical AI analysis with token management"""

    async def analyze_document(self, document_id: str) -> AnalysisResult:
        # Step 1: Extract raw content
        pdf_data = await self.pdf_service.extract(document_id)

        # Step 2: Structure analysis (2K tokens)
        structure = await self.analyze_structure(
            pdf_data['pages'][0]['text']
        )

        # Step 3: Component extraction (4K tokens/page)
        components = []
        for page in pdf_data['pages'][:10]:  # Limit pages
            page_components = await self.extract_components(page)
            components.extend(page_components)

        # Step 4: Cross-validation (3K tokens)
        validation = await self.cross_validate(
            pdf_data,
            components
        )

        # Step 5: Synthesis (5K tokens)
        summary = await self.synthesize(
            structure,
            components,
            validation
        )

        return AnalysisResult(
            document_id=document_id,
            structure=structure,
            components=components,
            validation=validation,
            summary=summary
        )

    async def analyze_structure(self, content: str) -> Dict:
        prompt = f"""Analyze document structure and return JSON:

{content[:5000]}

Return:
{{
  "document_type": "report|article|manual|other",
  "sections": [
    {{"title": "str", "level": 1-3, "summary": "str"}}
  ],
  "key_topics": ["topic1", "topic2"],
  "reading_time_minutes": int
}}"""

        response = await self.claude.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2000,
            temperature=0.3,
            messages=[{"role": "user", "content": prompt}]
        )

        return self._parse_json_response(response)

    async def extract_components(self, page: Dict) -> List[Dict]:
        prompt = f"""Extract components from this page:

{page['text']}

Return JSON array:
[
  {{
    "type": "heading|paragraph|list|table|figure",
    "content": "extracted text",
    "importance": "high|medium|low",
    "metadata": {{}}
  }}
]"""

        response = await self.claude.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4000,
            temperature=0.2,
            messages=[{"role": "user", "content": prompt}]
        )

        return self._parse_json_response(response)

    async def cross_validate(
        self,
        original: Dict,
        extracted: List[Dict]
    ) -> Dict:
        prompt = f"""Validate extraction quality:

Original stats:
- Pages: {original['total_pages']}
- Characters: {original['total_chars']}

Extracted: {len(extracted)} components

Assess completeness and accuracy. Return JSON:
{{
  "completeness_score": 0.0-1.0,
  "accuracy_score": 0.0-1.0,
  "overall_confidence": 0.0-1.0,
  "issues": ["issue1", "issue2"],
  "recommendations": ["rec1", "rec2"]
}}"""

        response = await self.claude.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=3000,
            temperature=0.1,
            messages=[{"role": "user", "content": prompt}]
        )

        return self._parse_json_response(response)

    def _parse_json_response(self, response) -> Dict:
        content = response.content[0].text

        # Extract JSON from markdown fences, if present
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1].split("```")[0]

        return json.loads(content.strip())

Token Budget Management:

class TokenBudget:
    """Token usage tracking and budgeting"""

    # Token limits per operation
    STRUCTURE_ANALYSIS = 2_000
    COMPONENT_EXTRACTION_PER_PAGE = 4_000
    VALIDATION = 3_000
    SYNTHESIS = 5_000

    # Maximum budget per document
    MAX_BUDGET = 100_000

    @classmethod
    def calculate_budget(cls, page_count: int) -> int:
        """Calculate total token budget for document"""
        extraction_budget = page_count * cls.COMPONENT_EXTRACTION_PER_PAGE

        return (
            cls.STRUCTURE_ANALYSIS +
            extraction_budget +
            cls.VALIDATION +
            cls.SYNTHESIS
        )

    @classmethod
    def can_process(cls, page_count: int) -> bool:
        """Check if document fits within budget"""
        required = cls.calculate_budget(page_count)
        return required <= cls.MAX_BUDGET

    @classmethod
    def recommend_strategy(cls, page_count: int) -> str:
        """Recommend processing strategy"""
        if page_count <= 10:
            return "full_analysis"
        elif page_count <= 50:
            return "selective_analysis"  # Key pages only
        else:
            return "batch_processing"  # Split into chunks
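To make the budget arithmetic concrete, here is a self-contained check of the same numbers (constants copied from the class above):

```python
# Per-operation token limits, as defined in TokenBudget
STRUCTURE_ANALYSIS = 2_000
COMPONENT_EXTRACTION_PER_PAGE = 4_000
VALIDATION = 3_000
SYNTHESIS = 5_000
MAX_BUDGET = 100_000

def calculate_budget(page_count: int) -> int:
    """Fixed costs plus per-page extraction cost."""
    return (STRUCTURE_ANALYSIS
            + page_count * COMPONENT_EXTRACTION_PER_PAGE
            + VALIDATION
            + SYNTHESIS)

# A 10-page document needs 2K + 40K + 3K + 5K = 50K tokens: within budget,
# so it qualifies for "full_analysis".
assert calculate_budget(10) == 50_000

# A 25-page document needs 110K tokens, which exceeds MAX_BUDGET; it is
# <= 50 pages, so the recommended strategy is "selective_analysis".
assert calculate_budget(25) == 110_000
assert calculate_budget(25) > MAX_BUDGET
```

This is why `recommend_strategy` switches away from full analysis well before the hard budget ceiling: per-page extraction dominates the total almost immediately.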

3. Infrastructure as Code

GKE Cluster Setup (Terraform):

# terraform/main.tf

resource "google_container_cluster" "primary" {
  name     = "pdf-analysis-cluster"
  location = var.region

  # Autopilot mode for managed infrastructure
  enable_autopilot = true

  release_channel {
    channel = "REGULAR"
  }

  workload_identity_config {
    workload_pool = "${var.project_id}.svc.id.goog"
  }

  network_policy {
    enabled = true
  }

  ip_allocation_policy {
    cluster_ipv4_cidr_block  = ""
    services_ipv4_cidr_block = ""
  }
}

resource "google_sql_database_instance" "postgres" {
  name             = "pdf-analysis-db"
  database_version = "POSTGRES_15"
  region           = var.region

  settings {
    tier = "db-custom-2-7680"

    backup_configuration {
      enabled                        = true
      point_in_time_recovery_enabled = true
    }

    ip_configuration {
      ipv4_enabled    = false
      private_network = google_compute_network.vpc.id
    }
  }
}

resource "google_redis_instance" "cache" {
  name           = "pdf-analysis-redis"
  memory_size_gb = 5
  region         = var.region
  tier           = "STANDARD_HA"
  redis_version  = "REDIS_7_0"
}

resource "google_storage_bucket" "pdfs" {
  name          = "${var.project_id}-pdf-storage"
  location      = var.region
  storage_class = "STANDARD"

  versioning {
    enabled = true
  }

  lifecycle_rule {
    condition {
      age = 90
    }
    action {
      type = "Delete"
    }
  }
}

Kubernetes Deployment Strategy:

  1. Multi-environment setup (dev, staging, prod)
  2. GitOps workflow with ArgoCD or Flux
  3. Helm charts for templating
  4. Blue-Green deployments for zero-downtime
  5. Canary releases for gradual rollout

4. Monitoring & Observability

Metrics to Collect:

from prometheus_client import Counter, Histogram, Gauge

# Business metrics
pdf_uploads_total = Counter(
    'pdf_uploads_total',
    'Total PDF uploads',
    ['status']
)

processing_duration = Histogram(
    'pdf_processing_duration_seconds',
    'PDF processing time',
    ['stage'],
    buckets=[1, 5, 10, 30, 60, 120, 300]
)

ai_api_calls = Counter(
    'ai_api_calls_total',
    'Claude API calls',
    ['operation', 'status']
)

ai_tokens_used = Counter(
    'ai_tokens_used_total',
    'Tokens consumed',
    ['operation']
)

# System metrics
active_websockets = Gauge(
    'websocket_connections_active',
    'Active WebSocket connections'
)

redis_operations = Counter(
    'redis_operations_total',
    'Redis operations',
    ['operation', 'status']
)

Logging Strategy:

import structlog

logger = structlog.get_logger()

# Structured logging
logger.info(
    "document_uploaded",
    document_id=doc_id,
    user_id=user_id,
    filename=filename,
    size_bytes=size
)

logger.error(
    "ai_analysis_failed",
    document_id=doc_id,
    error=str(e),
    stage="component_extraction"
)

Distributed Tracing:

from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

tracer = trace.get_tracer(__name__)

@app.post("/documents/upload")
async def upload(file: UploadFile):
    with tracer.start_as_current_span("upload_document") as span:
        span.set_attribute("filename", file.filename)
        span.set_attribute("size", file.size)

        # Processing logic
        pass

5. CI/CD Pipeline

GitHub Actions Workflow:

name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run backend tests
        run: |
          cd backend
          pip install -r requirements.txt
          pytest --cov --cov-report=xml

      - name: Run frontend tests
        run: |
          cd frontend
          npm install
          npm run test

      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Build backend image
        run: |
          docker build -t gcr.io/$PROJECT/backend:$SHA backend/
          docker push gcr.io/$PROJECT/backend:$SHA

      - name: Build frontend image
        run: |
          docker build -t gcr.io/$PROJECT/frontend:$SHA frontend/
          docker push gcr.io/$PROJECT/frontend:$SHA

  deploy:
    needs: build
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to GKE
        run: |
          gcloud container clusters get-credentials pdf-analysis-cluster
          kubectl set image deployment/backend backend=gcr.io/$PROJECT/backend:$SHA
          kubectl set image deployment/frontend frontend=gcr.io/$PROJECT/frontend:$SHA
          kubectl rollout status deployment/backend
          kubectl rollout status deployment/frontend

6. Security Implementation

Security Checklist:

  1. Authentication

    • JWT tokens with refresh mechanism
    • OAuth 2.0 integration (Google, GitHub)
    • Multi-factor authentication (MFA)
  2. Authorization

    • Role-based access control (RBAC)
    • Document ownership validation
    • API rate limiting per user
  3. Data Protection

    • TLS 1.3 for all connections
    • Encryption at rest (AES-256)
    • PII detection and masking
    • Secure file upload validation
  4. Infrastructure

    • Network policies in Kubernetes
    • Cloud Armor WAF
    • DDoS protection
    • Secret management with GCP Secret Manager
    • Workload Identity for service accounts
  5. Compliance

    • GDPR compliance (data deletion, export)
    • SOC 2 audit logging
    • Data retention policies
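The per-user API rate limiting in the checklist can be sketched as a token bucket. This in-memory version is illustrative only; a real deployment would back it with Redis so limits hold across replicas:

```python
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens/second."""
    capacity: float
    rate: float
    tokens: float = field(init=False)
    updated: float = field(init=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.updated = time.monotonic()

    def allow(self) -> bool:
        # Refill proportionally to elapsed time, then spend one token if possible
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# One bucket per user: burst of 10 requests, 1 request/second sustained
buckets: dict = {}

def is_allowed(user_id: str) -> bool:
    bucket = buckets.setdefault(user_id, TokenBucket(capacity=10, rate=1.0))
    return bucket.allow()
```

A rejected request would map to an HTTP 429 in the FastAPI layer; the capacity and rate values here are placeholders, not spec requirements.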

Security Implementation:

from fastapi import Depends, Security, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security)
) -> dict:
    try:
        payload = jwt.decode(
            credentials.credentials,
            SECRET_KEY,
            algorithms=["HS256"]
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

@app.post("/documents/upload")
async def upload(
    file: UploadFile,
    user: dict = Depends(verify_token)
):
    # Validate file
    if not file.filename.endswith('.pdf'):
        raise HTTPException(400, "Invalid file type")

    # Check size
    if file.size > MAX_FILE_SIZE:
        raise HTTPException(413, "File too large")

    # Scan for malware (integration with VirusTotal or similar)
    await scan_file(file)

    # Process upload
    pass

Implementation Checklist

Phase 1: Foundation (Week 1-2)

  • Set up GCP project and enable APIs
  • Create GKE cluster with Terraform
  • Set up Cloud SQL (PostgreSQL)
  • Set up Memorystore (Redis)
  • Set up Cloud Storage bucket
  • Configure IAM and Workload Identity

Phase 2: Backend Core (Week 3-4)

  • FastAPI application skeleton
  • Database models and migrations
  • File upload endpoint
  • PDF processing service
  • Redis integration for caching
  • WebSocket manager
  • Background task processing

Phase 3: AI Integration (Week 5-6)

  • Anthropic Claude SDK integration
  • Prompt engineering framework
  • Structure analysis
  • Component extraction
  • Cross-validation
  • Token budget management
  • Error handling and retries

Phase 4: Frontend Development (Week 7-8)

  • React application setup
  • Material-UI theme configuration
  • Upload interface
  • Document management dashboard
  • WebSocket integration
  • Analysis results viewer
  • Error handling and notifications

Phase 5: Infrastructure & DevOps (Week 9-10)

  • Kubernetes manifests
  • Helm charts
  • CI/CD pipeline
  • Monitoring setup (Prometheus, Grafana)
  • Logging (Cloud Logging, Loki)
  • Distributed tracing (Jaeger)
  • Alerting rules

Phase 6: Testing & Quality (Week 11-12)

  • Unit tests (backend)
  • Integration tests
  • End-to-end tests (frontend)
  • Load testing (k6)
  • Security scanning
  • Performance optimization
  • Documentation

Phase 7: Production Readiness (Week 13-14)

  • Security hardening
  • Disaster recovery setup
  • Backup automation
  • Monitoring dashboards
  • Runbooks and playbooks
  • Load balancer configuration
  • SSL/TLS certificates

Success Criteria

Functional Requirements

✅ Users can upload PDF documents up to 50MB
✅ System processes PDFs and extracts text/tables
✅ AI analysis identifies document structure
✅ Components are extracted with 95%+ accuracy
✅ Cross-validation provides confidence scores
✅ Real-time progress updates via WebSocket
✅ Results displayed in intuitive dashboard

Non-Functional Requirements

  • Performance: <500ms API response time (P95)
  • Scalability: Support 10,000 concurrent users
  • Availability: 99.9% uptime SLA
  • Processing: <45s for 10-page PDF analysis
  • Cost: <$0.10 per document processed
  • Security: Pass SOC 2 security audit


Deployment Instructions

  1. Prerequisites

    # Install tools
    gcloud components install kubectl
    # Install Terraform and Helm via your OS package manager, e.g.:
    brew install terraform helm

    # Authenticate
    gcloud auth login
    gcloud config set project PROJECT_ID
  2. Infrastructure Setup

    cd terraform
    terraform init
    terraform plan
    terraform apply
  3. Application Deployment

    # Build images
    docker build -t gcr.io/PROJECT_ID/backend:v1 backend/
    docker build -t gcr.io/PROJECT_ID/frontend:v1 frontend/

    # Push images
    docker push gcr.io/PROJECT_ID/backend:v1
    docker push gcr.io/PROJECT_ID/frontend:v1

    # Deploy to GKE
    kubectl apply -f k8s/

    # Verify deployment
    kubectl get pods -n pdf-analysis
    kubectl get services -n pdf-analysis
  4. Configure DNS

    # Get external IP
    kubectl get ingress -n pdf-analysis

    # Update DNS records
    # A record: pdfanalysis.example.com -> EXTERNAL_IP
  5. Verify Installation

    # Health check
    curl https://pdfanalysis.example.com/

    # Upload test
    curl -X POST https://pdfanalysis.example.com/api/v1/documents/upload \
    -F "file=@test.pdf"

Troubleshooting Guide

Common Issues

Issue: WebSocket connection fails
Solution: Check firewall rules, verify WebSocket upgrade headers, ensure load balancer supports WebSocket

Issue: AI analysis timeouts
Solution: Increase Claude API timeout, implement retry logic, check token budget limits
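The retry logic suggested here might look like the following generic helper (attempt count and delays are illustrative; a production service could use a library such as tenacity instead):

```python
import time

def retry(fn, attempts: int = 3, base_delay: float = 1.0,
          retry_on: type = Exception):
    """Call `fn`, retrying on `retry_on` with exponentially growing delays."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Exhausted: surface the final error
            time.sleep(base_delay * (2 ** attempt))

# Simulated flaky call: fails twice, then succeeds on the third attempt
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated Claude API timeout")
    return "ok"

result = retry(flaky, attempts=3, base_delay=0.01, retry_on=TimeoutError)
```

For the async Claude calls in the analysis service, the same pattern applies with `asyncio.sleep` in place of `time.sleep`.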

Issue: High memory usage
Solution: Reduce concurrent processing, implement pagination, use streaming responses

Issue: Slow PDF processing
Solution: Optimize pdfplumber settings, use multiprocessing, cache intermediate results


Cost Optimization

Estimated Monthly Costs (1000 documents/day):

Service                  Cost
GKE Autopilot            $150
Cloud SQL                $100
Memorystore Redis        $80
Cloud Storage            $20
Claude API (30K docs)    $1,500
Load Balancer            $20
Total                    ~$1,870/month
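A quick sanity check that these totals stay under the $0.10-per-document target (1,000 documents/day is roughly 30,000 per month):

```python
# Monthly cost estimates from the table above, in USD
monthly_costs = {
    "GKE Autopilot": 150,
    "Cloud SQL": 100,
    "Memorystore Redis": 80,
    "Cloud Storage": 20,
    "Claude API": 1_500,
    "Load Balancer": 20,
}
docs_per_month = 1_000 * 30

total = sum(monthly_costs.values())     # 1,870
cost_per_doc = total / docs_per_month   # about $0.062 per document
```

Note that the Claude API dominates at roughly 80% of spend, which is why the prompt-optimization strategies below have the biggest leverage.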

Optimization Strategies:

  • Use committed use discounts (30% savings)
  • Implement aggressive caching
  • Batch processing during off-peak hours
  • Use spot instances for non-critical workloads
  • Optimize AI prompts to reduce token usage

Next Steps & Enhancements

Phase 2 Features

  • OCR for scanned PDFs
  • Multi-language support
  • Batch upload and processing
  • Advanced search capabilities
  • Document comparison
  • Custom AI models fine-tuning

Phase 3 Features

  • Collaborative annotations
  • Version control for documents
  • API for third-party integrations
  • Mobile applications
  • Enterprise SSO integration
  • Advanced analytics dashboard


Generated by: Claude (Anthropic)
Version: 1.0
Last Updated: 2025-10-31