#!/usr/bin/env python3
"""Deduplicate doc_index table - ensure unique file paths.

Removes duplicate entries keeping the most recently indexed version.
Also adds UNIQUE constraint to prevent future duplicates.

Usage:
    python3 scripts/deduplicate-doc-index.py          # Dry run
    python3 scripts/deduplicate-doc-index.py --apply  # Apply changes
    python3 scripts/deduplicate-doc-index.py --stats  # Show statistics only
"""

import sqlite3
import argparse
import sys
from pathlib import Path
from datetime import datetime

# ADR-114 & ADR-118: Use centralized path discovery
# doc_index table is in org.db (Tier 2: organizational knowledge)
SCRIPT_DIR = Path(__file__).resolve().parent
FRAMEWORK_ROOT = SCRIPT_DIR.parent

# Make the framework's core helpers importable for the paths module below.
sys.path.insert(0, str(SCRIPT_DIR / "core"))

try:
    from paths import get_org_db_path, ORG_DB
    DB_PATH = ORG_DB
except ImportError:
    # Fallback when the paths module is unavailable: probe the conventional
    # per-user data location first, then the framework-relative default.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        DB_PATH = _user_data / "org.db"
    else:
        DB_PATH = FRAMEWORK_ROOT / "context-storage" / "org.db"
def get_stats(conn: sqlite3.Connection) -> dict:
    """Summarize the current state of the doc_index table.

    Returns a dict with: total row count, distinct file paths, distinct
    doc hashes, the number of surplus (duplicate) rows, and how many
    paths occur more than once.
    """
    cur = conn.cursor()

    def scalar(sql: str) -> int:
        # Run a single-value aggregate query and return its result.
        return cur.execute(sql).fetchone()[0]

    row_total = scalar("SELECT COUNT(*) FROM doc_index")
    path_total = scalar("SELECT COUNT(DISTINCT file_path) FROM doc_index")
    hash_total = scalar("SELECT COUNT(DISTINCT doc_hash) FROM doc_index")

    # Every file_path that occurs more than once, with its occurrence count.
    dup_rows = cur.execute("""
        SELECT file_path, COUNT(*) as cnt
        FROM doc_index
        GROUP BY file_path
        HAVING cnt > 1
    """).fetchall()

    # Each duplicated path contributes (count - 1) removable rows.
    surplus = sum(count - 1 for _path, count in dup_rows)

    return {
        "total_rows": row_total,
        "unique_paths": path_total,
        "unique_hashes": hash_total,
        "duplicate_rows": surplus,
        "paths_with_duplicates": len(dup_rows),
    }
def find_duplicates(conn: sqlite3.Connection) -> list:
    """List every duplicated file path, most-duplicated first.

    Each returned row is (file_path, occurrence count, comma-joined row
    ids, latest indexed_at timestamp).
    """
    query = """
        SELECT file_path, COUNT(*) as cnt,
               GROUP_CONCAT(id) as ids,
               MAX(indexed_at) as latest
        FROM doc_index
        GROUP BY file_path
        HAVING cnt > 1
        ORDER BY cnt DESC
    """
    return conn.cursor().execute(query).fetchall()
def deduplicate(conn: sqlite3.Connection, dry_run: bool = True) -> dict:
    """Remove duplicate entries, keeping the most recently indexed.

    For each file_path stored more than once, every row except the one
    with the highest id (typically the latest insertion) is deleted,
    and doc_embeddings rows left without a parent doc are cleaned up.
    With dry_run=True nothing is modified; only the would-be delete
    count is reported.
    """
    cur = conn.cursor()

    # Survivors are MAX(id) per file_path; every other id is doomed.
    doomed = [
        row[0]
        for row in cur.execute("""
        SELECT id FROM doc_index
        WHERE id NOT IN (
            SELECT MAX(id) FROM doc_index GROUP BY file_path
        )
    """).fetchall()
    ]

    outcome = {
        "rows_to_delete": len(doomed),
        "dry_run": dry_run,
        "deleted": 0,
    }

    # Guard clause: nothing to do in dry-run mode or when no duplicates exist.
    if dry_run or not doomed:
        return outcome

    # Delete in batches to avoid query size limits.
    chunk = 500
    removed = 0
    for start in range(0, len(doomed), chunk):
        subset = doomed[start:start + chunk]
        marks = ",".join("?" * len(subset))
        cur.execute(f"DELETE FROM doc_index WHERE id IN ({marks})", subset)
        removed += cur.rowcount
    conn.commit()
    outcome["deleted"] = removed

    # Drop embeddings whose parent doc row no longer exists.
    cur.execute("""
        DELETE FROM doc_embeddings
        WHERE doc_id NOT IN (SELECT id FROM doc_index)
    """)
    outcome["orphaned_embeddings_deleted"] = cur.rowcount
    conn.commit()

    return outcome
def add_unique_constraint(conn: sqlite3.Connection, dry_run: bool = True) -> dict:
    """Create a UNIQUE index on doc_index.file_path if one is missing.

    Reports whether the index already existed and whether it was added.
    If duplicate paths remain, index creation fails with an
    IntegrityError that is captured in the result rather than raised.
    """
    cur = conn.cursor()

    # Look up the index by its well-known name in the schema catalogue.
    found = cur.execute("""
        SELECT name FROM sqlite_master
        WHERE type='index' AND tbl_name='doc_index' AND name='idx_doc_unique_path'
    """).fetchone()

    outcome = {
        "constraint_exists": found is not None,
        "dry_run": dry_run,
        "added": False,
    }

    # Nothing to do if it already exists, or we are only previewing.
    if found or dry_run:
        return outcome

    try:
        cur.execute("""
            CREATE UNIQUE INDEX idx_doc_unique_path ON doc_index(file_path)
        """)
    except sqlite3.IntegrityError as exc:
        outcome["error"] = str(exc)
        outcome["message"] = "Cannot add UNIQUE constraint - duplicates still exist"
    else:
        conn.commit()
        outcome["added"] = True

    return outcome
def main():
    """CLI entry point: report, and optionally remove, doc_index duplicates.

    Returns a process exit code (0 on success, 1 when the database is
    missing). Default mode is a dry run; --apply performs the deletes
    and adds the UNIQUE index; --stats prints counts and exits.
    """
    parser = argparse.ArgumentParser(description="Deduplicate doc_index table")
    parser.add_argument("--apply", action="store_true",
                        help="Apply changes (default is dry run)")
    parser.add_argument("--stats", action="store_true",
                        help="Show statistics only")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Show detailed duplicate information")
    args = parser.parse_args()

    dry_run = not args.apply

    # Bail out early if the database was never created.
    if not DB_PATH.exists():
        print(f"Error: Database not found at {DB_PATH}")
        return 1

    conn = sqlite3.connect(DB_PATH)

    banner = "=" * 60
    print(banner)
    print("Doc Index Deduplication")
    print(banner)
    print(f"Database: {DB_PATH}")
    print(f"Mode: {'DRY RUN' if dry_run else 'APPLYING CHANGES'}")
    print()

    # Snapshot the table before touching anything.
    stats = get_stats(conn)
    print("Current State:")
    print(f" Total rows: {stats['total_rows']:,}")
    print(f" Unique file paths: {stats['unique_paths']:,}")
    print(f" Duplicate rows: {stats['duplicate_rows']:,}")
    print(f" Paths with duplicates: {stats['paths_with_duplicates']:,}")
    print()

    if args.stats:
        conn.close()
        return 0

    if args.verbose:
        print("Top 20 Duplicated Paths:")
        for path, cnt, ids, latest in find_duplicates(conn)[:20]:
            print(f" {cnt}x: {path[:80]}...")
        print()

    # Clean table already: just make sure the guard index is in place.
    if stats['duplicate_rows'] == 0:
        print("No duplicates found!")
        constraint_result = add_unique_constraint(conn, dry_run)
        if constraint_result["constraint_exists"]:
            print("UNIQUE constraint already exists on file_path")
        elif dry_run:
            print("Would add UNIQUE constraint on file_path")
        else:
            print("Added UNIQUE constraint on file_path")
        conn.close()
        return 0

    print("Deduplicating...")
    result = deduplicate(conn, dry_run)
    if dry_run:
        print(f" Would delete: {result['rows_to_delete']:,} duplicate rows")
    else:
        print(f" Deleted: {result['deleted']:,} duplicate rows")
        if result.get('orphaned_embeddings_deleted', 0) > 0:
            print(f" Cleaned up: {result['orphaned_embeddings_deleted']:,} orphaned embeddings")

    if not dry_run:
        # Lock in uniqueness now that duplicates are gone.
        print()
        print("Adding UNIQUE constraint...")
        constraint_result = add_unique_constraint(conn, dry_run=False)
        if constraint_result.get("added"):
            print(" Added UNIQUE constraint on file_path")
        elif constraint_result.get("error"):
            print(f" Error: {constraint_result['error']}")
        else:
            print(" UNIQUE constraint already exists")

        # Re-read counts so the summary reflects the applied changes.
        print()
        final_stats = get_stats(conn)
        print("Final State:")
        print(f" Total rows: {final_stats['total_rows']:,}")
        print(f" Unique file paths: {final_stats['unique_paths']:,}")
        print(f" Duplicate rows: {final_stats['duplicate_rows']:,}")

    conn.close()
    print()
    if dry_run:
        print("Run with --apply to make changes")
    else:
        print("Deduplication complete!")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    # Fix: original read `if name == "main"`, which raises NameError and
    # never matches the interpreter's "__main__" sentinel.
    sys.exit(main())