# scripts/ingest-to-knowledge-db.py
#!/usr/bin/env python3
"""
title: "Ingest Unique Messages into Knowledge Database"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Ingest Unique Messages into Knowledge Database"
keywords: ['database', 'ingest', 'knowledge', 'review']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "ingest-to-knowledge-db.py"
language: python
executable: true
usage: "python3 scripts/ingest-to-knowledge-db.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Ingest Unique Messages into Knowledge Database

Reads messages from dedup_state/unique_messages.jsonl and ingests them into
knowledge.db SQLite database for dashboard search functionality.

This script is designed to be called automatically by export-dedup.py as part
of the unified workflow, ensuring the dashboard stays synchronized with the
deduplication state.

Features:
- Incremental updates (only adds new messages)
- Atomic transactions (all-or-nothing)
- Comprehensive logging per ADR-0001
- Error handling with rollback
- Progress reporting
- Idempotent (safe to run multiple times)

Author: AZ1.AI INC (Hal Casteel)
License: MIT
Framework: CODITECT
"""
import argparse import sys import sqlite3 import json import hashlib import logging from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Tuple, Set from dataclasses import dataclass
def parse_args():
    """Build the command-line parser and return the parsed arguments."""
    epilog_text = '''
Examples:
  %(prog)s                         # Ingest from default location
  %(prog)s --source path/to.jsonl  # Ingest from specific JSONL file
  %(prog)s --db path/to.db         # Target specific database
  %(prog)s --dry-run               # Preview ingestion without changes
  %(prog)s --verbose               # Show detailed progress

Features:
- Incremental updates (only adds new messages)
- Atomic transactions (all-or-nothing)
- Idempotent (safe to run multiple times)
- SHA-256 deduplication

Part of CODITECT Anti-Forgetting Memory System
'''
    parser = argparse.ArgumentParser(
        description='Ingest unique messages from JSONL into knowledge.db SQLite database.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    parser.add_argument('--source', type=str, default=None,
                        help='Path to unique_messages.jsonl file')
    parser.add_argument('--db', type=str, default=None,
                        help='Path to knowledge.db database')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview ingestion without making changes')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose output')
    parser.add_argument('--quiet', '-q', action='store_true',
                        help='Suppress all output except errors')
    return parser.parse_args()
# unified_logger import is done lazily in setup_logging() to allow -h to work

# ============================================================================
# DATA STRUCTURES
# ============================================================================
@dataclass
class MessageRecord:
    """Represents a message to be ingested into the database."""
    hash: str            # message content hash, used for deduplication
    checkpoint_id: str   # identifier of the checkpoint this message came from
    role: str            # conversation role (defaults to 'unknown' upstream)
    content: str         # raw message text
    first_seen: str      # timestamp string of first observation
    message_index: int   # position of the message within its checkpoint
@dataclass
class IngestionStats:
    """Counters and timing for one ingestion run."""
    total_messages: int = 0              # messages examined
    already_exists: int = 0              # skipped because hash already in DB
    newly_inserted: int = 0              # rows actually inserted
    errors: int = 0                      # parse/insert failures
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    @property
    def duration_seconds(self) -> float:
        """Elapsed wall-clock seconds; 0.0 while either timestamp is unset."""
        if self.start_time is None or self.end_time is None:
            return 0.0
        return (self.end_time - self.start_time).total_seconds()
# ============================================================================
# LOGGING SETUP
# ============================================================================
def setup_logging(log_dir: Path) -> logging.Logger:
    """Configure unified logging per ADR-0001 standards.

    The unified_logger import is done lazily here (not at module level)
    so that ``-h``/``--help`` works before module dependencies are resolved.

    Args:
        log_dir: Directory where the log file lives; created on demand.

    Returns:
        The logger produced by ``setup_unified_logger``.
    """
    # BUG FIX: original used Path(file) — `file` is not defined; the module
    # path attribute is `__file__`.
    script_dir = Path(__file__).parent
    coditect_core_scripts = (
        script_dir.parent.parent / "submodules" / "core" / "coditect-core"
        / "scripts" / "core"
    )
    sys.path.insert(0, str(coditect_core_scripts))
    from unified_logger import setup_unified_logger

    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "knowledge-db-ingestion.log"
    logger = setup_unified_logger(
        component="knowledge-db-ingestion",
        log_file=log_file,
        max_lines=5000,
        console_level=logging.INFO,
        file_level=logging.DEBUG,
    )
    return logger
# ============================================================================
# DATABASE OPERATIONS
# ============================================================================
class KnowledgeDBIngester:
    """Handles ingestion of messages into knowledge.db.

    Construct with the database path and a logger, then call :meth:`run`
    with the path to ``unique_messages.jsonl``. Messages already present
    (matched by hash) are skipped, so repeated runs are idempotent.
    """

    def __init__(self, db_path: Path, logger: logging.Logger):
        self.db_path = db_path
        self.logger = logger
        self.stats = IngestionStats()

    def get_existing_message_hashes(self, conn: sqlite3.Connection) -> Set[str]:
        """Fetch all existing message hashes from the database."""
        cursor = conn.cursor()
        cursor.execute("SELECT hash FROM messages")
        return {row[0] for row in cursor.fetchall()}

    def parse_unique_messages(self, jsonl_path: Path) -> List[MessageRecord]:
        """Parse unique_messages.jsonl into MessageRecord objects.

        Malformed lines are logged and counted in ``stats.errors`` but do
        not abort the parse; missing fields fall back to safe defaults.
        """
        messages: List[MessageRecord] = []
        if not jsonl_path.exists():
            self.logger.warning(f"unique_messages.jsonl not found at {jsonl_path}")
            return messages
        with open(jsonl_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    data = json.loads(line.strip())
                    record = MessageRecord(
                        hash=data.get('message_hash', ''),
                        checkpoint_id=data.get('checkpoint_id', 'UNKNOWN'),
                        role=data.get('role', 'unknown'),
                        content=data.get('content', ''),
                        first_seen=data.get('first_seen', ''),
                        message_index=data.get('message_index', 0),
                    )
                    messages.append(record)
                except json.JSONDecodeError as e:
                    self.logger.error(f"Line {line_num}: Invalid JSON - {e}")
                    self.stats.errors += 1
                except Exception as e:
                    self.logger.error(f"Line {line_num}: Parse error - {e}")
                    self.stats.errors += 1
        self.logger.info(f"Parsed {len(messages)} messages from {jsonl_path.name}")
        return messages

    def insert_messages(
        self,
        conn: sqlite3.Connection,
        messages: List[MessageRecord],
        existing_hashes: Set[str]
    ) -> int:
        """Insert new messages into database, skipping existing ones.

        Returns the number of rows actually inserted. Does not commit —
        the caller owns the transaction.
        """
        cursor = conn.cursor()
        inserted = 0
        for msg in messages:
            self.stats.total_messages += 1
            if msg.hash in existing_hashes:
                self.stats.already_exists += 1
                continue
            try:
                cursor.execute('''
                    INSERT INTO messages (
                        hash, checkpoint_id, role, content,
                        first_seen, message_index
                    ) VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    msg.hash,
                    msg.checkpoint_id,
                    msg.role,
                    msg.content,
                    msg.first_seen,
                    msg.message_index
                ))
                inserted += 1
                self.stats.newly_inserted += 1
                # ROBUSTNESS FIX: remember the hash so a duplicate later in
                # the same input file is skipped even if the table has no
                # UNIQUE constraint on hash.
                existing_hashes.add(msg.hash)
                # Log progress every 100 messages
                if inserted % 100 == 0:
                    self.logger.debug(f"Inserted {inserted} messages...")
            except sqlite3.IntegrityError:
                # Duplicate hash (should not happen due to pre-check)
                self.stats.already_exists += 1
            except Exception as e:
                self.logger.error(f"Failed to insert message {msg.hash}: {e}")
                self.stats.errors += 1
        return inserted

    def run(self, jsonl_path: Path) -> IngestionStats:
        """Execute the complete ingestion workflow.

        Raises on fatal errors (missing database, failed transaction); the
        transaction is rolled back before re-raising. ``stats.end_time``
        is always recorded, success or failure.
        """
        self.stats.start_time = datetime.now()
        try:
            self.logger.info("="*60)
            self.logger.info("Knowledge DB Ingestion Started")
            self.logger.info("="*60)
            self.logger.info(f"Database: {self.db_path}")
            self.logger.info(f"Source: {jsonl_path}")
            self.logger.info("")
            # Verify database exists
            if not self.db_path.exists():
                raise FileNotFoundError(f"Database not found: {self.db_path}")
            # Parse messages from JSONL
            self.logger.info("Step 1: Parsing unique_messages.jsonl")
            messages = self.parse_unique_messages(jsonl_path)
            if not messages:
                self.logger.warning("No messages to ingest")
                return self.stats
            # Connect to database
            self.logger.info("Step 2: Connecting to knowledge.db")
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            try:
                # Get existing hashes
                self.logger.info("Step 3: Fetching existing message hashes")
                existing_hashes = self.get_existing_message_hashes(conn)
                self.logger.info(f"Found {len(existing_hashes)} existing messages in database")
                # Insert new messages
                self.logger.info("Step 4: Inserting new messages")
                inserted = self.insert_messages(conn, messages, existing_hashes)
                # Commit transaction (all-or-nothing)
                conn.commit()
                self.logger.info(f"✅ Transaction committed: {inserted} new messages inserted")
            except Exception as e:
                conn.rollback()
                self.logger.error(f"❌ Transaction rolled back: {e}")
                raise
            finally:
                conn.close()
        except Exception as e:
            self.logger.error(f"Ingestion failed: {e}", exc_info=True)
            raise
        finally:
            self.stats.end_time = datetime.now()
        # BUG FIX: return sits AFTER the try/finally, not inside the finally
        # block — a `return` inside `finally` would silently swallow the
        # re-raised exception above.
        return self.stats

    def print_summary(self):
        """Print ingestion summary to the logger."""
        self.logger.info("")
        self.logger.info("="*60)
        self.logger.info("Ingestion Summary")
        self.logger.info("="*60)
        self.logger.info(f"Total messages processed: {self.stats.total_messages}")
        self.logger.info(f"Already in database: {self.stats.already_exists}")
        self.logger.info(f"Newly inserted: {self.stats.newly_inserted}")
        self.logger.info(f"Errors: {self.stats.errors}")
        self.logger.info(f"Duration: {self.stats.duration_seconds:.2f}s")
        self.logger.info("="*60)
# ============================================================================
# MAIN EXECUTION
# ============================================================================
def main():
    """Main entry point.

    Returns:
        Process exit code: 0 on success, 1 when errors occurred.
    """
    args = parse_args()

    # Default paths are resolved relative to this script's location
    script_dir = Path(__file__).parent
    base_dir = script_dir.parent
    dedup_state_dir = base_dir / "dedup_state"

    # BUG FIX: the parsed CLI options were previously ignored; honor the
    # --source and --db overrides when given (defaults are unchanged).
    jsonl_path = Path(args.source) if args.source else dedup_state_dir / "unique_messages.jsonl"
    db_path = Path(args.db) if args.db else base_dir / "knowledge.db"
    log_dir = base_dir / "logs"

    # NOTE(review): --dry-run, --verbose and --quiet are accepted but not yet
    # wired into the ingester/logger — TODO implement or remove from the CLI.

    # Setup logging
    logger = setup_logging(log_dir)
    try:
        # Create ingester and run the full workflow
        ingester = KnowledgeDBIngester(db_path, logger)
        stats = ingester.run(jsonl_path)
        ingester.print_summary()
        # Exit with appropriate code
        if stats.errors > 0:
            logger.warning(f"Completed with {stats.errors} errors")
            return 1
        logger.info("✅ Ingestion completed successfully")
        return 0
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        return 1


# BUG FIX: original read `if name == "main"` — the module attribute is
# `__name__` and the script value is "__main__".
if __name__ == "__main__":
    sys.exit(main())