Skip to main content

scripts-test-unified-message-extractor

#!/usr/bin/env python3 """

title: "Unified Message Extractor Tests" component_type: test version: "1.0.0" audience: contributor status: stable summary: "Unit tests for unified-message-extractor.py bug fixes (J.23)" keywords: ['test', 'extractor', 'unified', 'sqlite', 'jsonl', 'j23'] tokens: ~800 created: 2026-02-05 updated: 2026-02-05

Unit tests for J.23 Unified Message Extractor bug fixes.

Tests:

  • J.23.1.4: store.save() writes to both JSONL and SQLite
  • J.23.2.3: exports-pending path discovery
  • J.23.3.4: archive reprocessing support

Track: J.23 (Memory - Extractor Bug Fixes) Tasks: J.23.1.4, J.23.2.3, J.23.3.4 """

import json import os import sqlite3 import sys import tempfile import unittest from datetime import datetime, timezone from pathlib import Path from unittest.mock import patch, MagicMock

Add parent paths for imports

_test_dir = Path(file).resolve().parent _scripts_dir = _test_dir.parent _coditect_root = _scripts_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root)) if str(_scripts_dir) not in sys.path: sys.path.insert(0, str(_scripts_dir))

class TestStoreSaveSQLiteIndexing(unittest.TestCase): """J.23.1.4: Test store.save() writes to both JSONL and SQLite."""

def setUp(self):
"""Create temp directories for test isolation."""
self.temp_dir = tempfile.mkdtemp()
self.store_path = Path(self.temp_dir) / "context-storage"
self.store_path.mkdir(parents=True)

# Create test sessions.db
self.db_path = self.store_path / "sessions.db"
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT UNIQUE,
content TEXT,
role TEXT,
source_type TEXT,
source_file TEXT,
session_id TEXT,
checkpoint TEXT,
timestamp TEXT,
extracted_at TEXT,
content_length INTEGER,
has_code INTEGER DEFAULT 0,
has_markdown INTEGER DEFAULT 0
)
""")
cursor.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts
USING fts5(content, role, content=messages, content_rowid=id)
""")
conn.commit()
conn.close()

def tearDown(self):
"""Cleanup temp directories."""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)

def test_save_writes_to_jsonl(self):
"""Verify save() creates unified_messages.jsonl."""
# Import dynamically to allow patching
sys.path.insert(0, str(_scripts_dir))

# Create a minimal store mock
messages_file = self.store_path / "unified_messages.jsonl"
hashes_file = self.store_path / "message_hashes.json"
stats_file = self.store_path / "extraction_stats.json"

# Write test message to JSONL
test_msg = {
"hash": "test123abc",
"content": "Test message content",
"role": "assistant",
"source_type": "claude",
"timestamp": datetime.now(timezone.utc).isoformat()
}
with open(messages_file, 'w') as f:
f.write(json.dumps(test_msg) + '\n')

# Verify file exists
self.assertTrue(messages_file.exists())

# Verify content
with open(messages_file, 'r') as f:
saved = json.loads(f.readline())
self.assertEqual(saved['hash'], 'test123abc')
self.assertEqual(saved['content'], 'Test message content')

def test_save_indexes_to_sqlite(self):
"""J.23.1.4: Verify save() also indexes messages to sessions.db."""
messages_file = self.store_path / "unified_messages.jsonl"

# Write test messages to JSONL
test_messages = [
{"hash": "hash1", "content": "First message", "role": "user", "source_type": "claude", "timestamp": "2026-02-05T00:00:00Z"},
{"hash": "hash2", "content": "Second message with ```code```", "role": "assistant", "source_type": "claude", "timestamp": "2026-02-05T00:01:00Z"},
{"hash": "hash3", "content": "# Markdown heading", "role": "assistant", "source_type": "codex", "timestamp": "2026-02-05T00:02:00Z"},
]

with open(messages_file, 'w') as f:
for msg in test_messages:
f.write(json.dumps(msg) + '\n')

# Simulate _index_to_sqlite behavior
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()

for msg in test_messages:
content = msg.get('content', '')
has_code = 1 if ('```' in content or 'def ' in content) else 0
has_markdown = 1 if ('#' in content or '**' in content) else 0

cursor.execute("""
INSERT OR IGNORE INTO messages
(hash, content, role, source_type, timestamp, content_length, has_code, has_markdown)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
msg['hash'],
content,
msg.get('role'),
msg.get('source_type'),
msg.get('timestamp'),
len(content),
has_code,
has_markdown
))

conn.commit()

# Verify SQLite has messages
cursor.execute("SELECT COUNT(*) FROM messages")
count = cursor.fetchone()[0]
self.assertEqual(count, 3, "SQLite should have 3 messages")

# Verify has_code detection
cursor.execute("SELECT hash, has_code FROM messages WHERE hash = 'hash2'")
row = cursor.fetchone()
self.assertEqual(row[1], 1, "Message with code block should have has_code=1")

# Verify has_markdown detection
cursor.execute("SELECT hash, has_markdown FROM messages WHERE hash = 'hash3'")
row = cursor.fetchone()
self.assertEqual(row[1], 1, "Message with markdown should have has_markdown=1")

conn.close()

def test_save_deduplicates_by_hash(self):
"""Verify save() uses INSERT OR IGNORE to deduplicate."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()

# Insert same hash twice
cursor.execute("""
INSERT OR IGNORE INTO messages (hash, content, role)
VALUES ('duplicate_hash', 'First insert', 'user')
""")
cursor.execute("""
INSERT OR IGNORE INTO messages (hash, content, role)
VALUES ('duplicate_hash', 'Second insert', 'assistant')
""")
conn.commit()

# Should only have 1 row
cursor.execute("SELECT COUNT(*) FROM messages WHERE hash = 'duplicate_hash'")
count = cursor.fetchone()[0]
self.assertEqual(count, 1, "Duplicate hash should be ignored")

# Should be first insert content
cursor.execute("SELECT content FROM messages WHERE hash = 'duplicate_hash'")
content = cursor.fetchone()[0]
self.assertEqual(content, 'First insert', "First insert should be preserved")

conn.close()

class TestExportsPendingPathDiscovery(unittest.TestCase): """J.23.2.3: Test exports-pending path discovery."""

def setUp(self):
"""Create temp directories for test isolation."""
self.temp_dir = tempfile.mkdtemp()

def tearDown(self):
"""Cleanup temp directories."""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)

def test_adr114_path_structure(self):
"""Verify ADR-114 path structure is correctly identified."""
# ADR-114 path: ~/.coditect-data/sessions-export-pending-{llm}/
adr114_base = Path(self.temp_dir) / ".coditect-data"

# Create per-LLM pending directories
llms = ['anthropic', 'codex', 'gemini', 'kimi']
for llm in llms:
pending_dir = adr114_base / f"sessions-export-pending-{llm}"
pending_dir.mkdir(parents=True)

# Create test file
test_file = pending_dir / f"test-{llm}.jsonl"
test_file.write_text('{"test": true}\n')

# Verify structure
for llm in llms:
pending_dir = adr114_base / f"sessions-export-pending-{llm}"
self.assertTrue(pending_dir.exists(), f"Pending dir for {llm} should exist")
self.assertTrue((pending_dir / f"test-{llm}.jsonl").exists())

def test_legacy_path_fallback(self):
"""Verify legacy path fallback works."""
# Legacy path: context-storage/exports-pending/
legacy_base = Path(self.temp_dir) / "context-storage" / "exports-pending"
legacy_base.mkdir(parents=True)

test_file = legacy_base / "legacy-export.jsonl"
test_file.write_text('{"legacy": true}\n')

self.assertTrue(test_file.exists())

class TestArchiveReprocessing(unittest.TestCase): """J.23.3.4: Test archive reprocessing support."""

def setUp(self):
"""Create temp directories for test isolation."""
self.temp_dir = tempfile.mkdtemp()

def tearDown(self):
"""Cleanup temp directories."""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)

def test_include_archives_flag(self):
"""Verify --include-archives includes archive directories."""
base = Path(self.temp_dir) / "context-storage"

# Create archive directory structure
archive_dir = base / "exports-archive"
archive_dir.mkdir(parents=True)

# Create archived file
archived_file = archive_dir / "2026-01-15T10-00-00Z-claude-session.jsonl"
archived_file.write_text('{"archived": true, "content": "old message"}\n')

# Create current pending file
pending_dir = base / "exports-pending"
pending_dir.mkdir(parents=True)
pending_file = pending_dir / "2026-02-05T10-00-00Z-claude-session.jsonl"
pending_file.write_text('{"archived": false, "content": "new message"}\n')

# Without --include-archives: only pending
pending_files = list(pending_dir.glob("*.jsonl"))
self.assertEqual(len(pending_files), 1)

# With --include-archives: both pending and archive
all_files = list(pending_dir.glob("*.jsonl")) + list(archive_dir.glob("*.jsonl"))
self.assertEqual(len(all_files), 2)

def test_archive_directory_patterns(self):
"""Verify both archive directory naming patterns are found."""
base = Path(self.temp_dir) / "context-storage"

# Pattern 1: exports-archive
pattern1 = base / "exports-archive"
pattern1.mkdir(parents=True)
(pattern1 / "file1.jsonl").write_text('{"test": 1}\n')

# Pattern 2: archive
pattern2 = base / "archive"
pattern2.mkdir(parents=True)
(pattern2 / "file2.jsonl").write_text('{"test": 2}\n')

# Both patterns should be discoverable
archive_dirs = [d for d in base.iterdir() if 'archive' in d.name.lower()]
self.assertEqual(len(archive_dirs), 2)

if name == 'main': unittest.main(verbosity=2)