Ingestion Pipeline (Markdown → Postgres + Meilisearch)
High-level, idempotent pipeline for processing Markdown documents into the compliance system.
Pipeline Overview
┌──────────────────┐
│ 1. Discovery │ Git commits / filesystem events
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 2. Parse │ Frontmatter + Markdown rendering
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 3. Enrich │ Retention lookup, normalization
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 4. WORM Write │ Immutable archive
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 5. DB Upsert │ PostgreSQL metadata
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 6. Index │ Meilisearch update
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 7. Audit Log │ Record ingestion event
└──────────────────┘
Step 1: Discovery
From Git
import subprocess
import json
def get_changed_files(since_commit):
    """Return paths of .md files changed between *since_commit* and HEAD.

    Runs ``git diff --name-only`` restricted to ``*.md`` paths.

    Args:
        since_commit: Any git revision (e.g. 'HEAD~1', a SHA, a tag).

    Returns:
        List of changed file paths; empty list when nothing changed or when
        git exits non-zero (stderr is ignored — this is best-effort).
    """
    result = subprocess.run(
        ['git', 'diff', '--name-only', since_commit, 'HEAD', '--', '*.md'],
        capture_output=True, text=True
    )
    # BUG FIX: ''.split('\n') yields [''] — filter empty lines so callers
    # never receive a bogus empty path when there are no changes.
    return [path for path in result.stdout.splitlines() if path]


# Example: get files changed in the last commit. Guarded so that merely
# importing this module does not spawn a git subprocess as a side effect.
if __name__ == '__main__':
    changed_files = get_changed_files('HEAD~1')
From Filesystem Events
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MarkdownHandler(FileSystemEventHandler):
    """Watchdog handler that re-ingests a Markdown file whenever it changes."""

    def on_modified(self, event):
        # Only react to .md files; directory events also reach this hook.
        if event.src_path.endswith('.md'):
            # NOTE(review): process_file (see "Complete Pipeline Function")
            # is defined as process_file(file_path, db), but only the path is
            # passed here — confirm how the db handle reaches the handler.
            process_file(event.src_path)


# Module-level side effect: starts watching ./documents recursively as soon
# as this snippet runs. In production, consider guarding with __main__.
observer = Observer()
observer.schedule(MarkdownHandler(), path='./documents', recursive=True)
observer.start()
Step 2: Parse Markdown
import yaml
import hashlib
from markdown import Markdown
from io import StringIO
def parse_markdown(file_path):
    """Parse a Markdown file into frontmatter, body, headings and a hash.

    Args:
        file_path: Path to the .md file on disk.

    Returns:
        dict with keys:
            frontmatter:  dict parsed from the leading YAML block ({} if none)
            body:         Markdown source without the frontmatter
            plain_text:   body rendered to plain text (via strip_markdown)
            headings:     list of H1-H3 heading texts
            content_hash: SHA-256 hex digest of the full file content
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Split the optional '---'-delimited YAML frontmatter from the body.
    if content.startswith('---'):
        parts = content.split('---', 2)
        if len(parts) == 3:
            # BUG FIX: yaml.safe_load returns None for an empty frontmatter
            # block; normalize to {} so downstream .get() calls don't crash.
            frontmatter = yaml.safe_load(parts[1]) or {}
            body = parts[2].strip()
        else:
            # BUG FIX: an unterminated frontmatter fence used to raise
            # IndexError on parts[2]; treat the whole file as body instead.
            frontmatter = {}
            body = content
    else:
        frontmatter = {}
        body = content

    # Hash the raw file content (including frontmatter) for change detection.
    content_hash = hashlib.sha256(content.encode()).hexdigest()

    # FIX: the original rendered HTML via Markdown().convert() and then
    # discarded the result — dead work, removed.
    headings = extract_headings(body)
    plain_text = strip_markdown(body)

    return {
        'frontmatter': frontmatter,
        'body': body,
        'plain_text': plain_text,
        'headings': headings,
        'content_hash': content_hash,
    }
def extract_headings(markdown_text):
    """Collect the text of every H1, H2 and H3 heading, in document order."""
    prefixes = ('# ', '## ', '### ')
    headings = []
    for line in markdown_text.split('\n'):
        matched = next((p for p in prefixes if line.startswith(p)), None)
        if matched is not None:
            headings.append(line[len(matched):].strip())
    return headings
Step 3: Enrich Metadata
from datetime import date, timedelta
def enrich_metadata(frontmatter, db):
"""Enrich metadata with retention and normalized values."""
# Lookup retention policy
retention_category = frontmatter.get('retention_category', 'DEFAULT-7Y')
policy = db.query(
"SELECT * FROM retention_policies WHERE retention_category = %s",
[retention_category]
).fetchone()
if not policy:
raise ValueError(f"Unknown retention_category: {retention_category}")
# Compute retain_until
effective_date = frontmatter.get('effective_date', date.today())
years = max(policy['period_years_default'], policy['min_years'])
retain_until = effective_date + timedelta(days=years * 365)
# Normalize regulation codes
regulations = frontmatter.get('regulations', [])
normalized_regulations = [normalize_regulation(r) for r in regulations]
# Normalize jurisdiction
jurisdiction = frontmatter.get('jurisdiction', ['US'])
return {
**frontmatter,
'retention_period_y': years,
'retain_until': retain_until,
'regulations': normalized_regulations,
'jurisdiction': jurisdiction
}
Step 4: WORM Write
import boto3
import json
from datetime import datetime
def write_to_worm(doc_id, version, content, metadata):
    """Archive one document version to the WORM (S3 Object Lock) bucket.

    Args:
        doc_id: document identifier (stringified into the object key).
        version: integer version number.
        content: the Markdown body to archive.
        metadata: enriched metadata; 'retain_until' drives the lock period.

    Returns:
        The S3 object key the record was written under.
    """
    s3 = boto3.client('s3')
    record = {
        'markdown': content,
        'metadata': metadata,
        'doc_id': str(doc_id),
        'version': version,
        'archived_at': datetime.utcnow().isoformat()
    }
    key = f'records/{doc_id}/{version}.json'
    # COMPLIANCE mode: the object cannot be deleted or overwritten by any
    # principal (including root) until the retain-until date passes.
    s3.put_object(
        Bucket='compliance-archive',
        Key=key,
        # BUG FIX: metadata carries date objects (retain_until, and likely
        # effective_date), which json.dumps cannot serialize and raised
        # TypeError. default=str is a deliberate choice: dates become their
        # ISO string repr in the archived record.
        Body=json.dumps(record, default=str),
        ObjectLockMode='COMPLIANCE',
        ObjectLockRetainUntilDate=metadata['retain_until']
    )
    return key
Step 5: Database Upsert
from uuid import uuid4
def upsert_document(db, doc_id, file_path, parsed, metadata, worm_key):
    """Upsert a document, append its new version row, and upsert metadata.

    Args:
        db: database handle with .query(...).fetchone(), .execute, .commit.
        doc_id: stable document identifier.
        file_path: source path of the Markdown file.
        parsed: output of parse_markdown (uses 'content_hash').
        metadata: output of enrich_metadata.
        worm_key: S3 key returned by write_to_worm.
    """
    # Does this document already exist?
    existing = db.query(
        "SELECT doc_id, current_version FROM documents WHERE doc_id = %s",
        [doc_id]
    ).fetchone()
    now = datetime.utcnow()

    if existing:
        # Known document: append a version row, then bump the head pointer.
        new_version = existing['current_version'] + 1
        db.execute("""
            INSERT INTO document_versions
            (doc_id, version, worm_object_id, content_hash, created_at, created_by, supersedes_version)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, [doc_id, new_version, worm_key, parsed['content_hash'], now, 'system', existing['current_version']])
        db.execute("""
            UPDATE documents SET
                current_version = %s,
                content_hash = %s,
                worm_object_id = %s,
                last_modified_at = %s,
                last_modified_by = %s
            WHERE doc_id = %s
        """, [new_version, parsed['content_hash'], worm_key, now, 'system', doc_id])
    else:
        # First sighting: create the document and its version-1 row.
        db.execute("""
            INSERT INTO documents
            (doc_id, path, current_version, content_hash, worm_object_id, created_at, created_by, last_modified_at, last_modified_by)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, [doc_id, file_path, 1, parsed['content_hash'], worm_key, now, 'system', now, 'system'])
        db.execute("""
            INSERT INTO document_versions
            (doc_id, version, worm_object_id, content_hash, created_at, created_by)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, [doc_id, 1, worm_key, parsed['content_hash'], now, 'system'])

    # BUG FIX: the original passed a literal Ellipsis ('...') placeholder in
    # the metadata parameter list, which would bind the Ellipsis object as a
    # column value. Spell out all 22 values in column order; optional fields
    # fall back to None/False so sparse frontmatter doesn't raise KeyError.
    db.execute("""
        INSERT INTO document_metadata
        (doc_id, title, domain, document_type, jurisdiction, regulations,
         security_class, contains_phi, contains_pii, contains_financial,
         status, effective_date, review_due_date, retention_category,
         retention_period_y, retain_until, owner_user_id, owner_role,
         business_unit, facility, created_at, last_modified_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (doc_id) DO UPDATE SET
            title = EXCLUDED.title,
            status = EXCLUDED.status,
            last_modified_at = EXCLUDED.last_modified_at
    """, [
        doc_id,
        metadata['title'],
        metadata['domain'],
        metadata.get('document_type'),
        metadata.get('jurisdiction'),
        metadata.get('regulations'),
        metadata.get('security_class'),
        metadata.get('contains_phi', False),
        metadata.get('contains_pii', False),
        metadata.get('contains_financial', False),
        metadata.get('status'),
        metadata.get('effective_date'),
        metadata.get('review_due_date'),
        metadata.get('retention_category'),
        metadata.get('retention_period_y'),
        metadata.get('retain_until'),
        metadata.get('owner_user_id'),
        metadata.get('owner_role'),
        metadata.get('business_unit'),
        metadata.get('facility'),
        now,
        now,
    ])
    db.commit()
Step 6: Index in Meilisearch
import meilisearch
def index_document(doc_id, parsed, metadata):
"""Index document in Meilisearch."""
client = meilisearch.Client('http://localhost:7700', 'YOUR_API_KEY')
index = client.index('documents')
doc = {
'doc_id': str(doc_id),
'title': metadata['title'],
'headings': parsed['headings'],
'body': parsed['plain_text'],
'domain': metadata['domain'],
'document_type': metadata['document_type'],
'jurisdiction': metadata['jurisdiction'],
'regulations': metadata['regulations'],
'security_class': metadata['security_class'],
'contains_phi': metadata.get('contains_phi', False),
'contains_pii': metadata.get('contains_pii', False),
'contains_financial': metadata.get('contains_financial', False),
'status': metadata['status'],
'retention_category': metadata['retention_category'],
'business_unit': metadata.get('business_unit'),
'facility': metadata.get('facility'),
'owner_role': metadata['owner_role'],
'owner_user_id': metadata['owner_user_id'],
'effective_date': metadata['effective_date'].isoformat(),
'review_due_date': metadata.get('review_due_date', '').isoformat() if metadata.get('review_due_date') else None,
'retain_until': metadata['retain_until'].isoformat(),
'created_at': metadata['created_at'].isoformat(),
'last_modified_at': metadata['last_modified_at'].isoformat()
}
index.add_documents([doc])
Step 7: Audit Logging
def log_ingestion(db, doc_id, version, user_id='system'):
    """Record a successful ingestion event in the append-only audit trail."""
    event_payload = json.dumps({'status': 'success'})
    params = [user_id, 'ingest', doc_id, version, event_payload]
    db.execute("""
        INSERT INTO audit_events
        (user_id, action, doc_id, doc_version, new_value)
        VALUES (%s, %s, %s, %s, %s)
    """, params)
    db.commit()
Complete Pipeline Function
def process_file(file_path, db):
    """Run the full ingestion pipeline (steps 2-7) for one Markdown file.

    Args:
        file_path: path to the changed .md file.
        db: database handle shared by all DB-touching steps.

    Returns:
        The document's doc_id.
    """
    # Stable identity: resolve (or mint) the doc_id for this path.
    doc_id = get_or_create_doc_id(file_path, db)
    # Step 2: Parse
    parsed = parse_markdown(file_path)
    # Step 3: Enrich
    metadata = enrich_metadata(parsed['frontmatter'], db)
    # Step 4: WORM
    # NOTE(review): get_next_version computes the version here, while
    # upsert_document independently derives current_version + 1 — confirm
    # the two cannot disagree (e.g. under concurrent ingestion).
    version = get_next_version(doc_id, db)
    worm_key = write_to_worm(doc_id, version, parsed['body'], metadata)
    # Step 5: DB
    # NOTE(review): the WORM object is written before the DB upsert; if the
    # upsert fails, an orphaned (immutable) archive object remains.
    upsert_document(db, doc_id, file_path, parsed, metadata, worm_key)
    # Step 6: Index
    index_document(doc_id, parsed, metadata)
    # Step 7: Audit
    log_ingestion(db, doc_id, version)
    return doc_id