
ADR-031: Document Pre-Processing Pipeline

Status

PROPOSED

Date

2026-01-15

Context

Raw documents contain significant noise that wastes LLM tokens:

  • Headers, footers, page numbers
  • Boilerplate legal text
  • Duplicate content across documents
  • Irrelevant sections for the analysis task
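The first three noise categories are cheap to strip with plain regular expressions. A minimal illustration (patterns here are examples, not the pipeline's full defaults):

```python
import re

# Example boilerplate patterns; a real deployment would tune these per corpus.
BOILERPLATE = [r"CONFIDENTIAL", r"Page \d+ of \d+", r"©\s*\d{4}"]

def strip_noise(text: str) -> str:
    """Remove boilerplate markers and normalize whitespace."""
    for pattern in BOILERPLATE:
        text = re.sub(pattern, "", text, flags=re.IGNORECASE)
    text = re.sub(r"\n{3,}", "\n\n", text)  # collapse runs of blank lines
    text = re.sub(r" {2,}", " ", text)      # collapse repeated spaces
    return text.strip()
```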

Research shows pre-processing can reduce token consumption by 60-80% while improving extraction quality. A key insight from Pieces.app's engineering write-ups: minimal pre-processing beats aggressive pre-processing, because over-aggressive cleanup strips context and invites hallucinations downstream.

Token Economics Impact

Stage                 Token Reduction   Quality Impact
Format parsing        10-20%            Positive (cleaner text)
Deduplication         10-30%            Positive (no redundancy)
Keyword filtering     50-80%            Depends on filter accuracy
Extractive summary    60-90%            Risk of information loss
OCR cleanup           Variable          Critical for accuracy

At a 15x multi-agent token multiplier, a 70% token reduction saves $10.50 of every $15 in API costs.
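The arithmetic behind that figure, as a back-of-the-envelope check (the $1 single-agent baseline is an assumption for illustration):

```python
single_agent_cost = 1.00   # dollars of input tokens for one agent pass (assumed)
multiplier = 15            # multi-agent token amplification
reduction = 0.70           # fraction of tokens removed by pre-processing

total_cost = single_agent_cost * multiplier  # $15.00 without pre-processing
savings = total_cost * reduction             # $10.50 saved
print(f"${savings:.2f} saved per ${total_cost:.2f} spent")
```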

Decision

Implement a Document Pre-Processing Pipeline with configurable stages and conservative defaults.

Pipeline Architecture

DOCUMENT PRE-PROCESSING PIPELINE

INPUT: Raw Documents (PDF, DOCX, TXT, images)
        |
        v
STAGE 1: FORMAT DETECTION & PARSING
  • Detect document type (PDF/DOCX/TXT/Image)
  • Extract text with layout preservation
  • Handle tables, lists, structured content
  • OCR for images/scanned PDFs
  Token impact: -10-20%
        |
        v
STAGE 2: NOISE REMOVAL
  • Remove headers/footers/page numbers
  • Strip boilerplate (confidentiality notices)
  • Clean OCR artifacts
  • Normalize whitespace
  Token impact: -5-15%
        |
        v
STAGE 3: ENTITY EXTRACTION (Traditional NLP)
  • Named Entity Recognition (spaCy/stanza)
  • Date/number normalization
  • Key term extraction (TF-IDF)
  • Output: Structured metadata
  Token impact: metadata only, no reduction
        |
        v
STAGE 4: DEDUPLICATION
  • Document-level dedup (hash-based)
  • Paragraph-level dedup (MinHash/SimHash)
  • Cross-document redundancy removal
  Token impact: -10-30%
        |
        v
STAGE 5: RELEVANCE FILTERING (Optional)
  • Keyword-based section filtering
  • Semantic relevance scoring
  • Configurable threshold
  Token impact: -50-80% (if enabled)
  ⚠️ Risk: May filter relevant content
        |
        v
STAGE 6: SEMANTIC CHUNKING
  • Detect semantic boundaries
  • Respect section/paragraph structure
  • Configurable chunk size with overlap
  • Output: Chunk metadata (positions, relations)
        |
        v
OUTPUT: ProcessedDocument with metadata and chunks
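Stage 6's greedy packing with overlap can be sketched in a few lines before the full implementation. The names below are illustrative, not the pipeline's API; the ~4-characters-per-token approximation matches the one used throughout this ADR:

```python
def count_tokens(text: str) -> int:
    # Rough approximation used throughout this ADR: 1 token ≈ 4 characters
    return len(text) // 4

def chunk_paragraphs(paragraphs: list, chunk_size: int = 2000,
                     overlap: int = 200) -> list:
    """Greedily pack paragraphs into chunks; carry a character tail of the
    previous chunk forward as overlap context."""
    chunks, current = [], ""
    for para in paragraphs:
        if count_tokens(current) + count_tokens(para) <= chunk_size:
            current = f"{current}\n\n{para}" if current else para
        else:
            if current:
                chunks.append(current)
            tail = current[-overlap * 4:] if overlap and current else ""
            current = f"{tail}\n\n{para}" if tail else para
    if current:
        chunks.append(current)
    return chunks
```

With `overlap=0` no content is duplicated, so the output chunks partition the input exactly; with a positive overlap, each chunk after the first repeats the tail of its predecessor so downstream agents keep local context across chunk boundaries.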

Implementation

# /coditect/preprocessing/pipeline.py

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
import hashlib

class PreProcessingStage(Enum):
    FORMAT_PARSING = "format_parsing"
    NOISE_REMOVAL = "noise_removal"
    ENTITY_EXTRACTION = "entity_extraction"
    DEDUPLICATION = "deduplication"
    RELEVANCE_FILTERING = "relevance_filtering"
    SEMANTIC_CHUNKING = "semantic_chunking"

@dataclass
class PreProcessingConfig:
    """Configuration for pre-processing pipeline"""

    # Stage enablement
    enabled_stages: List[PreProcessingStage] = field(default_factory=lambda: [
        PreProcessingStage.FORMAT_PARSING,
        PreProcessingStage.NOISE_REMOVAL,
        PreProcessingStage.ENTITY_EXTRACTION,
        PreProcessingStage.DEDUPLICATION,
        PreProcessingStage.SEMANTIC_CHUNKING
    ])

    # Format parsing
    ocr_enabled: bool = True
    ocr_language: str = "eng"
    preserve_tables: bool = True

    # Noise removal
    remove_headers_footers: bool = True
    remove_page_numbers: bool = True
    boilerplate_patterns: List[str] = field(default_factory=lambda: [
        r"CONFIDENTIAL",
        r"Page \d+ of \d+",
        r"©\s*\d{4}",
    ])

    # Entity extraction
    ner_model: str = "en_core_web_lg"
    extract_entities: List[str] = field(default_factory=lambda: [
        "PERSON", "ORG", "DATE", "MONEY", "PRODUCT"
    ])

    # Deduplication
    dedup_similarity_threshold: float = 0.9
    dedup_algorithm: str = "minhash"  # minhash|simhash|exact

    # Relevance filtering (disabled by default - high risk)
    relevance_filtering_enabled: bool = False
    relevance_keywords: List[str] = field(default_factory=list)
    relevance_threshold: float = 0.3

    # Chunking
    chunk_size: int = 2000  # tokens
    chunk_overlap: int = 200  # tokens
    respect_boundaries: bool = True  # Don't split mid-sentence

@dataclass
class ProcessedDocument:
    """Output of pre-processing pipeline"""
    document_id: str
    original_path: str

    # Content
    cleaned_text: str
    chunks: List['Chunk']

    # Metadata
    entities: Dict[str, List[str]]
    key_terms: List[str]

    # Statistics
    original_tokens: int
    processed_tokens: int
    reduction_ratio: float

    # Processing info
    stages_applied: List[PreProcessingStage]
    warnings: List[str]

    # Dedup info
    duplicate_of: Optional[str] = None
    duplicate_paragraphs_removed: int = 0

@dataclass
class Chunk:
    """Semantic chunk of processed document"""
    chunk_id: str
    document_id: str
    content: str
    token_count: int

    # Position
    start_char: int
    end_char: int
    chunk_index: int
    total_chunks: int

    # Metadata
    entities: Dict[str, List[str]]
    key_terms: List[str]

    # Relations
    previous_chunk_id: Optional[str] = None
    next_chunk_id: Optional[str] = None

class PreProcessingPipeline:
    """
    Document pre-processing pipeline for token reduction.

    Design principles:
    1. Minimal intervention (avoid over-processing)
    2. Preserve original meaning
    3. Track all transformations
    4. Configurable per use case

    Note: `RawDocument`, `Deduplicator`, and the `_parse_docx`, `_ocr_image`,
    and `_ocr_bytes` helpers are assumed to be provided elsewhere in the
    package.
    """

    def __init__(self, config: PreProcessingConfig):
        self.config = config
        self.nlp = spacy.load(config.ner_model)
        self.tfidf = TfidfVectorizer(max_features=100)
        self.deduplicator = Deduplicator(
            threshold=config.dedup_similarity_threshold,
            algorithm=config.dedup_algorithm
        )

    async def process(
        self,
        documents: List['RawDocument']
    ) -> List[ProcessedDocument]:
        """
        Process documents through configured pipeline stages.
        """
        results = []

        for doc in documents:
            try:
                processed = await self._process_single(doc)
                results.append(processed)
            except Exception as e:
                # Record the failure but continue with remaining documents
                results.append(self._create_error_result(doc, e))

        # Cross-document deduplication
        if PreProcessingStage.DEDUPLICATION in self.config.enabled_stages:
            results = await self._cross_document_dedup(results)

        return results

    async def _process_single(
        self,
        doc: 'RawDocument'
    ) -> ProcessedDocument:
        """Process single document through pipeline"""

        warnings = []
        stages_applied = []
        text = doc.content
        original_tokens = self._count_tokens(text)

        # Stage 1: Format Parsing
        if PreProcessingStage.FORMAT_PARSING in self.config.enabled_stages:
            text, parse_warnings = await self._parse_format(doc)
            warnings.extend(parse_warnings)
            stages_applied.append(PreProcessingStage.FORMAT_PARSING)

        # Stage 2: Noise Removal
        if PreProcessingStage.NOISE_REMOVAL in self.config.enabled_stages:
            text, noise_warnings = self._remove_noise(text)
            warnings.extend(noise_warnings)
            stages_applied.append(PreProcessingStage.NOISE_REMOVAL)

        # Stage 3: Entity Extraction
        entities = {}
        key_terms = []
        if PreProcessingStage.ENTITY_EXTRACTION in self.config.enabled_stages:
            entities = self._extract_entities(text)
            key_terms = self._extract_key_terms(text)
            stages_applied.append(PreProcessingStage.ENTITY_EXTRACTION)

        # Stage 4: Paragraph Deduplication (within document)
        dedup_count = 0
        if PreProcessingStage.DEDUPLICATION in self.config.enabled_stages:
            text, dedup_count = self._deduplicate_paragraphs(text)
            stages_applied.append(PreProcessingStage.DEDUPLICATION)

        # Stage 5: Relevance Filtering (optional, risky)
        if (PreProcessingStage.RELEVANCE_FILTERING in self.config.enabled_stages
                and self.config.relevance_filtering_enabled):
            text, filter_warnings = self._filter_by_relevance(text)
            warnings.extend(filter_warnings)
            stages_applied.append(PreProcessingStage.RELEVANCE_FILTERING)

        # Stage 6: Semantic Chunking
        chunks = []
        if PreProcessingStage.SEMANTIC_CHUNKING in self.config.enabled_stages:
            chunks = self._create_chunks(text, doc.id, entities, key_terms)
            stages_applied.append(PreProcessingStage.SEMANTIC_CHUNKING)

        processed_tokens = self._count_tokens(text)

        return ProcessedDocument(
            document_id=doc.id,
            original_path=doc.path,
            cleaned_text=text,
            chunks=chunks,
            entities=entities,
            key_terms=key_terms,
            original_tokens=original_tokens,
            processed_tokens=processed_tokens,
            reduction_ratio=1 - (processed_tokens / original_tokens) if original_tokens > 0 else 0,
            stages_applied=stages_applied,
            warnings=warnings,
            duplicate_paragraphs_removed=dedup_count
        )

    def _create_error_result(
        self,
        doc: 'RawDocument',
        error: Exception
    ) -> ProcessedDocument:
        """Fallback result for a document that failed processing."""
        return ProcessedDocument(
            document_id=doc.id,
            original_path=doc.path,
            cleaned_text="",
            chunks=[],
            entities={},
            key_terms=[],
            original_tokens=self._count_tokens(doc.content),
            processed_tokens=0,
            reduction_ratio=0.0,
            stages_applied=[],
            warnings=[f"Processing failed: {error}"]
        )

    # ==================== Stage Implementations ====================

    async def _parse_format(
        self,
        doc: 'RawDocument'
    ) -> tuple[str, List[str]]:
        """Stage 1: Parse document format and extract text"""

        warnings = []

        if doc.format == "pdf":
            text, pdf_warnings = await self._parse_pdf(doc)
            warnings.extend(pdf_warnings)
        elif doc.format == "docx":
            text = self._parse_docx(doc)
        elif doc.format == "txt":
            text = doc.content
        elif doc.format in ["png", "jpg", "jpeg", "tiff"]:
            if self.config.ocr_enabled:
                text, ocr_warnings = await self._ocr_image(doc)
                warnings.extend(ocr_warnings)
            else:
                warnings.append(f"OCR disabled, skipping image: {doc.path}")
                text = ""
        else:
            warnings.append(f"Unknown format: {doc.format}")
            text = doc.content

        return text, warnings

    async def _parse_pdf(
        self,
        doc: 'RawDocument'
    ) -> tuple[str, List[str]]:
        """Parse PDF with optional OCR for scanned pages"""

        import pymupdf  # PyMuPDF

        warnings = []
        pages = []

        pdf = pymupdf.open(doc.path)

        for page_num, page in enumerate(pdf):
            # Try text extraction first
            text = page.get_text()

            # If page appears scanned (low text density), try OCR
            if len(text.strip()) < 100 and self.config.ocr_enabled:
                warnings.append(f"Page {page_num + 1} appears scanned, applying OCR")
                pix = page.get_pixmap()
                img_bytes = pix.tobytes("png")
                text, _ = await self._ocr_bytes(img_bytes)

            pages.append(text)

        return "\n\n".join(pages), warnings

    def _remove_noise(self, text: str) -> tuple[str, List[str]]:
        """Stage 2: Remove noise patterns"""

        import re
        warnings = []
        original_length = len(text)

        # Remove boilerplate patterns
        for pattern in self.config.boilerplate_patterns:
            text = re.sub(pattern, "", text, flags=re.IGNORECASE)

        # Remove page numbers
        if self.config.remove_page_numbers:
            text = re.sub(r'\n\s*\d+\s*\n', '\n', text)
            text = re.sub(r'Page\s+\d+\s*(of\s+\d+)?', '', text, flags=re.IGNORECASE)

        # Normalize whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)

        removed_chars = original_length - len(text)
        if removed_chars > original_length * 0.3:
            warnings.append(f"Removed {removed_chars} chars ({removed_chars/original_length:.1%})")

        return text.strip(), warnings

    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Stage 3a: Named Entity Recognition"""

        doc = self.nlp(text[:100000])  # Limit for performance

        entities = {}
        for ent in doc.ents:
            if ent.label_ in self.config.extract_entities:
                if ent.label_ not in entities:
                    entities[ent.label_] = []
                if ent.text not in entities[ent.label_]:
                    entities[ent.label_].append(ent.text)

        return entities

    def _extract_key_terms(self, text: str) -> List[str]:
        """Stage 3b: TF-IDF key term extraction"""

        # Fit TF-IDF on single document
        try:
            tfidf_matrix = self.tfidf.fit_transform([text])
            feature_names = self.tfidf.get_feature_names_out()

            # Get top terms by TF-IDF score
            scores = tfidf_matrix.toarray()[0]
            top_indices = scores.argsort()[-20:][::-1]

            return [feature_names[i] for i in top_indices]
        except Exception:
            return []

    def _deduplicate_paragraphs(self, text: str) -> tuple[str, int]:
        """Stage 4: Remove duplicate paragraphs within document"""

        paragraphs = text.split('\n\n')
        seen_hashes = set()
        unique_paragraphs = []
        removed_count = 0

        for para in paragraphs:
            para_hash = hashlib.md5(para.strip().lower().encode()).hexdigest()

            if para_hash not in seen_hashes:
                seen_hashes.add(para_hash)
                unique_paragraphs.append(para)
            else:
                removed_count += 1

        return '\n\n'.join(unique_paragraphs), removed_count

    def _filter_by_relevance(self, text: str) -> tuple[str, List[str]]:
        """Stage 5: Filter sections by keyword relevance (RISKY)"""

        warnings = []

        if not self.config.relevance_keywords:
            warnings.append("No relevance keywords specified, skipping filter")
            return text, warnings

        paragraphs = text.split('\n\n')
        relevant_paragraphs = []
        filtered_count = 0

        keywords_lower = [kw.lower() for kw in self.config.relevance_keywords]

        for para in paragraphs:
            para_lower = para.lower()

            # Fraction of keywords that appear in this paragraph
            relevance_score = sum(
                1 for kw in keywords_lower if kw in para_lower
            ) / len(keywords_lower)

            if relevance_score >= self.config.relevance_threshold:
                relevant_paragraphs.append(para)
            else:
                filtered_count += 1

        if filtered_count > len(paragraphs) * 0.5:
            warnings.append(
                f"⚠️ Filtered {filtered_count}/{len(paragraphs)} paragraphs - "
                "may have lost relevant content"
            )

        return '\n\n'.join(relevant_paragraphs), warnings

    def _create_chunks(
        self,
        text: str,
        document_id: str,
        entities: Dict[str, List[str]],
        key_terms: List[str]
    ) -> List[Chunk]:
        """Stage 6: Create semantic chunks"""

        chunks = []

        # Split by semantic boundaries (paragraphs, sections)
        if self.config.respect_boundaries:
            segments = self._split_by_boundaries(text)
        else:
            segments = [text]

        current_chunk = ""
        current_start = 0
        chunk_index = 0
        search_pos = 0  # running cursor so repeated segments resolve to the right offset

        for segment in segments:
            seg_pos = text.find(segment, search_pos)
            if seg_pos != -1:
                search_pos = seg_pos + len(segment)

            segment_tokens = self._count_tokens(segment)
            current_tokens = self._count_tokens(current_chunk)

            if current_tokens + segment_tokens <= self.config.chunk_size:
                current_chunk += "\n\n" + segment if current_chunk else segment
            else:
                # Save current chunk
                if current_chunk:
                    chunks.append(self._create_chunk(
                        content=current_chunk,
                        document_id=document_id,
                        start_char=current_start,
                        chunk_index=chunk_index,
                        entities=entities,
                        key_terms=key_terms
                    ))
                    chunk_index += 1

                # Start new chunk with overlap
                overlap_text = self._get_overlap(current_chunk) if self.config.chunk_overlap > 0 else ""
                if overlap_text:
                    current_chunk = overlap_text + "\n\n" + segment
                else:
                    current_chunk = segment

                current_start = max(seg_pos, 0)

        # Save final chunk
        if current_chunk:
            chunks.append(self._create_chunk(
                content=current_chunk,
                document_id=document_id,
                start_char=current_start,
                chunk_index=chunk_index,
                entities=entities,
                key_terms=key_terms
            ))

        # Set total_chunks and link chunks
        for i, chunk in enumerate(chunks):
            chunk.total_chunks = len(chunks)
            if i > 0:
                chunk.previous_chunk_id = chunks[i-1].chunk_id
            if i < len(chunks) - 1:
                chunk.next_chunk_id = chunks[i+1].chunk_id

        return chunks

    def _split_by_boundaries(self, text: str) -> List[str]:
        """Split text respecting semantic boundaries"""

        # Split by double newlines (paragraphs)
        paragraphs = text.split('\n\n')

        # Further split large paragraphs by sentences
        segments = []
        for para in paragraphs:
            if self._count_tokens(para) > self.config.chunk_size:
                # Split by sentences
                doc = self.nlp(para)
                sentences = [sent.text for sent in doc.sents]
                segments.extend(sentences)
            else:
                segments.append(para)

        return segments

    async def _cross_document_dedup(
        self,
        documents: List[ProcessedDocument]
    ) -> List[ProcessedDocument]:
        """Cross-document deduplication (exact match; near-duplicate
        matching via self.deduplicator could replace this hash check)."""

        # Build hash index
        doc_hashes = {}
        for doc in documents:
            doc_hash = hashlib.md5(doc.cleaned_text.encode()).hexdigest()

            if doc_hash in doc_hashes:
                # Mark as duplicate
                doc.duplicate_of = doc_hashes[doc_hash]
            else:
                doc_hashes[doc_hash] = doc.document_id

        return documents

    # ==================== Utilities ====================

    def _count_tokens(self, text: str) -> int:
        """Approximate token count"""
        # Rough approximation: 1 token ≈ 4 characters
        return len(text) // 4

    def _get_overlap(self, chunk: str) -> str:
        """Tail of the previous chunk, reused as overlap context
        (overlap token budget approximated as 4 chars/token)."""
        return chunk[-self.config.chunk_overlap * 4:]

    def _create_chunk(
        self,
        content: str,
        document_id: str,
        start_char: int,
        chunk_index: int,
        entities: Dict[str, List[str]],
        key_terms: List[str]
    ) -> Chunk:
        """Create chunk with metadata"""

        chunk_id = f"{document_id}_chunk_{chunk_index}"

        # Extract entities/terms specific to this chunk
        chunk_entities = self._extract_entities(content)
        chunk_terms = self._extract_key_terms(content)

        return Chunk(
            chunk_id=chunk_id,
            document_id=document_id,
            content=content,
            token_count=self._count_tokens(content),
            start_char=start_char,
            end_char=start_char + len(content),
            chunk_index=chunk_index,
            total_chunks=0,  # Set later
            entities=chunk_entities,
            key_terms=chunk_terms
        )

CLI Commands

# Process documents with default settings
coditect preprocess run \
  --input ./raw_documents/ \
  --output ./processed/ \
  --format json

# Process with custom config
coditect preprocess run \
  --input ./raw_documents/ \
  --output ./processed/ \
  --config ./preprocess_config.yaml \
  --enable-ocr \
  --chunk-size 3000

# Analyze token reduction
coditect preprocess analyze \
  --input ./raw_documents/ \
  --report ./reduction_report.json

# Preview without writing
coditect preprocess preview \
  --input ./document.pdf \
  --show-entities \
  --show-key-terms

Configuration File

# preprocess_config.yaml

enabled_stages:
  - format_parsing
  - noise_removal
  - entity_extraction
  - deduplication
  - semantic_chunking
  # - relevance_filtering  # Disabled by default (risky)

format_parsing:
  ocr_enabled: true
  ocr_language: eng
  preserve_tables: true

noise_removal:
  remove_headers_footers: true
  remove_page_numbers: true
  boilerplate_patterns:
    - "CONFIDENTIAL"
    - "Page \\d+ of \\d+"
    - "©\\s*\\d{4}"
    - "DRAFT"
    - "DO NOT DISTRIBUTE"

entity_extraction:
  ner_model: en_core_web_lg
  extract_entities:
    - PERSON
    - ORG
    - DATE
    - MONEY
    - PRODUCT
    - GPE  # Geopolitical entities

deduplication:
  similarity_threshold: 0.9
  algorithm: minhash  # minhash|simhash|exact

relevance_filtering:
  enabled: false  # Enable with caution
  keywords: []
  threshold: 0.3

semantic_chunking:
  chunk_size: 2000  # tokens
  chunk_overlap: 200  # tokens
  respect_boundaries: true
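The `algorithm: minhash` option above refers to MinHash-based near-duplicate detection. A stdlib-only sketch of the idea (a production pipeline would likely use a tuned library such as datasketch; the function names here are illustrative):

```python
import hashlib

def shingles(text: str, k: int = 3) -> set:
    """k-word shingles of a text, the sets whose Jaccard overlap we estimate."""
    words = text.lower().split()
    return {" ".join(words[i:i + k]) for i in range(max(len(words) - k + 1, 1))}

def minhash_signature(items: set, num_hashes: int = 64) -> list:
    """One min-hash per seeded hash function; equal entries indicate overlap."""
    return [
        min(int(hashlib.md5(f"{seed}:{item}".encode()).hexdigest(), 16)
            for item in items)
        for seed in range(num_hashes)
    ]

def estimated_similarity(a: str, b: str) -> float:
    """Fraction of matching signature positions ≈ Jaccard similarity."""
    sa, sb = minhash_signature(shingles(a)), minhash_signature(shingles(b))
    return sum(x == y for x, y in zip(sa, sb)) / len(sa)
```

Paragraph pairs whose estimated similarity exceeds the configured `similarity_threshold` (0.9 above) would be treated as duplicates.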

Consequences

Positive

  • Up to 60-80% token reduction (the upper end requires relevance filtering; conservative defaults yield roughly 25-50%)
  • Improved extraction quality from cleaner text
  • Structured metadata (entities, terms) aids downstream processing
  • Configurable per use case

Negative

  • Processing latency (especially OCR)
  • Risk of information loss with aggressive filtering
  • OCR accuracy varies with document quality
  • Additional dependencies (spaCy, OCR libraries)

Metrics

Metric              Target          Measurement
Token reduction     50-70%          (original - processed) / original
Processing speed    >10 docs/min    Documents processed per minute
OCR accuracy        >95%            Manual sample validation
Entity precision    >90%            Correctly identified entities
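The token-reduction measurement from the table, written out as a helper (illustrative, not part of the pipeline's public API; note the guard against empty documents, matching the pipeline's own handling):

```python
def reduction_ratio(original_tokens: int, processed_tokens: int) -> float:
    """Fraction of tokens removed: (original - processed) / original."""
    if original_tokens <= 0:
        return 0.0  # empty input: report no reduction rather than divide by zero
    return 1 - processed_tokens / original_tokens
```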
Related Decisions

  • ADR-027: Hybrid Document Processing Architecture (parent)
  • ADR-028: Map-Reduce Agent Orchestration (uses processed output)
  • ADR-029: Hierarchical Knowledge Store (receives chunks)