
Ingestion Pipeline (Markdown → Postgres + Meilisearch)

High-level, idempotent pipeline for processing Markdown documents into the compliance system.

Pipeline Overview

┌──────────────────┐
│ 1. Discovery     │  Git commits / filesystem events
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ 2. Parse         │  Frontmatter + Markdown rendering
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ 3. Enrich        │  Retention lookup, normalization
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ 4. WORM Write    │  Immutable archive
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ 5. DB Upsert     │  PostgreSQL metadata
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ 6. Index         │  Meilisearch update
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ 7. Audit Log     │  Record ingestion event
└──────────────────┘

Step 1: Discovery

From Git

import subprocess
import json

def get_changed_files(since_commit):
    """Return paths of .md files changed between *since_commit* and HEAD.

    Args:
        since_commit: Any git revision (e.g. 'HEAD~1', a SHA, a tag name).

    Returns:
        List of repository-relative paths; empty when nothing changed.
    """
    result = subprocess.run(
        ['git', 'diff', '--name-only', since_commit, 'HEAD', '--', '*.md'],
        capture_output=True, text=True
    )
    # BUG FIX: ''.split('\n') == [''] — the original returned a one-element
    # list containing an empty path when no files changed (or when git
    # failed and stdout was empty). splitlines() + filter avoids both.
    return [path for path in result.stdout.splitlines() if path]

# Example: files changed in the most recent commit (HEAD~1..HEAD).
changed_files = get_changed_files('HEAD~1')

From Filesystem Events

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MarkdownHandler(FileSystemEventHandler):
    """Feed changed Markdown files into the ingestion pipeline.

    NOTE(review): process_file() later in this file takes (file_path, db),
    but this handler calls it with only the path — that will raise
    TypeError at runtime. Confirm how the db handle should be threaded
    into the watcher (e.g. passed to __init__ and stored on self).
    """

    def on_modified(self, event):
        # Directory mtime updates also fire on_modified; skip them so a
        # directory named like "notes.md" cannot slip through.
        if event.is_directory:
            return
        if event.src_path.endswith('.md'):
            process_file(event.src_path)

    def on_created(self, event):
        # BUG FIX: newly created .md files previously went unnoticed until
        # their next modification; treat creation like a modification.
        self.on_modified(event)

# Watch ./documents recursively and re-ingest Markdown files as they change.
observer = Observer()
observer.schedule(MarkdownHandler(), path='./documents', recursive=True)
observer.start()
# NOTE(review): start() returns immediately (watcher thread); the hosting
# process must stay alive, e.g. via observer.join() — not shown here.

Step 2: Parse Markdown

import yaml
import hashlib
from markdown import Markdown
from io import StringIO

def parse_markdown(file_path):
    """Parse a Markdown file into frontmatter, body, headings, and hash.

    Args:
        file_path: Path to the .md file on disk.

    Returns:
        Dict with 'frontmatter' (dict, empty when absent), 'body'
        (Markdown source without frontmatter), 'plain_text', 'headings'
        (H1-H3 texts), and 'content_hash' (SHA-256 hex digest of the raw
        file content, frontmatter included).
    """
    # BUG FIX: explicit encoding — the original depended on the locale.
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Split optional YAML frontmatter ('--- ... ---') from the body.
    frontmatter, body = {}, content
    if content.startswith('---'):
        parts = content.split('---', 2)
        # BUG FIX: an unterminated opening '---' made parts[2] raise
        # IndexError, and an empty frontmatter block made safe_load
        # return None instead of a dict.
        if len(parts) == 3:
            frontmatter = yaml.safe_load(parts[1]) or {}
            body = parts[2].strip()

    # Hash the raw content for idempotent change detection.
    content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()

    headings = extract_headings(body)
    # NOTE: the original rendered HTML via Markdown().convert() and then
    # discarded the result; that dead work is removed here.
    plain_text = strip_markdown(body)

    return {
        'frontmatter': frontmatter,
        'body': body,
        'plain_text': plain_text,
        'headings': headings,
        'content_hash': content_hash,
    }

def extract_headings(markdown_text):
    """Collect the text of H1-H3 headings, in document order.

    Deeper headings (####+) and lines without a space after the hashes
    are ignored, matching ATX heading syntax.
    """
    prefixes = ('# ', '## ', '### ')
    found = []
    for raw_line in markdown_text.split('\n'):
        for prefix in prefixes:
            if raw_line.startswith(prefix):
                found.append(raw_line[len(prefix):].strip())
                break
    return found

Step 3: Enrich Metadata

from datetime import date, timedelta

def enrich_metadata(frontmatter, db):
    """Enrich frontmatter with retention data and normalized values.

    Args:
        frontmatter: Dict parsed from the document's YAML frontmatter.
        db: Database handle exposing .query(sql, params).fetchone().

    Returns:
        New dict: original frontmatter plus 'retention_period_y',
        'retain_until', normalized 'regulations', and 'jurisdiction'.

    Raises:
        ValueError: If retention_category has no policy row in the DB.
    """
    # Look up the retention policy; fall back to the default category.
    retention_category = frontmatter.get('retention_category', 'DEFAULT-7Y')
    policy = db.query(
        "SELECT * FROM retention_policies WHERE retention_category = %s",
        [retention_category]
    ).fetchone()

    if not policy:
        raise ValueError(f"Unknown retention_category: {retention_category}")

    # BUG FIX: YAML can deliver effective_date as a quoted ISO string or an
    # explicit null; the original crashed on both ('str + timedelta' /
    # 'None + timedelta').
    effective_date = frontmatter.get('effective_date') or date.today()
    if isinstance(effective_date, str):
        effective_date = date.fromisoformat(effective_date)

    # Retention is the stricter of the policy default and the legal minimum.
    years = max(policy['period_years_default'], policy['min_years'])
    # NOTE(review): 365-day years drift short across leap years; for a
    # compliance retain_until consider calendar-aware arithmetic — confirm.
    retain_until = effective_date + timedelta(days=years * 365)

    # Normalize regulation codes via the project-wide helper.
    regulations = frontmatter.get('regulations', [])
    normalized_regulations = [normalize_regulation(r) for r in regulations]

    return {
        **frontmatter,
        'retention_period_y': years,
        'retain_until': retain_until,
        'regulations': normalized_regulations,
        'jurisdiction': frontmatter.get('jurisdiction', ['US']),
    }

Step 4: WORM Write

import boto3
import json
from datetime import datetime

def write_to_worm(doc_id, version, content, metadata):
    """Archive one document version to S3 WORM (Object Lock) storage.

    Writes a JSON envelope {markdown, metadata, doc_id, version,
    archived_at} under records/<doc_id>/<version>.json in COMPLIANCE
    mode, retained until metadata['retain_until'].

    Returns:
        The S3 object key of the archived record.
    """
    key = f'records/{doc_id}/{version}.json'

    envelope = {
        'markdown': content,
        'metadata': metadata,
        'doc_id': str(doc_id),
        'version': version,
        'archived_at': datetime.utcnow().isoformat(),
    }

    client = boto3.client('s3')
    client.put_object(
        Bucket='compliance-archive',
        Key=key,
        Body=json.dumps(envelope),
        ObjectLockMode='COMPLIANCE',
        ObjectLockRetainUntilDate=metadata['retain_until'],
    )
    return key

Step 5: Database Upsert

from uuid import uuid4

def upsert_document(db, doc_id, file_path, parsed, metadata, worm_key):
    """Upsert the document row, append a version row, and upsert metadata.

    Args:
        db: Database handle exposing .query()/.execute()/.commit().
        doc_id: Stable document identifier.
        file_path: Repository-relative path of the source file.
        parsed: Output of parse_markdown() (uses 'content_hash').
        metadata: Output of enrich_metadata().
        worm_key: S3 key of the immutable archived copy.
    """
    existing = db.query(
        "SELECT doc_id, current_version FROM documents WHERE doc_id = %s",
        [doc_id]
    ).fetchone()

    now = datetime.utcnow()

    if existing:
        # Re-ingest: append a version that supersedes the current one.
        new_version = existing['current_version'] + 1

        db.execute("""
            INSERT INTO document_versions
            (doc_id, version, worm_object_id, content_hash, created_at, created_by, supersedes_version)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, [doc_id, new_version, worm_key, parsed['content_hash'], now, 'system',
              existing['current_version']])

        db.execute("""
            UPDATE documents SET
                current_version = %s,
                content_hash = %s,
                worm_object_id = %s,
                last_modified_at = %s,
                last_modified_by = %s
            WHERE doc_id = %s
        """, [new_version, parsed['content_hash'], worm_key, now, 'system', doc_id])
    else:
        # First ingest: create the document at version 1.
        db.execute("""
            INSERT INTO documents
            (doc_id, path, current_version, content_hash, worm_object_id, created_at, created_by, last_modified_at, last_modified_by)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, [doc_id, file_path, 1, parsed['content_hash'], worm_key, now, 'system', now, 'system'])

        db.execute("""
            INSERT INTO document_versions
            (doc_id, version, worm_object_id, content_hash, created_at, created_by)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, [doc_id, 1, worm_key, parsed['content_hash'], now, 'system'])

    # BUG FIX: the original passed a literal `...` (Ellipsis) after the first
    # three metadata params, so the 22-column INSERT could never bind. Also
    # widened ON CONFLICT: previously only title/status/last_modified_at were
    # refreshed, silently dropping every other metadata change on re-ingest.
    db.execute("""
        INSERT INTO document_metadata
        (doc_id, title, domain, document_type, jurisdiction, regulations,
         security_class, contains_phi, contains_pii, contains_financial,
         status, effective_date, review_due_date, retention_category,
         retention_period_y, retain_until, owner_user_id, owner_role,
         business_unit, facility, created_at, last_modified_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (doc_id) DO UPDATE SET
            title = EXCLUDED.title,
            domain = EXCLUDED.domain,
            document_type = EXCLUDED.document_type,
            jurisdiction = EXCLUDED.jurisdiction,
            regulations = EXCLUDED.regulations,
            security_class = EXCLUDED.security_class,
            contains_phi = EXCLUDED.contains_phi,
            contains_pii = EXCLUDED.contains_pii,
            contains_financial = EXCLUDED.contains_financial,
            status = EXCLUDED.status,
            effective_date = EXCLUDED.effective_date,
            review_due_date = EXCLUDED.review_due_date,
            retention_category = EXCLUDED.retention_category,
            retention_period_y = EXCLUDED.retention_period_y,
            retain_until = EXCLUDED.retain_until,
            owner_user_id = EXCLUDED.owner_user_id,
            owner_role = EXCLUDED.owner_role,
            business_unit = EXCLUDED.business_unit,
            facility = EXCLUDED.facility,
            last_modified_at = EXCLUDED.last_modified_at
    """, [doc_id, metadata['title'], metadata['domain'], metadata['document_type'],
          metadata['jurisdiction'], metadata['regulations'], metadata['security_class'],
          metadata.get('contains_phi', False), metadata.get('contains_pii', False),
          metadata.get('contains_financial', False), metadata['status'],
          metadata['effective_date'], metadata.get('review_due_date'),
          metadata['retention_category'], metadata['retention_period_y'],
          metadata['retain_until'], metadata['owner_user_id'], metadata['owner_role'],
          metadata.get('business_unit'), metadata.get('facility'), now, now])

    db.commit()

Step 6: Index in Meilisearch

import meilisearch

def index_document(doc_id, parsed, metadata):
    """Push one searchable record into the Meilisearch 'documents' index.

    Combines the parsed text (title, headings, plain body) with filterable
    metadata attributes; dates are serialized as ISO-8601 strings.
    """
    def _iso_or_none(value):
        # Optional date -> ISO string; falsy (missing/None) passes through.
        return value.isoformat() if value else None

    record = {
        'doc_id': str(doc_id),
        'title': metadata['title'],
        'headings': parsed['headings'],
        'body': parsed['plain_text'],
    }
    # Faceting / filtering attributes.
    record.update(
        domain=metadata['domain'],
        document_type=metadata['document_type'],
        jurisdiction=metadata['jurisdiction'],
        regulations=metadata['regulations'],
        security_class=metadata['security_class'],
        contains_phi=metadata.get('contains_phi', False),
        contains_pii=metadata.get('contains_pii', False),
        contains_financial=metadata.get('contains_financial', False),
        status=metadata['status'],
        retention_category=metadata['retention_category'],
        business_unit=metadata.get('business_unit'),
        facility=metadata.get('facility'),
        owner_role=metadata['owner_role'],
        owner_user_id=metadata['owner_user_id'],
    )
    # Date fields: required ones index directly; review_due_date is optional.
    record.update(
        effective_date=metadata['effective_date'].isoformat(),
        review_due_date=_iso_or_none(metadata.get('review_due_date')),
        retain_until=metadata['retain_until'].isoformat(),
        created_at=metadata['created_at'].isoformat(),
        last_modified_at=metadata['last_modified_at'].isoformat(),
    )

    # NOTE(review): endpoint and API key are hard-coded here — confirm they
    # should come from configuration instead.
    client = meilisearch.Client('http://localhost:7700', 'YOUR_API_KEY')
    client.index('documents').add_documents([record])

Step 7: Audit Logging

def log_ingestion(db, doc_id, version, user_id='system'):
    """Append an 'ingest' audit event for one document version.

    Args:
        db: Database handle exposing .execute()/.commit().
        doc_id: Document identifier.
        version: Version number that was ingested.
        user_id: Actor recorded on the event; defaults to 'system'.
    """
    payload = json.dumps({'status': 'success'})
    params = [user_id, 'ingest', doc_id, version, payload]
    db.execute("""
    INSERT INTO audit_events
    (user_id, action, doc_id, doc_version, new_value)
    VALUES (%s, %s, %s, %s, %s)
    """, params)
    db.commit()

Complete Pipeline Function

def process_file(file_path, db):
    """Run the full ingestion pipeline (steps 2-7) for one Markdown file.

    Args:
        file_path: Path to the changed .md file.
        db: Database handle threaded through every step.

    Returns:
        The document's doc_id.
    """
    # Stable identity for this path; helper defined elsewhere in the project.
    doc_id = get_or_create_doc_id(file_path, db)

    # Step 2: Parse frontmatter, body, headings, and content hash.
    parsed = parse_markdown(file_path)

    # Step 3: Enrich with retention policy and normalized codes.
    metadata = enrich_metadata(parsed['frontmatter'], db)

    # Step 4: Archive the raw Markdown body to WORM storage.
    # NOTE(review): `version` comes from get_next_version() (defined
    # elsewhere), but upsert_document() below derives the next version
    # independently from the documents table — the two can disagree under
    # concurrency. Confirm both read the same counter.
    version = get_next_version(doc_id, db)
    worm_key = write_to_worm(doc_id, version, parsed['body'], metadata)

    # Step 5: Upsert document, version, and metadata rows in PostgreSQL.
    upsert_document(db, doc_id, file_path, parsed, metadata, worm_key)

    # Step 6: Refresh the Meilisearch index.
    index_document(doc_id, parsed, metadata)

    # Step 7: Record the ingestion event in the audit trail.
    log_ingestion(db, doc_id, version)

    return doc_id

References