Skip to main content

#!/usr/bin/env python3 """ Generate DATA-DICTIONARY.md from database schemas.

Task: J.24.1.3, J.24.1.5 ADR: ADR-148 (Database Schema Documentation Standard)

Automatically extracts schema information from all four CODITECT databases and generates field-level documentation in ADR-148 format.

Also provides schema drift validation (J.24.1.5) to detect when the actual database schema differs from the documented schema.

Usage: python3 scripts/generate-data-dictionary.py # Generate full dictionary python3 scripts/generate-data-dictionary.py --dry-run # Preview without writing python3 scripts/generate-data-dictionary.py --stats # Show statistics only python3 scripts/generate-data-dictionary.py --db sessions # Single database python3 scripts/generate-data-dictionary.py --validate # Validate schema drift """

import argparse import json import re import sqlite3 import sys from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple

Version

version = "1.1.0"

Resolve paths

SCRIPT_DIR = Path(file).parent ROOT_DIR = SCRIPT_DIR.parent

Add scripts to path for imports

sys.path.insert(0, str(SCRIPT_DIR))

try: from core.paths import ( get_org_db_path, get_sessions_db_path, get_projects_db_path, get_user_data_dir, ) PATHS_AVAILABLE = True except ImportError: PATHS_AVAILABLE = False

Output path

OUTPUT_PATH = ROOT_DIR / "docs" / "reference" / "database" / "DATA-DICTIONARY.md"

Database tier configuration

DATABASE_TIERS = { "platform": { "tier": 1, "name": "platform.db", "purpose": "Component metadata index (agents, skills, commands, scripts)", "backup": "Regenerable", "location": "/.coditect-data/context-storage/platform.db", "regenerable": True, }, "org": { "tier": 2, "name": "org.db", "purpose": "Organizational knowledge (decisions, learnings, error solutions)", "backup": "DAILY (CRITICAL)", "location": "/.coditect-data/context-storage/org.db", "regenerable": False, }, "sessions": { "tier": 3, "name": "sessions.db", "purpose": "Session data (messages, tool analytics, activity associations)", "backup": "Regenerable", "location": "/.coditect-data/context-storage/sessions.db", "regenerable": True, }, "projects": { "tier": 4, "name": "projects.db", "purpose": "Project-specific data (project index, embeddings)", "backup": "With code", "location": "/.coditect-data/context-storage/projects.db", "regenerable": True, }, }

Table purpose descriptions (manually curated for important tables)

TABLE_PURPOSES = { # Tier 1 - platform.db "components": "Master registry of all CODITECT framework components", "capabilities": "Component capability tags for semantic search", "triggers": "Usage triggers that activate components", "component_relationships": "Dependencies between components", "component_search": "FTS5 full-text search index for components", "metadata": "Key-value metadata store",

# Tier 2 - org.db
"decisions": "Architectural and design decisions with rationale",
"decisions_fts": "FTS5 search index for decisions",
"skill_learnings": "Accumulated skill improvement learnings",
"error_solutions": "Known error patterns and their solutions",
"calibration_history": "MoE classifier calibration records",
"quality_metrics": "Component quality tracking metrics",
"usage_patterns": "Component usage pattern analytics",
"feedback_history": "User feedback on component effectiveness",

# Tier 3 - sessions.db
"messages": "Extracted session messages from all LLMs",
"messages_fts": "FTS5 search index for messages",
"sessions": "Session metadata and boundaries",
"tool_analytics": "Tool usage statistics and performance",
"token_economics": "Token usage tracking per session",
"activity_associations": "Activity-to-project mapping (ADR-122)",
"workflow_states": "Cross-session workflow state persistence",
"kg_nodes": "Knowledge graph nodes (ADR-151)",
"kg_edges": "Knowledge graph edges (ADR-151)",
"sync_queue": "Offline sync queue for cloud synchronization",
"task_tracking": "Local task status tracking",
"task_messages": "Task-to-message associations",

# Tier 4 - projects.db
"projects": "Registered project metadata",
"content_hashes": "File content hashes for change detection",
"project_embeddings": "Semantic embeddings for project content",
"exclude_patterns": "File exclusion patterns per project",

}

def get_database_path(db_key: str) -> Optional[Path]: """Get the path to a database file.""" if db_key == "platform": # Platform DB is in context-storage if PATHS_AVAILABLE: user_data = get_user_data_dir() return user_data / "context-storage" / "platform.db" return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "platform.db"

elif db_key == "org":
if PATHS_AVAILABLE:
return get_org_db_path()
return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "org.db"

elif db_key == "sessions":
if PATHS_AVAILABLE:
return get_sessions_db_path()
return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"

elif db_key == "projects":
if PATHS_AVAILABLE:
return get_projects_db_path()
return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "projects.db"

return None

def get_tables(conn: sqlite3.Connection) -> List[str]: """Get all table names from database.""" cursor = conn.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name """) return [row[0] for row in cursor.fetchall()]

def get_table_info(conn: sqlite3.Connection, table: str) -> List[Dict[str, Any]]: """Get column information for a table using PRAGMA table_info.""" cursor = conn.execute(f"PRAGMA table_info({table})") columns = [] for row in cursor.fetchall(): columns.append({ "cid": row[0], "name": row[1], "type": row[2] or "TEXT", "notnull": bool(row[3]), "default": row[4], "pk": bool(row[5]), }) return columns

def get_indexes(conn: sqlite3.Connection, table: str) -> List[Dict[str, Any]]: """Get index information for a table.""" cursor = conn.execute(f"PRAGMA index_list({table})") indexes = [] for row in cursor.fetchall(): index_name = row[1] is_unique = bool(row[2])

    # Get columns in this index
col_cursor = conn.execute(f"PRAGMA index_info({index_name})")
columns = [col[2] for col in col_cursor.fetchall()]

indexes.append({
"name": index_name,
"unique": is_unique,
"columns": columns,
})
return indexes

def get_foreign_keys(conn: sqlite3.Connection, table: str) -> List[Dict[str, Any]]: """Get foreign key information for a table.""" cursor = conn.execute(f"PRAGMA foreign_key_list({table})") fks = [] for row in cursor.fetchall(): fks.append({ "from": row[3], "table": row[2], "to": row[4], }) return fks

def is_fts_table(table: str) -> bool: """Check if table is an FTS5 virtual table.""" return table.endswith("_fts") or "fts" in table

def format_constraints(col: Dict[str, Any], fks: List[Dict[str, Any]]) -> str: """Format constraints for a column.""" constraints = [] if col["pk"]: constraints.append("PK") if col["notnull"]: constraints.append("NN")

# Check for foreign keys
for fk in fks:
if fk["from"] == col["name"]:
constraints.append(f"FK→{fk['table']}")

return ", ".join(constraints) if constraints else "-"

def format_default(default: Any) -> str: """Format default value for display.""" if default is None: return "-" if default == "CURRENT_TIMESTAMP": return "CURRENT_TIMESTAMP" if isinstance(default, str): return f"{default}" if default else '""' return str(default)

def generate_table_docs( conn: sqlite3.Connection, table: str, db_key: str ) -> str: """Generate markdown documentation for a single table.""" lines = []

# Table header
lines.append(f"### Table: `{table}`")
lines.append("")

# Purpose (from manual curation or auto-description)
purpose = TABLE_PURPOSES.get(table, "")
if not purpose:
if is_fts_table(table):
base_table = table.replace("_fts", "").replace("_content", "").replace("_data", "")
purpose = f"FTS5 full-text search index for `{base_table}`"
else:
purpose = f"Table in {db_key}.db"

lines.append(f"**Purpose:** {purpose}")
lines.append("")

# Get schema info
columns = get_table_info(conn, table)
indexes = get_indexes(conn, table)
fks = get_foreign_keys(conn, table)

# Special handling for FTS tables
if is_fts_table(table):
lines.append("**Type:** FTS5 Virtual Table")
lines.append("")
lines.append("#### Indexed Columns")
lines.append("")
if columns:
for col in columns:
lines.append(f"- `{col['name']}`")
else:
lines.append("- *(See base table for columns)*")
lines.append("")
return "\n".join(lines)

# Related tables (from foreign keys)
if fks:
lines.append("**Related Tables:**")
for fk in fks:
lines.append(f"- `{fk['table']}` (FK: {fk['from']} → {fk['to']})")
lines.append("")

# Fields table
lines.append("#### Fields")
lines.append("")
lines.append("| Field | Type | Constraints | Default | Description |")
lines.append("|-------|------|-------------|---------|-------------|")

for col in columns:
field = f"`{col['name']}`"
col_type = col["type"]
constraints = format_constraints(col, fks)
default = format_default(col["default"])
description = "-" # Could be enhanced with column descriptions

lines.append(f"| {field} | {col_type} | {constraints} | {default} | {description} |")

lines.append("")

# Indexes (non-auto-generated)
real_indexes = [idx for idx in indexes if not idx["name"].startswith("sqlite_")]
if real_indexes:
lines.append("#### Indexes")
lines.append("")
lines.append("| Index | Columns | Unique |")
lines.append("|-------|---------|--------|")
for idx in real_indexes:
cols = ", ".join(f"`{c}`" for c in idx["columns"])
unique = "✓" if idx["unique"] else "-"
lines.append(f"| `{idx['name']}` | {cols} | {unique} |")
lines.append("")

return "\n".join(lines)

def generate_database_section(db_key: str, conn: sqlite3.Connection) -> Tuple[str, int]: """Generate documentation section for a database.""" config = DATABASE_TIERS[db_key] tables = get_tables(conn)

lines = []

# Section header
tier = config["tier"]
name = config["name"]
critical = " (CRITICAL)" if not config["regenerable"] else ""

lines.append(f"## Tier {tier}: {name}{critical}")
lines.append("")
lines.append(f"**Purpose:** {config['purpose']}")
lines.append(f"**Location:** `{config['location']}`")
lines.append(f"**Backup:** {config['backup']}")
lines.append(f"**Tables:** {len(tables)}")
lines.append("")

# Generate docs for each table
for table in tables:
lines.append(generate_table_docs(conn, table, db_key))

lines.append("---")
lines.append("")

return "\n".join(lines), len(tables)

def generate_frontmatter(total_tables: int) -> str: """Generate YAML frontmatter for the document.""" now = datetime.now(timezone.utc).strftime("%Y-%m-%d")

return f"""---

title: CODITECT Data Dictionary type: reference component_type: data-dictionary version: 1.0.0 audience: contributor status: active summary: Field-level documentation for all {total_tables} tables across CODITECT four-tier database architecture keywords:

  • database
  • schema
  • data-dictionary
  • field-documentation
  • ADR-148
  • ADR-118 tokens: ~8000 created: '2026-02-04' updated: '{now}' tags:
  • database
  • schema
  • reference
  • data-dictionary moe_confidence: 0.950 moe_classified: {now}

"""

def generate_header(table_counts: Dict[str, int]) -> str: """Generate document header with quick reference.""" total = sum(table_counts.values())

return f"""# CODITECT Data Dictionary

Governance: ADR-148, ADR-118

Purpose: Field-level documentation for all tables in CODITECT's four-tier database architecture. For high-level architecture overview, see DATABASE-SCHEMA.md.

Auto-generated by: scripts/generate-data-dictionary.py (J.24.1.3)


Quick Reference

DatabaseTierTablesBackupDocumentation
platform.db1{table_counts.get('platform', 0)}RegenerableJump
org.db2{table_counts.get('org', 0)}DAILYJump
sessions.db3{table_counts.get('sessions', 0)}RegenerableJump
projects.db4{table_counts.get('projects', 0)}With codeJump

Total: {total} tables across 4 databases


Legend

SymbolMeaning
PKPrimary Key
FKForeign Key
UQUnique Constraint
NNNOT NULL
FTSFull-Text Search Index

"""

Cloud Sync Mapping

See ADR-053 for cloud sync details.

Local TableCloud TableDirectionFrequency
decisionsDecision↑↓ BidirectionalOn change
skill_learningsSkillLearning↑ UploadSession end
task_trackingTaskTracking↑↓ BidirectionalOn change
messagesMessage↑ UploadBatch (daily)

Purpose Matrix

I need to...TableDatabase
Find a componentcomponentsplatform.db
Search message historymessages_ftssessions.db
Track a decisiondecisionsorg.db
Store skill learningskill_learningsorg.db
Record tool usagetool_analyticssessions.db
Associate activityactivity_associationssessions.db
Query knowledge graphkg_nodes, kg_edgessessions.db
Track project filescontent_hashesprojects.db

Generated: {now} Script: scripts/generate-data-dictionary.py v{version} Task: J.24.1.3 """

def generate_data_dictionary( databases: Optional[List[str]] = None, dry_run: bool = False, stats_only: bool = False, ) -> Dict[str, Any]: """Generate the complete data dictionary."""

results = {
"databases": {},
"total_tables": 0,
"errors": [],
}

# Process each database
db_keys = databases or list(DATABASE_TIERS.keys())
sections = []
table_counts = {}

for db_key in db_keys:
if db_key not in DATABASE_TIERS:
results["errors"].append(f"Unknown database: {db_key}")
continue

db_path = get_database_path(db_key)
if not db_path or not db_path.exists():
results["errors"].append(f"Database not found: {db_key} ({db_path})")
table_counts[db_key] = 0
continue

try:
conn = sqlite3.connect(db_path)
section, count = generate_database_section(db_key, conn)
sections.append(section)
table_counts[db_key] = count
results["databases"][db_key] = {
"path": str(db_path),
"tables": count,
}
conn.close()
except Exception as e:
results["errors"].append(f"Error processing {db_key}: {e}")
table_counts[db_key] = 0

results["total_tables"] = sum(table_counts.values())

if stats_only:
return results

# Generate full document
content = []
content.append(generate_frontmatter(results["total_tables"]))
content.append(generate_header(table_counts))
content.extend(sections)
content.append(generate_footer())

full_content = "\n".join(content)
results["content_length"] = len(full_content)
results["content_lines"] = full_content.count("\n")

if dry_run:
results["dry_run"] = True
print(full_content[:2000])
print(f"\n... ({len(full_content)} characters total)")
else:
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH.write_text(full_content)
results["output_path"] = str(OUTPUT_PATH)

return results

@dataclass class ColumnSchema: """Represents a column's schema for comparison.""" name: str col_type: str notnull: bool pk: bool default: Optional[str] = None

@dataclass class TableSchema: """Represents a table's schema for comparison.""" name: str columns: Dict[str, ColumnSchema] = field(default_factory=dict) is_fts: bool = False

@dataclass class DatabaseSchema: """Represents a database's schema for comparison.""" name: str tables: Dict[str, TableSchema] = field(default_factory=dict)

@dataclass class SchemaDrift: """Represents schema drift between documented and actual schema.""" database: str tables_added: List[str] = field(default_factory=list) tables_removed: List[str] = field(default_factory=list) columns_added: Dict[str, List[str]] = field(default_factory=dict) columns_removed: Dict[str, List[str]] = field(default_factory=dict) columns_changed: Dict[str, List[Tuple[str, str, str]]] = field(default_factory=dict)

def has_drift(self) -> bool:
"""Check if any drift was detected."""
return bool(
self.tables_added or
self.tables_removed or
self.columns_added or
self.columns_removed or
self.columns_changed
)

def summary(self) -> str:
"""Generate a summary of the drift."""
parts = []
if self.tables_added:
parts.append(f"+{len(self.tables_added)} tables")
if self.tables_removed:
parts.append(f"-{len(self.tables_removed)} tables")
total_cols_added = sum(len(v) for v in self.columns_added.values())
total_cols_removed = sum(len(v) for v in self.columns_removed.values())
total_cols_changed = sum(len(v) for v in self.columns_changed.values())
if total_cols_added:
parts.append(f"+{total_cols_added} columns")
if total_cols_removed:
parts.append(f"-{total_cols_removed} columns")
if total_cols_changed:
parts.append(f"~{total_cols_changed} columns changed")
return ", ".join(parts) if parts else "No drift"

def parse_documented_schema(doc_path: Path) -> Dict[str, DatabaseSchema]: """ Parse the DATA-DICTIONARY.md to extract documented schema.

Returns a dict mapping database key to DatabaseSchema.
"""
if not doc_path.exists():
return {}

content = doc_path.read_text()
schemas: Dict[str, DatabaseSchema] = {}

# Pattern to match database sections
db_section_pattern = re.compile(
r'^## Tier \d+: (platform|org|sessions|projects)\.db',
re.MULTILINE | re.IGNORECASE
)

# Pattern to match table definitions
table_pattern = re.compile(r'^### Table: `([^`]+)`', re.MULTILINE)

# Pattern to match field rows in tables
# | `field_name` | TYPE | Constraints | Default | Description |
field_pattern = re.compile(
r'^\| `([^`]+)` \| ([^|]+) \| ([^|]+) \| ([^|]+) \|',
re.MULTILINE
)

# Pattern for FTS tables
fts_pattern = re.compile(r'\*\*Type:\*\* FTS5 Virtual Table', re.MULTILINE)

# Find all database sections
db_matches = list(db_section_pattern.finditer(content))

for i, db_match in enumerate(db_matches):
db_name = db_match.group(1).lower()
start = db_match.end()
end = db_matches[i + 1].start() if i + 1 < len(db_matches) else len(content)
section = content[start:end]

db_schema = DatabaseSchema(name=db_name)

# Find all tables in this section
table_matches = list(table_pattern.finditer(section))

for j, table_match in enumerate(table_matches):
table_name = table_match.group(1)
table_start = table_match.end()
table_end = table_matches[j + 1].start() if j + 1 < len(table_matches) else len(section)
table_section = section[table_start:table_end]

# Check if FTS table
is_fts = bool(fts_pattern.search(table_section))

table_schema = TableSchema(name=table_name, is_fts=is_fts)

# Extract fields (only for non-FTS tables)
if not is_fts:
for field_match in field_pattern.finditer(table_section):
col_name = field_match.group(1)
col_type = field_match.group(2).strip()
constraints = field_match.group(3).strip()
default = field_match.group(4).strip()

col_schema = ColumnSchema(
name=col_name,
col_type=col_type,
notnull="NN" in constraints,
pk="PK" in constraints,
default=default if default != "-" else None,
)
table_schema.columns[col_name] = col_schema

db_schema.tables[table_name] = table_schema

schemas[db_name] = db_schema

return schemas

def get_actual_schema(db_key: str) -> Optional[DatabaseSchema]: """Get the actual schema from a database.""" db_path = get_database_path(db_key) if not db_path or not db_path.exists(): return None

try:
conn = sqlite3.connect(db_path)
db_schema = DatabaseSchema(name=db_key)

tables = get_tables(conn)
for table_name in tables:
is_fts = is_fts_table(table_name)
table_schema = TableSchema(name=table_name, is_fts=is_fts)

if not is_fts:
columns = get_table_info(conn, table_name)
for col in columns:
col_schema = ColumnSchema(
name=col["name"],
col_type=col["type"],
notnull=col["notnull"],
pk=col["pk"],
default=col["default"],
)
table_schema.columns[col["name"]] = col_schema

db_schema.tables[table_name] = table_schema

conn.close()
return db_schema

except Exception:
return None

def compare_schemas( documented: Optional[DatabaseSchema], actual: Optional[DatabaseSchema], db_key: str ) -> SchemaDrift: """Compare documented schema with actual schema and return drift.""" drift = SchemaDrift(database=db_key)

if not actual:
return drift

if not documented:
# No documentation exists - all tables are "added"
drift.tables_added = list(actual.tables.keys())
return drift

doc_tables = set(documented.tables.keys())
actual_tables = set(actual.tables.keys())

# Tables added (in actual but not documented)
drift.tables_added = sorted(actual_tables - doc_tables)

# Tables removed (in docs but not in actual)
drift.tables_removed = sorted(doc_tables - actual_tables)

# Check columns in common tables
common_tables = doc_tables & actual_tables
for table_name in sorted(common_tables):
doc_table = documented.tables[table_name]
actual_table = actual.tables[table_name]

# Skip FTS tables for detailed comparison
if doc_table.is_fts or actual_table.is_fts:
continue

doc_cols = set(doc_table.columns.keys())
actual_cols = set(actual_table.columns.keys())

# Columns added
added = actual_cols - doc_cols
if added:
drift.columns_added[table_name] = sorted(added)

# Columns removed
removed = doc_cols - actual_cols
if removed:
drift.columns_removed[table_name] = sorted(removed)

# Columns changed (type mismatch)
common_cols = doc_cols & actual_cols
for col_name in sorted(common_cols):
doc_col = doc_table.columns[col_name]
actual_col = actual_table.columns[col_name]

# Compare types (normalize for comparison)
doc_type = doc_col.col_type.upper().strip()
actual_type = actual_col.col_type.upper().strip()

if doc_type != actual_type:
if table_name not in drift.columns_changed:
drift.columns_changed[table_name] = []
drift.columns_changed[table_name].append(
(col_name, doc_type, actual_type)
)

return drift

def validate_schema_drift( databases: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Validate schema drift between documentation and actual databases.

Returns validation results with drift details for each database.
"""
results = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"documentation": str(OUTPUT_PATH),
"databases": {},
"total_drift": False,
"summary": [],
}

# Parse documented schema
documented_schemas = parse_documented_schema(OUTPUT_PATH)

if not documented_schemas:
results["error"] = f"No documented schema found at {OUTPUT_PATH}"
return results

# Compare each database
db_keys = databases or list(DATABASE_TIERS.keys())

for db_key in db_keys:
if db_key not in DATABASE_TIERS:
continue

actual_schema = get_actual_schema(db_key)
documented_schema = documented_schemas.get(db_key)

drift = compare_schemas(documented_schema, actual_schema, db_key)

db_result = {
"documented_tables": len(documented_schema.tables) if documented_schema else 0,
"actual_tables": len(actual_schema.tables) if actual_schema else 0,
"has_drift": drift.has_drift(),
"summary": drift.summary(),
}

if drift.has_drift():
results["total_drift"] = True
db_result["drift"] = {
"tables_added": drift.tables_added,
"tables_removed": drift.tables_removed,
"columns_added": drift.columns_added,
"columns_removed": drift.columns_removed,
"columns_changed": {
t: [(c, d, a) for c, d, a in cols]
for t, cols in drift.columns_changed.items()
},
}

results["databases"][db_key] = db_result
results["summary"].append(f"{db_key}: {drift.summary()}")

return results

def format_validation_report(results: Dict[str, Any]) -> str: """Format validation results as a human-readable report.""" lines = [] lines.append("=" * 60) lines.append("SCHEMA DRIFT VALIDATION REPORT") lines.append("=" * 60) lines.append(f"Timestamp: {results['timestamp']}") lines.append(f"Documentation: {results['documentation']}") lines.append("")

if results.get("error"):
lines.append(f"ERROR: {results['error']}")
return "\n".join(lines)

# Overall status
if results["total_drift"]:
lines.append("⚠️ DRIFT DETECTED")
else:
lines.append("✅ NO DRIFT - Schema is in sync")
lines.append("")

# Per-database results
for db_key, db_result in results.get("databases", {}).items():
tier = DATABASE_TIERS.get(db_key, {}).get("tier", "?")
lines.append(f"Tier {tier}: {db_key}.db")
lines.append(f" Documented: {db_result['documented_tables']} tables")
lines.append(f" Actual: {db_result['actual_tables']} tables")
lines.append(f" Status: {db_result['summary']}")

if db_result.get("drift"):
drift = db_result["drift"]

if drift["tables_added"]:
lines.append(f" + Tables added: {', '.join(drift['tables_added'])}")
if drift["tables_removed"]:
lines.append(f" - Tables removed: {', '.join(drift['tables_removed'])}")

for table, cols in drift.get("columns_added", {}).items():
lines.append(f" + {table}: columns added: {', '.join(cols)}")

for table, cols in drift.get("columns_removed", {}).items():
lines.append(f" - {table}: columns removed: {', '.join(cols)}")

for table, changes in drift.get("columns_changed", {}).items():
for col, doc_type, actual_type in changes:
lines.append(f" ~ {table}.{col}: {doc_type} → {actual_type}")

lines.append("")

lines.append("=" * 60)

if results["total_drift"]:
lines.append("To fix: Run 'python3 scripts/generate-data-dictionary.py' to regenerate")
else:
lines.append("Documentation is up to date with database schema")

lines.append("=" * 60)

return "\n".join(lines)

def main(): parser = argparse.ArgumentParser( description="Generate DATA-DICTIONARY.md from database schemas", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 scripts/generate-data-dictionary.py # Generate full dictionary python3 scripts/generate-data-dictionary.py --dry-run # Preview without writing python3 scripts/generate-data-dictionary.py --stats # Show statistics only python3 scripts/generate-data-dictionary.py --db sessions # Single database python3 scripts/generate-data-dictionary.py --validate # Check for schema drift """, ) parser.add_argument( "--dry-run", action="store_true", help="Preview output without writing file", ) parser.add_argument( "--stats", action="store_true", help="Show statistics only", ) parser.add_argument( "--db", choices=["platform", "org", "sessions", "projects"], action="append", dest="databases", help="Process specific database(s) only", ) parser.add_argument( "--json", action="store_true", help="Output results as JSON", ) parser.add_argument( "--validate", action="store_true", help="Validate schema drift between documentation and databases (J.24.1.5)", ) parser.add_argument( "--version", action="version", version=f"%(prog)s {version}", )

args = parser.parse_args()

# Handle validation mode
if args.validate:
results = validate_schema_drift(databases=args.databases)

if args.json:
print(json.dumps(results, indent=2))
else:
print(format_validation_report(results))

# Exit with error if drift detected
sys.exit(1 if results.get("total_drift") else 0)

results = generate_data_dictionary(
databases=args.databases,
dry_run=args.dry_run,
stats_only=args.stats,
)

if args.json:
print(json.dumps(results, indent=2))
else:
print(f"\n{'='*60}")
print("DATA DICTIONARY GENERATION RESULTS")
print(f"{'='*60}")

for db_key, info in results.get("databases", {}).items():
print(f" {db_key}: {info['tables']} tables")

print(f"\n Total tables: {results['total_tables']}")

if results.get("errors"):
print(f"\n Errors: {len(results['errors'])}")
for err in results["errors"]:
print(f" - {err}")

if results.get("output_path"):
print(f"\n Output: {results['output_path']}")
print(f" Lines: {results.get('content_lines', 0)}")

if args.dry_run:
print("\n (Dry run - no file written)")

print(f"{'='*60}\n")

# Exit with error if there were problems
if results.get("errors"):
sys.exit(1)

if name == "main": main()