RAG PDF Analyzer
In the era of AI-powered applications, the ability to extract insights from documents is crucial. Whether you're building a knowledge base, a research assistant, or a customer support system, you need to transform static PDFs into queryable, intelligent systems. This is where Retrieval-Augmented Generation (RAG) architecture shines, and where the Motia framework provides an elegant solution.
This comprehensive guide explores how to build a production-ready RAG system that intelligently processes PDFs and answers questions about their content. We'll cover:
- The RAG Architecture: Understanding how document processing, vector storage, and AI generation work together.
- Motia's Event-Driven Approach: How steps create a scalable, maintainable RAG pipeline.
- Building the Workflow: A detailed walkthrough of our polyglot processing pipeline.
- Advanced Features: Real-time progress tracking, error handling, and production considerations.
- Hands-On Testing: How to ingest documents and query your knowledge base.
Let's transform your documents into an intelligent AI assistant.
The Power of Intelligent Document Processing
At its core, our RAG agent solves a fundamental challenge: how do you make unstructured documents searchable and queryable by AI? Traditional approaches often involve complex, monolithic systems that are difficult to scale and maintain. Our Motia-powered solution breaks this down into discrete, event-driven steps that each handle a specific aspect of the pipeline.
The magic happens through the integration of three powerful technologies:
- Docling: Advanced PDF parsing with intelligent chunking that preserves document structure
- Weaviate: Cloud-native vector database with built-in OpenAI integration
- Motia: Event-driven framework that orchestrates the entire pipeline
Instead of a brittle, tightly-coupled system, we get a resilient architecture where each component can be scaled, modified, or replaced independently.
The Anatomy of Our RAG Pipeline
Our application consists of six specialized steps, each handling a specific part of the document processing and querying workflow. Let's explore the complete architecture.
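Before diving into the individual steps, the event chain that connects them can be summarized as follows (topic names are taken from the step configs shown below):

```typescript
// The event topics that wire the pipeline together, in processing order.
const PIPELINE_TOPICS = [
  'rag.read.pdfs',     // emitted by api-process-pdfs; consumed by init-weaviate and read-pdfs
  'rag.process.pdfs',  // emitted by read-pdfs; consumed by the Python process-pdfs step
  'rag.load.weaviate', // emitted by process-pdfs; consumed by load-weaviate
] as const

console.log(PIPELINE_TOPICS.length) // 3
```

Each topic is a contract between exactly the steps that emit and subscribe to it, which is what lets the steps evolve independently.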
<Tabs items={['api-process-pdfs', 'init-weaviate', 'read-pdfs', 'process-pdfs', 'load-weaviate', 'api-query-rag']}>
```ts
import { Handlers } from 'motia'
import { z } from 'zod'
import { v4 as uuidv4 } from 'uuid'
export const config = {
  type: 'api',
  name: 'api-process-pdfs',
  description: 'API endpoint to start PDF processing pipeline',
  path: '/api/rag/process-pdfs',
  method: 'POST',
  emits: ['rag.read.pdfs'],
  bodySchema: z.object({
    folderPath: z.string().min(1, 'folderPath is required'),
  }),
  flows: ['rag-workflow'],
} as const

export const handler: Handlers['api-process-pdfs'] = async (req, { emit, logger }) => {
  const { folderPath } = req.body
  const streamId = uuidv4()

  logger.info('Starting PDF processing pipeline', { folderPath, streamId })

  // Emit event to start the processing chain
  await emit({
    topic: 'rag.read.pdfs',
    data: { folderPath, streamId },
  })

  return {
    status: 200,
    body: {
      message: 'PDF processing started',
      streamId,
      status: 'processing',
    },
  }
}
```
```ts
import weaviate, { WeaviateClient, vectorizer, generative } from 'weaviate-client'
import { EventConfig, Handlers } from 'motia'
import { z } from 'zod'
export const config: EventConfig = {
  type: 'event',
  name: 'init-weaviate',
  subscribes: ['rag.read.pdfs'],
  emits: [],
  flows: ['rag-workflow'],
  input: z.object({
    folderPath: z.string(),
    streamId: z.string().optional(),
  }),
}

const WEAVIATE_SCHEMA = {
  name: 'Books',
  description: 'Document chunks with metadata',
  vectorizers: vectorizer.text2VecOpenAI({
    model: 'text-embedding-3-small',
    sourceProperties: ['text'],
  }),
  generative: generative.openAI({
    model: 'gpt-4o',
    maxTokens: 4096,
  }),
  properties: [
    { name: 'text', dataType: 'text' as const },
    { name: 'title', dataType: 'text' as const },
    { name: 'source', dataType: 'text' as const },
    { name: 'page', dataType: 'number' as const },
  ],
}

export const handler: Handlers['init-weaviate'] = async (input, { logger }) => {
  logger.info('Initializing Weaviate client')

  const client = await weaviate.connectToWeaviateCloud(process.env.WEAVIATE_URL!, {
    authCredentials: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY!),
    headers: { 'X-OpenAI-Api-Key': process.env.OPENAI_API_KEY! },
  })

  try {
    const exists = await client.collections.get('Books').exists()
    if (!exists) {
      logger.info('Creating Books collection with OpenAI integration...')
      await client.collections.create(WEAVIATE_SCHEMA)
      logger.info('Collection created successfully')
    } else {
      logger.info('Books collection already exists')
    }
  } catch (error) {
    logger.error('Error initializing Weaviate', { error })
    throw error
  } finally {
    await client.close()
  }
}
```
```ts
import { readdir } from 'fs/promises'
import { join, resolve, basename } from 'path'
import { EventConfig, Handlers } from 'motia'
import { z } from 'zod'
export const config: EventConfig = {
  type: 'event',
  name: 'read-pdfs',
  flows: ['rag-workflow'],
  subscribes: ['rag.read.pdfs'],
  emits: [{ topic: 'rag.process.pdfs', label: 'Start processing PDFs' }],
  input: z.object({
    folderPath: z.string(),
    streamId: z.string().optional(),
  }),
}

export const handler: Handlers['read-pdfs'] = async (input, { emit, logger }) => {
  const { folderPath: inputFolderPath, streamId } = input
  logger.info(`Reading PDFs from folder: ${inputFolderPath}`)

  // Intelligent path resolution to prevent ENOENT errors
  const currentDirName = basename(process.cwd())
  let resolvedFolderPath = resolve(inputFolderPath)

  // Handle duplicated path segments
  const duplicatedSegment = `${currentDirName}/${currentDirName}`
  if (resolvedFolderPath.includes(duplicatedSegment)) {
    resolvedFolderPath = resolvedFolderPath.replace(duplicatedSegment, currentDirName)
  }
  logger.info(`Resolved folder path: ${resolvedFolderPath}`)

  try {
    const files = await readdir(resolvedFolderPath)
    const pdfFiles = files.filter((file) => file.endsWith('.pdf'))
    logger.info(`Found ${pdfFiles.length} PDF files`)

    const filesInfo = pdfFiles.map((pdfFile) => ({
      filePath: join(resolvedFolderPath, pdfFile),
      fileName: pdfFile,
    }))

    await emit({
      topic: 'rag.process.pdfs',
      data: { files: filesInfo, streamId },
    })
  } catch (error) {
    logger.error(`Failed to read PDFs from folder: ${resolvedFolderPath}`, { error })
    throw error
  }
}
```
```python
from typing import Any, Dict

from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.chunking import HybridChunker
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions


def handler(input_data: Dict[str, Any], context: Dict[str, Any]) -> None:
    """Process PDFs using Docling with intelligent chunking"""
    logger = context['logger']
    emit = context['emit']

    files = input_data.get('files', [])
    stream_id = input_data.get('streamId')
    logger.info(f"Processing {len(files)} PDF files with Docling")

    # Configure Docling with optimized settings
    pipeline_options = PdfPipelineOptions(
        do_ocr=True,
        do_table_structure=True,
        table_structure_options={
            "do_cell_matching": True,
        },
    )
    doc_converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        }
    )

    # Initialize the hybrid chunker for intelligent document segmentation
    chunker = HybridChunker(
        tokenizer="cl100k_base",
        max_tokens=512,
        overlap_tokens=50,
        heading_hierarchies=True,
        split_by_page=False,
    )

    all_chunks = []
    for file_info in files:
        file_path = file_info['filePath']
        file_name = file_info['fileName']
        logger.info(f"Processing file: {file_name}")

        try:
            # Convert PDF to structured document
            result = doc_converter.convert(file_path)
            doc = result.document
            logger.info(f"Converted {file_name}: {len(doc.pages)} pages")

            # Apply intelligent chunking
            chunks = list(chunker.chunk(doc))
            logger.info(f"Generated {len(chunks)} chunks for {file_name}")

            # Prepare chunks for Weaviate
            for i, chunk in enumerate(chunks):
                chunk_data = {
                    'text': chunk.text,
                    'title': file_name,
                    'source': file_path,
                    'page': getattr(chunk, 'page_no', i + 1),
                    'chunk_id': f"{file_name}_chunk_{i}",
                }
                all_chunks.append(chunk_data)
        except Exception as e:
            logger.error(f"Error processing {file_name}: {str(e)}")
            continue

    logger.info(f"Total chunks generated: {len(all_chunks)}")

    if all_chunks:
        # Emit chunks for Weaviate ingestion
        emit({
            'topic': 'rag.load.weaviate',
            'data': {
                'chunks': all_chunks,
                'streamId': stream_id,
                'totalFiles': len(files),
                'totalChunks': len(all_chunks),
            },
        })
    else:
        logger.warning("No chunks generated from PDF processing")
```
```ts
import weaviate from 'weaviate-client'
import { EventConfig, Handlers } from 'motia'
import { z } from 'zod'
const ChunkSchema = z.object({
  text: z.string(),
  title: z.string(),
  source: z.string(),
  page: z.number(),
  chunk_id: z.string(),
})

export const config: EventConfig = {
  type: 'event',
  name: 'load-weaviate',
  subscribes: ['rag.load.weaviate'],
  emits: [],
  flows: ['rag-workflow'],
  input: z.object({
    chunks: z.array(ChunkSchema),
    streamId: z.string().optional(),
    totalFiles: z.number().optional(),
    totalChunks: z.number().optional(),
  }),
}

export const handler: Handlers['load-weaviate'] = async (input, { logger }) => {
  const { chunks, streamId, totalFiles, totalChunks } = input
  logger.info('Loading chunks into Weaviate', {
    chunkCount: chunks.length,
    totalFiles,
    totalChunks,
    streamId,
  })

  const client = await weaviate.connectToWeaviateCloud(process.env.WEAVIATE_URL!, {
    authCredentials: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY!),
    headers: { 'X-OpenAI-Api-Key': process.env.OPENAI_API_KEY! },
  })

  try {
    const collection = client.collections.get('Books')
    const BATCH_SIZE = 100

    // Process chunks in batches for optimal performance
    for (let i = 0; i < chunks.length; i += BATCH_SIZE) {
      const batch = chunks.slice(i, i + BATCH_SIZE)
      const batchNumber = Math.floor(i / BATCH_SIZE) + 1
      const totalBatches = Math.ceil(chunks.length / BATCH_SIZE)
      logger.info(`Inserting batch ${batchNumber}/${totalBatches}`, {
        batchSize: batch.length,
        streamId,
      })

      const objects = batch.map((chunk) => ({
        properties: {
          text: chunk.text,
          title: chunk.title,
          source: chunk.source,
          page: chunk.page,
        },
      }))

      const result = await collection.data.insertMany(objects)
      if (result.hasErrors) {
        logger.error('Batch insertion had errors', {
          errors: result.errors,
          batchNumber,
          streamId,
        })
      } else {
        logger.info(`Successfully inserted batch ${batchNumber}/${totalBatches}`)
      }
    }

    logger.info('Successfully loaded all chunks into Weaviate', {
      totalChunks: chunks.length,
      streamId,
    })
  } catch (error) {
    logger.error('Error loading chunks into Weaviate', { error, streamId })
    throw error
  } finally {
    await client.close()
  }
}
```
```ts
import weaviate from 'weaviate-client'
import { Handlers } from 'motia'
import { z } from 'zod'
const RAGResponse = z.object({
  answer: z.string(),
  chunks: z.array(
    z.object({
      text: z.string(),
      title: z.string(),
      source: z.string(),
      page: z.number(),
    })
  ),
  query: z.string(),
  timestamp: z.string(),
})

export const config = {
  type: 'api',
  name: 'api-query-rag',
  description: 'Query the RAG system for answers',
  path: '/api/rag/query',
  method: 'POST',
  emits: [],
  bodySchema: z.object({
    query: z.string().min(1, 'Query is required'),
    limit: z.number().min(1).max(10).default(3),
  }),
  flows: ['rag-workflow'],
} as const

export const handler: Handlers['api-query-rag'] = async (req, { logger }) => {
  const { query, limit = 3 } = req.body
  logger.info('Processing RAG query', { query, limit })

  const client = await weaviate.connectToWeaviateCloud(process.env.WEAVIATE_URL!, {
    authCredentials: new weaviate.ApiKey(process.env.WEAVIATE_API_KEY!),
    headers: { 'X-OpenAI-Api-Key': process.env.OPENAI_API_KEY! },
  })

  try {
    const collection = client.collections.get('Books')

    // Perform semantic search, then generate a single grouped answer from the
    // retrieved chunks (generate options come before query options)
    const results = await collection.generate.nearText(
      query,
      {
        groupedTask: `Answer this question based on the provided context: ${query}.
          Be specific and cite the sources when possible.`,
      },
      { limit, distance: 0.6 }
    )

    // Extract the generated answer and source chunks
    const generatedAnswer = results.generated || 'No answer could be generated.'
    const chunks = results.objects.map((obj) => ({
      text: obj.properties.text as string,
      title: obj.properties.title as string,
      source: obj.properties.source as string,
      page: obj.properties.page as number,
    }))

    const response = RAGResponse.parse({
      answer: generatedAnswer,
      chunks,
      query,
      timestamp: new Date().toISOString(),
    })

    logger.info('RAG query completed successfully', {
      query,
      chunksFound: chunks.length,
      answerLength: generatedAnswer.length,
    })

    return {
      status: 200,
      body: response,
    }
  } catch (error) {
    logger.error('Error processing RAG query', { error, query })
    return {
      status: 500,
      body: { error: 'Failed to process query' },
    }
  } finally {
    await client.close()
  }
}
```
</Tabs>
Explore the Workbench
The Motia Workbench provides a visual representation of your RAG pipeline, making it easy to understand the flow and debug any issues.
You can monitor real-time processing, view logs, and trace the execution of each step directly in the Workbench interface. This makes development and debugging significantly easier compared to traditional monolithic approaches.
Key Features & Benefits
🚀 Event-Driven Architecture
Each step is independent and communicates through events, making the system highly scalable and maintainable.
🧠 Intelligent Document Processing
Docling's hybrid chunking preserves document structure while creating optimal chunks for embedding.
⚡ High-Performance Vector Search
Weaviate's cloud-native architecture provides fast, scalable similarity search with built-in OpenAI integration.
🔄 Real-Time Progress Tracking
Monitor document processing progress with detailed logging and status updates.
🌐 Polyglot Support
Seamlessly combine Python (Docling) and TypeScript (orchestration) in a single workflow.
🛡️ Production-Ready
Built-in error handling, batch processing, and resource cleanup ensure reliability.
Trying It Out
Ready to build your own intelligent document assistant? Let's get the system running.
Install Dependencies
Install both Node.js and Python dependencies. The prepare script automatically sets up the Python virtual environment.
```bash
npm install
```
Set Your Environment Variables
You'll need API keys for OpenAI and Weaviate Cloud. Create a .env file:
```bash
OPENAI_API_KEY="sk-..."
WEAVIATE_URL="https://your-cluster.weaviate.network"
WEAVIATE_API_KEY="your-weaviate-api-key"
```
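Since every Weaviate step reads these three variables, it can help to fail fast with a clear message before any connection is attempted. A minimal guard (a sketch, not part of the original steps) might look like:

```typescript
// Names of the environment variables the Weaviate steps rely on.
const REQUIRED_ENV = ['WEAVIATE_URL', 'WEAVIATE_API_KEY', 'OPENAI_API_KEY'] as const

// Return the names of any required variables that are unset or empty.
function missingEnvVars(env: Record<string, string | undefined>): string[] {
  return REQUIRED_ENV.filter((name) => !env[name])
}

// Example: an empty environment is missing all three keys.
console.log(missingEnvVars({}))
// -> ['WEAVIATE_URL', 'WEAVIATE_API_KEY', 'OPENAI_API_KEY']
```

Calling `missingEnvVars(process.env)` at startup and throwing if the result is non-empty turns a cryptic connection error into an actionable one.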
Run the Project
Start the Motia development server to begin processing documents.
```bash
npm run dev
```
Process Your First Documents
Add some PDF files to the docs/pdfs/ folder, then start the ingestion pipeline:
```bash
curl -X POST http://localhost:3000/api/rag/process-pdfs \
  -H "Content-Type: application/json" \
  -d '{"folderPath":"docs/pdfs"}'
```
Watch the logs as your documents are processed through the pipeline:
- PDF Reading: Files are discovered and queued
- Docling Processing: Intelligent chunking with structure preservation
- Weaviate Loading: Chunks are embedded and stored
Query Your Knowledge Base
Once processing is complete, you can ask questions about your documents:
General Query
```bash
curl -X POST http://localhost:3000/api/rag/query \
  -H "Content-Type: application/json" \
  -d '{"query":"What are the main topics covered in these documents?","limit":3}'
```
Specific Question
```bash
curl -X POST http://localhost:3000/api/rag/query \
  -H "Content-Type: application/json" \
  -d '{"query":"What methodology was used in the research?","limit":5}'
```
The response includes both a generated answer and the source chunks with page numbers for verification.
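An illustrative response is shown below. The field values are invented for illustration; the shape matches the RAGResponse schema defined in the api-query-rag step.

```json
{
  "answer": "The documents primarily cover ... (generated text, with source citations)",
  "chunks": [
    {
      "text": "...the retrieved chunk text...",
      "title": "example.pdf",
      "source": "docs/pdfs/example.pdf",
      "page": 12
    }
  ],
  "query": "What are the main topics covered in these documents?",
  "timestamp": "2024-01-01T00:00:00.000Z"
}
```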
Advanced Usage
Custom Chunking Strategies
Modify the Python processing step to implement custom chunking logic:
```python
# In process-pdfs.step.py
chunker = HybridChunker(
    tokenizer="cl100k_base",
    max_tokens=1024,           # Larger chunks for more context
    overlap_tokens=100,        # More overlap for better continuity
    heading_hierarchies=True,
    split_by_page=True,        # Preserve page boundaries
)
```
Batch Processing Optimization
Adjust batch sizes in the Weaviate loading step for optimal performance:
```ts
// In load-weaviate.step.ts
const BATCH_SIZE = 50 // Smaller batches for large documents
```
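Whatever batch size you choose, the slicing logic stays the same. Factored out of the load-weaviate handler, it is a small pure function (a sketch; the `insertMany` call is omitted):

```typescript
// Split an array into consecutive batches of at most `size` items,
// mirroring the slicing loop in the load-weaviate handler.
function toBatches<T>(items: T[], size: number): T[][] {
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

// Five chunks with a batch size of 2 yield three batches: [1,2], [3,4], [5].
console.log(toBatches([1, 2, 3, 4, 5], 2).length) // 3
```

Isolating the batching like this also makes it easy to unit-test the boundary cases (empty input, a final partial batch) without touching Weaviate.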
Multi-Collection Support
Extend the system to handle different document types by creating separate Weaviate collections:
```ts
const COLLECTIONS = {
  research: 'ResearchPapers',
  manuals: 'TechnicalManuals',
  reports: 'BusinessReports',
}
```
Troubleshooting
Common Issues
ENOENT Path Errors: The system automatically handles path normalization, but ensure your folderPath is relative to the project root.
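The normalization the read-pdfs step performs can be reproduced in isolation when debugging a stubborn path, e.g. a relative path resolved from inside a folder that shares the project's name (a sketch mirroring the handler's logic):

```typescript
// Collapse an accidentally duplicated directory segment:
// /home/user/my-app/my-app/docs/pdfs -> /home/user/my-app/docs/pdfs
function collapseDuplicatedSegment(path: string, dirName: string): string {
  const duplicated = `${dirName}/${dirName}`
  return path.includes(duplicated) ? path.replace(duplicated, dirName) : path
}

console.log(collapseDuplicatedSegment('/home/user/my-app/my-app/docs/pdfs', 'my-app'))
// /home/user/my-app/docs/pdfs
```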
Empty Answers: Check that documents were successfully processed by examining the logs. Verify your OpenAI API key is valid.
Weaviate Connection Issues: Ensure your WEAVIATE_URL and WEAVIATE_API_KEY are correct and your cluster is running.
Performance Tips
- Document Size: For large PDFs, consider preprocessing to split them into smaller files
- Batch Size: Adjust the Weaviate batch size based on your cluster's capacity
- Chunking Strategy: Experiment with different chunk sizes and overlap for your specific use case
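When experimenting with chunk sizes, a quick estimate of chunk counts helps budget embedding calls: with a window of W tokens and O tokens of overlap, each chunk advances W − O new tokens, so a T-token document yields roughly ⌈(T − O) / (W − O)⌉ chunks. This is a back-of-the-envelope sketch for a plain sliding window; Docling's structure-aware splitting will deviate from it.

```typescript
// Rough chunk-count estimate for an overlapping sliding window.
// Treat as an approximation: structure-aware chunkers split differently.
function estimateChunks(totalTokens: number, maxTokens: number, overlapTokens: number): number {
  if (totalTokens <= maxTokens) return 1
  return Math.ceil((totalTokens - overlapTokens) / (maxTokens - overlapTokens))
}

// A 10,000-token document with 512-token chunks and 50-token overlap:
console.log(estimateChunks(10_000, 512, 50)) // 22
```

Doubling `maxTokens` roughly halves the chunk count, trading retrieval granularity for fewer, more context-rich embeddings.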
💻 Dive into the Code
Want to explore the complete RAG implementation? Check out the full source code, including all steps, configuration files, and setup instructions:
Complete RAG Implementation
Access the full source code for this RAG agent, including Python processing scripts, TypeScript orchestration, and production configuration.
Conclusion: The Future of Document Intelligence
This RAG system demonstrates the power of combining best-in-class technologies with Motia's event-driven architecture. By breaking down complex document processing into discrete, manageable steps, we've created a system that's not only powerful but also maintainable and scalable.
The polyglot nature of the solution (Python for document processing, TypeScript for orchestration) shows how Motia enables you to use the right tool for each job without sacrificing integration or maintainability.
From here, you can extend the system by:
- Adding support for other document formats (Word, PowerPoint, etc.)
- Implementing document classification and routing
- Adding real-time document updates and synchronization
- Building a web interface for document management
- Integrating with existing business systems
The event-driven architecture makes all of these extensions straightforward to implement without disrupting the existing pipeline.
Ready to transform your documents into intelligent, queryable knowledge bases? Start building with Motia today!