# Ingestion Pipeline (CODITECT Integration)

This pipeline integrates with the existing CODITECT DMS API to process uploaded documents and extract their compliance metadata.

Pipeline Overview

┌──────────────────────────────────────────────────────────────┐
│ 1. Upload via /api/v1/documents/upload │
│ - File received, hash computed │
│ - Initial record created with status='pending' │
└─────────────────────────┬────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│ 2. Processor Worker (Background) │
│ - Parse Markdown (frontmatter + content) │
│ - Extract compliance metadata │
│ - Compute retention from policy table │
└─────────────────────────┬────────────────────────────────────┘

┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Chunk + │ │ Upsert │ │ WORM Archive │
│ Embed │ │ Metadata │ │ (Optional) │
│ (pgvector) │ │ (Postgres) │ │ │
└───────┬───────┘ └───────┬───────┘ └───────────────┘
│ │
└────────┬────────┘

┌──────────────────────────────────────────────────────────────┐
│ 3. Index in Meilisearch │
│ - Full text + compliance filters │
└─────────────────────────┬────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│ 4. Update status = 'processed' │
│ - Audit log entry │
└──────────────────────────────────────────────────────────────┘

Step 1: Upload Handler

Extend the existing upload endpoint:

@app.post("/api/v1/documents/upload")
async def upload_document(file: UploadFile = File(...)):
    """Upload a document file and queue it for background processing.

    Stores the raw file, computes its content hash, inserts an initial
    record with status='pending', and enqueues the processor worker.

    Returns:
        DocumentResponse for the newly created (still unprocessed) record.
    """
    # Existing behaviour: persist the file and fingerprint its contents.
    file_path = await store_file(file)
    file_hash = compute_hash(file_path)

    # Create the initial 'pending' record. Use fetch_one (not execute) so the
    # RETURNING row is actually captured -- execute() conventionally returns
    # only a status string, which would break `document.id` below. This also
    # matches how queries are issued elsewhere in this pipeline (cf. the
    # processor worker's SELECT).
    document = await db.fetch_one("""
        INSERT INTO documents
            (id, filename, filepath, mime_type, file_size, file_hash,
             status, document_type, title, version, created_at, updated_at)
        VALUES ($1, $2, $3, $4, $5, $6, 'pending', 'reference', $7, '1.0', now(), now())
        RETURNING *
    """, [uuid4(), file.filename, file_path, file.content_type,
          file.size, file_hash, file.filename])

    # Hand off to the background processor worker.
    await task_queue.enqueue('process_document', document.id)

    return DocumentResponse.from_orm(document)

Step 2: Processor Worker

async def process_document(doc_id: UUID):
    """Process an uploaded document: parse, extract compliance metadata,
    chunk/embed, index in Meilisearch, and mark the record processed.

    Args:
        doc_id: Primary key of the 'pending' document row to process.

    Raises:
        ValueError: If the document row does not exist, or no retention
            policy matches the document's retention_category.
        Exception: Any processing failure is recorded on the row
            (status='error', processing_error) and then re-raised so the
            task queue can observe the failure.
    """
    document = await db.fetch_one(
        "SELECT * FROM documents WHERE id = $1", [doc_id]
    )
    # Fail fast on a missing/raced id. The original code raised
    # AttributeError on `document.filepath` and then crashed a second time
    # in the except handler trying to UPDATE a non-existent row.
    if document is None:
        raise ValueError(f"Document not found: {doc_id}")

    try:
        # Parse Markdown into frontmatter + body.
        content = read_file(document.filepath)
        parsed = parse_markdown(content)

        # Extract and validate compliance metadata from the frontmatter.
        metadata = extract_compliance_metadata(parsed['frontmatter'])

        # Resolve the retention policy and compute the retain-until date.
        policy = await db.fetch_one(
            "SELECT * FROM retention_policies WHERE retention_category = $1",
            [metadata['retention_category']]
        )
        if policy is None:
            # Surface a clear, recorded error instead of passing None into
            # compute_retain_until (whose None-handling is unverified here).
            raise ValueError(
                f"No retention policy for category: "
                f"{metadata['retention_category']}"
            )
        metadata['retain_until'] = compute_retain_until(
            metadata['effective_date'], policy
        )

        # Chunk and embed (existing logic).
        chunks = chunk_content(parsed['body'])
        embeddings = await generate_embeddings(chunks)

        # Persist chunks with their embeddings (pgvector column).
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            await db.execute("""
                INSERT INTO document_chunks (id, document_id, index, content, embedding, created_at)
                VALUES ($1, $2, $3, $4, $5, now())
            """, [uuid4(), doc_id, i, chunk, embedding])

        # Upsert compliance metadata into its dedicated table.
        await upsert_compliance_metadata(doc_id, metadata)

        # Record derived fields and flip the row to 'processed'.
        await db.execute("""
            UPDATE documents SET
                title = $1,
                summary = $2,
                chunk_count = $3,
                status = 'processed',
                updated_at = now()
            WHERE id = $4
        """, [metadata['title'], metadata.get('summary'), len(chunks), doc_id])

        # Index for full-text + compliance-filtered search.
        await sync_to_meilisearch(document, metadata)

        # Audit trail entry.
        await log_event('document_processed', doc_id)

    except Exception as e:
        # Record the failure on the row, then re-raise for the task queue.
        await db.execute("""
            UPDATE documents SET
                status = 'error',
                processing_error = $1,
                updated_at = now()
            WHERE id = $2
        """, [str(e), doc_id])
        raise

Step 3: Compliance Metadata Extraction

def extract_compliance_metadata(frontmatter: dict) -> dict:
    """Extract and validate compliance metadata from Markdown frontmatter.

    Args:
        frontmatter: Parsed YAML frontmatter mapping.

    Returns:
        Normalized metadata dict with defaults applied to optional fields.

    Raises:
        ValueError: If any required field is absent. All missing fields are
            reported in one error, rather than only the first one found, so
            an author can fix the frontmatter in a single pass.
    """
    required_fields = ('title', 'domain', 'document_type', 'retention_category')
    missing = [name for name in required_fields if name not in frontmatter]
    if missing:
        raise ValueError(f"Missing required field(s): {', '.join(missing)}")

    return {
        'title': frontmatter['title'],
        'summary': frontmatter.get('summary'),
        'domain': frontmatter['domain'],
        'document_type': frontmatter['document_type'],
        # Defaults below encode the baseline compliance posture.
        'jurisdiction': frontmatter.get('jurisdiction', ['US']),
        'regulations': frontmatter.get('regulations', []),
        'security_class': frontmatter.get('security_class', 'internal'),
        'contains_phi': frontmatter.get('contains_phi', False),
        'contains_pii': frontmatter.get('contains_pii', False),
        'contains_financial': frontmatter.get('contains_financial', False),
        'status': frontmatter.get('status', 'draft'),
        # parse_date is a project helper; assumed to tolerate None for
        # absent dates -- TODO confirm against its definition.
        'effective_date': parse_date(frontmatter.get('effective_date')),
        'review_due_date': parse_date(frontmatter.get('review_due_date')),
        'retention_category': frontmatter['retention_category'],
        'business_unit': frontmatter.get('business_unit'),
        'facility': frontmatter.get('facility'),
        'owner_user_id': frontmatter.get('owner_user_id', 'system'),
        'owner_role': frontmatter.get('owner_role', 'unassigned')
    }

Step 4: Meilisearch Sync

async def sync_to_meilisearch(document, metadata):
    """Index a processed document in the Meilisearch 'documents' index.

    Re-reads the source file for the searchable plain text and flattens the
    compliance metadata into filterable attributes. Dates are serialized to
    ISO-8601 strings; absent dates become null.
    """
    client = meilisearch.Client(MEILISEARCH_URL, MEILISEARCH_KEY)
    index = client.index('documents')

    # Get full text content for the search body.
    content = read_file(document.filepath)
    parsed = parse_markdown(content)

    doc = {
        'id': str(document.id),
        'title': metadata['title'],
        # 'summary' is always present in metadata but may be None; `or ''`
        # coerces that to an empty string. The original .get default only
        # applied when the key was absent, so None leaked through as null.
        'summary': metadata.get('summary') or '',
        'body': parsed['plain_text'],
        'keywords': list(document.keywords or []),
        'tags': list(document.tags or []),
        'document_type': metadata['document_type'],
        'domain': metadata['domain'],
        'jurisdiction': metadata['jurisdiction'],
        'regulations': metadata['regulations'],
        'security_class': metadata['security_class'],
        'contains_phi': metadata['contains_phi'],
        'contains_pii': metadata['contains_pii'],
        'contains_financial': metadata['contains_financial'],
        'status': metadata['status'],
        'retention_category': metadata['retention_category'],
        'business_unit': metadata.get('business_unit'),
        'facility': metadata.get('facility'),
        'owner_role': metadata['owner_role'],
        'owner_user_id': metadata['owner_user_id'],
        'effective_date': metadata['effective_date'].isoformat() if metadata['effective_date'] else None,
        'review_due_date': metadata['review_due_date'].isoformat() if metadata['review_due_date'] else None,
        # Guard like the other dates: the original crashed on a None
        # retain_until instead of indexing null.
        'retain_until': metadata['retain_until'].isoformat() if metadata.get('retain_until') else None,
        'created_at': document.created_at.isoformat(),
        'updated_at': document.updated_at.isoformat()
    }

    # NOTE(review): meilisearch.Client is a synchronous HTTP client; calling
    # it inside an async def blocks the event loop. Consider
    # loop.run_in_executor(...) or an async Meilisearch client.
    index.add_documents([doc])

Search Orchestration

Integrate with /api/v1/search/hybrid:

@app.post("/api/v1/search/hybrid")
async def hybrid_search(request: SearchRequest):
    """Hybrid (vector + full-text) search with compliance filtering.

    Runs a pgvector cosine-similarity search and a filtered Meilisearch
    full-text search, fuses the two ranked lists with reciprocal rank
    fusion, and returns the top `request.limit` results with the measured
    server-side latency.
    """
    import time  # local import keeps this snippet self-contained

    started = time.perf_counter()

    # 1. Vector search via pgvector (<=> is cosine distance; 1 - distance
    #    gives a similarity score).
    embedding = await generate_embedding(request.query)
    vector_results = await db.fetch_all("""
        SELECT document_id, 1 - (embedding <=> $1) as score
        FROM document_chunks
        ORDER BY embedding <=> $1
        LIMIT 100
    """, [embedding])

    # NOTE(review): the original built a `candidate_ids` list from
    # vector_results but never used it; dropped here. If Meilisearch should
    # be restricted to vector candidates, add an id filter to filter_expr.

    # 2. Build Meilisearch filter expression from the request's
    #    compliance filters.
    filter_expr = build_filter_expression(request.filters)

    # 3. Full-text search with compliance filters applied.
    meilisearch_results = meilisearch_client.index('documents').search(
        request.query,
        {
            'filter': filter_expr,
            'limit': 100,
            'attributesToHighlight': ['title', 'body']
        }
    )

    # 4. Reciprocal-rank fusion of the two ranked lists.
    combined = reciprocal_rank_fusion(vector_results, meilisearch_results)

    # 5. Return top results. The original referenced an undefined `elapsed`
    #    (NameError on every request); measure it explicitly instead.
    elapsed = (time.perf_counter() - started) * 1000.0
    return SearchResponse(
        results=combined[:request.limit],
        total=len(combined),
        processing_time_ms=elapsed
    )

References