Retention Automation: Rules and Implementation
Retention enforcement integrated with the CODITECT DMS API.
Retention Policy Table
-- Lookup table of retention policies; one row per category code.
CREATE TABLE retention_policies (
    category     TEXT PRIMARY KEY,  -- e.g. "HIPAA-6Y", "FINRA-6Y"
    description  TEXT NOT NULL,
    period_years INT  NOT NULL,     -- operational retention period
    legal_basis  TEXT NOT NULL,     -- e.g. "HIPAA-164.316", "FINRA-4511/SEC-17a-4"
    min_years    INT  NOT NULL,     -- defensive lower bound
    -- Sanity constraints: a zero/negative retention period would make every
    -- document immediately destruction-eligible.
    CONSTRAINT retention_policies_period_years_check CHECK (period_years > 0),
    CONSTRAINT retention_policies_min_years_check    CHECK (min_years > 0)
);
-- Seed policies. Explicit column list (schema changes won't silently
-- misalign values) and ON CONFLICT makes the seed idempotent, so re-running
-- the migration does not fail on the primary key.
INSERT INTO retention_policies (category, description, period_years, legal_basis, min_years) VALUES
    ('HIPAA-6Y',   'HIPAA policies and procedures', 6, 'HIPAA 45 CFR 164.316(b)(2)(i)', 6),
    ('FINRA-6Y',   'FINRA supervisory procedures',  6, 'FINRA 4511',                    6),
    ('SEC-7Y',     'SEC books and records',         7, 'SEC Rule 17a-4',                7),
    ('HR-7Y',      'HR and employment records',     7, 'State employment law',          7),
    ('DEFAULT-7Y', 'Default retention',             7, 'Internal policy',               7)
ON CONFLICT (category) DO NOTHING;
Trigger for retain_until Computation
-- Compute retention_period_y and retain_until on the row being written.
-- Raises on unknown category or missing effective_date so bad data can
-- never enter with a silently-missing retain_until.
CREATE OR REPLACE FUNCTION set_retention_fields()
RETURNS TRIGGER AS $$
DECLARE
    pol   retention_policies;
    years INT;
BEGIN
    SELECT * INTO pol
    FROM retention_policies
    WHERE category = NEW.retention_category;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Unknown retention_category: %', NEW.retention_category;
    END IF;

    -- min_years is a defensive floor in case a policy row is mis-edited
    -- below its regulatory minimum.
    years := GREATEST(pol.period_years, pol.min_years);

    IF NEW.effective_date IS NULL THEN
        RAISE EXCEPTION 'effective_date required for retention computation';
    END IF;

    NEW.retention_period_y := years;
    -- make_interval avoids building an interval from string concatenation
    -- ((years || ' years')::INTERVAL).
    NEW.retain_until := (NEW.effective_date + make_interval(years => years))::DATE;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Recompute retention fields on every insert, and on updates that touch
-- either input column; UPDATE OF avoids firing on unrelated updates.
CREATE TRIGGER trg_set_retention
BEFORE INSERT OR UPDATE OF effective_date, retention_category
ON document_metadata
FOR EACH ROW
EXECUTE FUNCTION set_retention_fields();
Destruction Queue
-- Work queue for retention-driven destruction.
CREATE TABLE destruction_queue (
    id           BIGSERIAL PRIMARY KEY,
    document_id  UUID NOT NULL REFERENCES documents(id),
    queued_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    processed_at TIMESTAMPTZ,
    -- 'skipped' is written by the destruction worker when eligibility
    -- changed between enqueue and processing (e.g. a legal hold arrived);
    -- the original enum comment omitted it. The CHECK enforces the set.
    status TEXT NOT NULL DEFAULT 'queued'
        CONSTRAINT destruction_queue_status_check
        CHECK (status IN ('queued', 'processing', 'completed', 'failed', 'skipped')),
    reason TEXT NOT NULL
);

-- The worker polls "WHERE status = 'queued' ORDER BY queued_at"; a partial
-- index keeps that poll cheap as completed/failed history accumulates.
CREATE INDEX destruction_queue_queued_idx
    ON destruction_queue (queued_at)
    WHERE status = 'queued';
Daily Retention Job
import asyncio
from datetime import date

from celery import shared_task
@shared_task
def enqueue_expired_documents():
    """Daily job: queue retention-expired documents for destruction.

    A document is enqueued when its retain_until has passed, it is not
    under legal hold, and it has no 'queued', 'processing', or 'completed'
    queue entry. Documents whose previous entry 'failed' or was 'skipped'
    are deliberately NOT excluded, so they are retried on the next run.

    Returns:
        int: number of documents enqueued on this run.
    """
    # NOT EXISTS has clearer NULL semantics than the LEFT JOIN anti-join it
    # replaces, and Postgres plans it at least as well.
    result = db.execute("""
        INSERT INTO destruction_queue (document_id, reason)
        SELECT dm.document_id, 'retention_expired'
        FROM document_metadata dm
        WHERE dm.legal_hold = FALSE
          AND dm.retain_until <= CURRENT_DATE
          AND NOT EXISTS (
              SELECT 1
              FROM destruction_queue dq
              WHERE dq.document_id = dm.document_id
                AND dq.status IN ('queued', 'processing', 'completed')
          )
        RETURNING document_id
    """)
    count = result.rowcount
    logger.info(f"Queued {count} documents for destruction")
    return count
Destruction Worker
@shared_task
def process_destruction_queue():
    """Process up to 100 queued destruction items.

    Each item is re-verified for eligibility (a legal hold may have been
    applied, or retain_until changed) immediately before destruction.
    Failures are recorded per-item so one bad document cannot stall the
    rest of the queue.
    """
    # NOTE(review): FOR UPDATE SKIP LOCKED only holds locks for the life of
    # the enclosing transaction. If db.fetch_all autocommits, the row locks
    # are released immediately and two workers could claim the same items —
    # confirm this runs inside a transaction, or treat the 'processing'
    # status flip below as the real claim mechanism.
    items = db.fetch_all("""
        SELECT id, document_id
        FROM destruction_queue
        WHERE status = 'queued'
        ORDER BY queued_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    """)
    for item in items:
        try:
            # Claim the item.
            db.execute(
                "UPDATE destruction_queue SET status = 'processing' WHERE id = $1",
                [item['id']],
            )
            # Re-verify eligibility: conditions may have changed since enqueue.
            eligible = db.fetch_one("""
                SELECT 1 FROM document_metadata
                WHERE document_id = $1
                  AND legal_hold = FALSE
                  AND retain_until <= CURRENT_DATE
            """, [item['document_id']])
            if eligible:
                # destroy_document/log_event are coroutines, but this is a
                # plain (sync) Celery task, so drive them with asyncio.run —
                # a bare `await` here is a SyntaxError in a non-async def.
                asyncio.run(destroy_document(item['document_id']))
                db.execute("""
                    UPDATE destruction_queue
                    SET status = 'completed', processed_at = now()
                    WHERE id = $1
                """, [item['id']])
                asyncio.run(log_event('document_destroyed', item['document_id']))
            else:
                # Conditions changed (e.g. legal hold applied): skip.
                db.execute("""
                    UPDATE destruction_queue
                    SET status = 'skipped', processed_at = now()
                    WHERE id = $1
                """, [item['id']])
        except Exception as e:
            # Record the failure and keep draining the queue; failed items
            # are re-enqueued by the daily job.
            db.execute("""
                UPDATE destruction_queue
                SET status = 'failed', processed_at = now()
                WHERE id = $1
            """, [item['id']])
            logger.error(f"Destruction failed for {item['document_id']}: {e}")
async def destroy_document(doc_id: UUID):
    """Destroy a document's derived data and mark it destroyed.

    Ordering matters: search/chunk artifacts are removed first so that a
    partial failure leaves the document still marked live — and therefore
    retried by the queue — rather than marked 'destroyed' with stale
    artifacts left behind (the original set status first).
    """
    # 1. Remove from the Meilisearch index.
    meilisearch_client.index('documents').delete_document(str(doc_id))
    # 2. Delete derived chunks (or mark deleted, per storage policy).
    db.execute(
        "DELETE FROM document_chunks WHERE document_id = $1",
        [doc_id]
    )
    # 3. Mark destroyed only after the cleanup above succeeded.
    # NOTE(review): db.execute is called without await here, matching the
    # sync worker's usage — confirm the db client is safe to call from an
    # async context without blocking the event loop.
    db.execute("""
        UPDATE document_metadata
        SET status = 'destroyed'
        WHERE document_id = $1
    """, [doc_id])
    # 4. If using WORM storage, the retention lock should have expired by
    #    now, allowing the underlying object to be deleted.
Legal Hold API
@app.post("/api/v1/documents/{doc_id}/legal-hold")
async def apply_legal_hold(doc_id: UUID, request: LegalHoldRequest, user: User = Depends(get_current_user)):
    """Apply a legal hold to a document, blocking retention destruction.

    Raises:
        HTTPException 403: caller lacks the doc.place_hold permission.
        HTTPException 404: no document with this id exists.
    """
    # Check permission
    if 'doc.place_hold' not in user.permissions:
        raise HTTPException(403, "Insufficient permissions")
    # RETURNING lets us detect a missing document; the original UPDATE
    # silently "succeeded" for unknown doc_ids.
    updated = await db.fetch_one("""
        UPDATE document_metadata
        SET legal_hold = TRUE,
            legal_hold_reason = $1,
            last_modified_at = now()
        WHERE document_id = $2
        RETURNING document_id
    """, [request.reason, doc_id])
    if not updated:
        raise HTTPException(404, "Document not found")
    # Cancel pending destruction. Items already in 'processing' are not
    # removed here; the worker re-checks legal_hold before destroying, so
    # in-flight items will be skipped rather than destroyed.
    await db.execute("""
        DELETE FROM destruction_queue
        WHERE document_id = $1 AND status = 'queued'
    """, [doc_id])
    # Audit log
    await log_event('legal_hold_applied', doc_id, {
        'reason': request.reason,
        'applied_by': user.id
    })
    return {"status": "hold_applied"}
@app.delete("/api/v1/documents/{doc_id}/legal-hold")
async def release_legal_hold(doc_id: UUID, user: User = Depends(get_current_user)):
    """Release a legal hold, making the document retention-eligible again.

    Raises:
        HTTPException 403: caller lacks the doc.place_hold permission.
        HTTPException 404: no document with this id exists.
    """
    # NOTE(review): release reuses the apply permission ('doc.place_hold');
    # if release should be more restricted, introduce a separate permission.
    if 'doc.place_hold' not in user.permissions:
        raise HTTPException(403, "Insufficient permissions")
    # RETURNING lets us detect a missing document; the original UPDATE
    # silently "succeeded" for unknown doc_ids.
    released = await db.fetch_one("""
        UPDATE document_metadata
        SET legal_hold = FALSE,
            legal_hold_reason = NULL,
            last_modified_at = now()
        WHERE document_id = $1
        RETURNING document_id
    """, [doc_id])
    if not released:
        raise HTTPException(404, "Document not found")
    await log_event('legal_hold_released', doc_id, {
        'released_by': user.id
    })
    return {"status": "hold_released"}
Retention Dashboard API
@app.get("/api/v1/compliance/retention-summary")
async def get_retention_summary(user: User = Depends(get_current_user)):
    """Per-category retention summary for the compliance dashboard.

    Buckets by retain_until are disjoint:
      - eligible:      retain_until has passed and no legal hold (destroyable now)
      - expiring_soon: retain_until falls within the next 90 days (future only)
      - on_hold:       under legal hold regardless of dates
    """
    # NOTE(review): unlike the legal-hold routes, there is no permission
    # check here — confirm whether compliance summaries should be gated.
    summary = await db.fetch_all("""
        SELECT
            retention_category,
            COUNT(*) AS total,
            COUNT(*) FILTER (WHERE retain_until <= CURRENT_DATE
                               AND legal_hold = FALSE) AS eligible,
            COUNT(*) FILTER (WHERE legal_hold = TRUE) AS on_hold,
            -- strictly future dates only; the original BETWEEN included
            -- CURRENT_DATE, double-counting documents expiring today in
            -- both 'eligible' and 'expiring_soon'
            COUNT(*) FILTER (WHERE retain_until > CURRENT_DATE
                               AND retain_until <= CURRENT_DATE + 90) AS expiring_soon
        FROM document_metadata
        WHERE status != 'destroyed'
        GROUP BY retention_category
        ORDER BY retention_category
    """)
    return RetentionSummaryResponse(categories=summary)
Celery Beat Schedule
# celeryconfig.py
from celery.schedules import crontab
# Two-stage nightly retention pipeline: the enqueue pass runs first, then
# the destruction pass an hour later so stage 1 has finished.
beat_schedule = {
    # Stage 1 (02:00 daily): find retention-expired documents.
    'enqueue-expired-documents': {
        'task': 'tasks.enqueue_expired_documents',
        'schedule': crontab(hour=2, minute=0),
    },
    # Stage 2 (03:00 daily): destroy what stage 1 queued.
    'process-destruction-queue': {
        'task': 'tasks.process_destruction_queue',
        'schedule': crontab(hour=3, minute=0),
    },
}