#!/usr/bin/env python3
"""
title: "Test Suite for Memory Retrieval System"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Test Suite for Memory Retrieval System"
keywords: ['api', 'database', 'memory', 'retrieval', 'test']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "test-memory-retrieval.py"
language: python
executable: true
usage: "python3 scripts/test-memory-retrieval.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Test Suite for Memory Retrieval System

Tests the memory-context-agent, /recall command, and memory-retrieval.py script.

Usage:
    python3 scripts/test-memory-retrieval.py
    python3 scripts/test-memory-retrieval.py -v               # Verbose
    python3 scripts/test-memory-retrieval.py --integration    # Include integration tests

Version: 1.0.0
"""

import argparse
import json
import os
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

# Add scripts directory (and its core/ subdirectory) to sys.path so sibling
# modules can be imported regardless of the caller's working directory.
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR))
sys.path.insert(0, str(SCRIPT_DIR / "core"))

# ADR-118: Four-Tier Database Architecture.
# Tests should use sessions.db (Tier 3) for session/message data.
try:
    from paths import SESSIONS_DB
    TEST_DB_PATH = SESSIONS_DB
except ImportError:
    # Fallback for backward compatibility: probe the legacy user-data layout
    # first, then fall back to a repository-relative path.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        TEST_DB_PATH = _user_data / "sessions.db"
    else:
        TEST_DB_PATH = SCRIPT_DIR.parent / "context-storage" / "sessions.db"

class TestMemoryRetrievalUnit(unittest.TestCase):
    """Unit tests for memory retrieval functions."""

    def setUp(self):
        """Set up test fixtures."""
        self.test_messages = [
            {
                'content': 'Working on authentication refactor',
                'role': 'user',
                'timestamp': '2025-12-12T10:00:00Z'
            },
            {
                'content': 'Blocked by missing API keys',
                'role': 'assistant',
                'timestamp': '2025-12-12T11:00:00Z'
            },
            {
                'content': 'Completed the database migration',
                'role': 'user',
                'timestamp': '2025-12-11T15:00:00Z'
            }
        ]

    def test_estimate_tokens(self):
        """Test token estimation."""
        try:
            from scripts import memory_retrieval
            # Rough estimate: 4 chars per token
            text = "This is a test string with about 40 characters."
            tokens = memory_retrieval.estimate_tokens(text)
            # Should be roughly len(text) / 4
            self.assertGreater(tokens, 5)
            self.assertLess(tokens, 20)
        except ImportError:
            # Module not importable directly, test the calculation
            text = "This is a test string"
            estimated = len(text) // 4
            self.assertEqual(estimated, 5)

    def test_truncate_to_budget(self):
        """Test budget truncation."""
        long_text = "Line 1\n" * 1000  # Very long text
        budget = 100  # Small budget

        # Simulate truncation: ~4 characters per token
        char_limit = budget * 4
        truncated = long_text[:char_limit]

        self.assertLessEqual(len(truncated), char_limit)

    def test_work_status_extraction_patterns(self):
        """Test work status keyword detection."""
        status_keywords = {
            'in_progress': ['working on', 'implementing', 'in progress'],
            'blocked': ['blocked', 'waiting for', 'stuck on'],
            'completed': ['completed', 'finished', 'done']
        }

        test_cases = [
            ('working on authentication', 'in_progress'),
            ('blocked by API keys', 'blocked'),
            ('completed the migration', 'completed'),
            ('implementing new feature', 'in_progress'),
            ('waiting for approval', 'blocked'),
            ('done with testing', 'completed'),
        ]

        for content, expected_status in test_cases:
            # First matching keyword wins; break out of both loops once found.
            found_status = None
            for status, keywords in status_keywords.items():
                for keyword in keywords:
                    if keyword in content.lower():
                        found_status = status
                        break
                if found_status:
                    break

            self.assertEqual(
                found_status, expected_status,
                f"Failed for '{content}': expected {expected_status}, got {found_status}"
            )

class TestMemoryRetrievalCLI(unittest.TestCase):
    """CLI tests for memory-retrieval.py script."""

    def setUp(self):
        """Set up test fixtures."""
        self.script_path = SCRIPT_DIR / 'memory-retrieval.py'

    def test_help_flag(self):
        """Test --help flag works."""
        result = subprocess.run(
            [sys.executable, str(self.script_path), '--help'],
            capture_output=True,
            text=True
        )
        self.assertEqual(result.returncode, 0)
        self.assertIn('Memory Retrieval', result.stdout)
        self.assertIn('--budget', result.stdout)

    def test_missing_database_error(self):
        """Test error when database doesn't exist."""
        # Use a temp directory without database
        with tempfile.TemporaryDirectory() as tmpdir:
            env = os.environ.copy()
            env['CODITECT_CONTEXT_DB'] = os.path.join(tmpdir, 'nonexistent.db')

            result = subprocess.run(
                [sys.executable, str(self.script_path), 'test topic'],
                capture_output=True,
                text=True,
                cwd=tmpdir,
                env=env  # was built but never passed; point script at empty db
            )

            # Should either error or handle gracefully
            # (exact behavior depends on database check implementation)
            self.assertIn(
                result.returncode, [0, 1],
                "Should return 0 or 1"
            )

class TestMemoryRetrievalIntegration(unittest.TestCase):
    """Integration tests (require database)."""

    @classmethod
    def setUpClass(cls):
        """Check if database exists for integration tests.

        ADR-118: Uses sessions.db (Tier 3) for session/message data.
        """
        cls.db_path = TEST_DB_PATH
        cls.db_exists = cls.db_path.exists()

    def setUp(self):
        """Skip if no database."""
        if not self.db_exists:
            self.skipTest(f"Database not found at {self.db_path} - run /cx first")

    def test_status_retrieval(self):
        """Test --status flag retrieves work status."""
        script_path = SCRIPT_DIR / 'memory-retrieval.py'
        result = subprocess.run(
            [sys.executable, str(script_path), '--status'],
            capture_output=True,
            text=True
        )

        # Should succeed
        self.assertEqual(result.returncode, 0)
        # Should have some output
        self.assertTrue(len(result.stdout) > 0 or len(result.stderr) > 0)

    def test_json_output(self):
        """Test --json flag produces valid JSON."""
        script_path = SCRIPT_DIR / 'memory-retrieval.py'
        result = subprocess.run(
            [sys.executable, str(script_path), '--status', '--json'],
            capture_output=True,
            text=True
        )

        if result.returncode == 0 and result.stdout.strip():
            try:
                data = json.loads(result.stdout)
                self.assertIsInstance(data, (list, dict))
            except json.JSONDecodeError:
                # Empty or non-JSON output is acceptable
                pass

    def test_budget_respected(self):
        """Test that token budget is respected."""
        script_path = SCRIPT_DIR / 'memory-retrieval.py'

        # Small budget
        result = subprocess.run(
            [sys.executable, str(script_path), 'test', '--budget', '100'],
            capture_output=True,
            text=True
        )

        if result.returncode == 0:
            # Output should be reasonably small
            # (100 tokens * 4 chars = 400 chars max, plus overhead)
            self.assertLess(len(result.stdout), 2000)

class TestHookConfiguration(unittest.TestCase):
    """Test hook configuration and parsing."""

    def test_budget_map(self):
        """Test budget mapping values."""
        budget_map = {
            'minimal': 500,
            'standard': 2000,
            'comprehensive': 5000
        }

        self.assertEqual(budget_map['minimal'], 500)
        self.assertEqual(budget_map['standard'], 2000)
        self.assertEqual(budget_map['comprehensive'], 5000)

    def test_relevance_thresholds(self):
        """Test relevance threshold values."""
        HIGH_RELEVANCE = 0.8
        MEDIUM_RELEVANCE = 0.5
        MIN_RELEVANCE = 0.3

        # Thresholds should be ordered correctly
        self.assertGreater(HIGH_RELEVANCE, MEDIUM_RELEVANCE)
        self.assertGreater(MEDIUM_RELEVANCE, MIN_RELEVANCE)
        self.assertGreater(MIN_RELEVANCE, 0)

    def test_freshness_weights(self):
        """Test freshness weight values."""
        FRESHNESS_TODAY = 1.0
        FRESHNESS_WEEK = 0.8
        FRESHNESS_MONTH = 0.5
        FRESHNESS_OLD = 0.3

        # Weights should decrease over time
        self.assertGreater(FRESHNESS_TODAY, FRESHNESS_WEEK)
        self.assertGreater(FRESHNESS_WEEK, FRESHNESS_MONTH)
        self.assertGreater(FRESHNESS_MONTH, FRESHNESS_OLD)

class TestAgentDefinition(unittest.TestCase):
    """Test agent definition file."""

    def setUp(self):
        """Load agent definition."""
        self.agent_path = SCRIPT_DIR.parent / 'agents' / 'memory-context-agent.md'

    def test_agent_file_exists(self):
        """Test agent definition file exists."""
        self.assertTrue(
            self.agent_path.exists(),
            f"Agent file not found: {self.agent_path}"
        )

    def test_agent_has_required_fields(self):
        """Test agent has required frontmatter fields."""
        if not self.agent_path.exists():
            self.skipTest("Agent file not found")

        content = self.agent_path.read_text()

        # Check for required fields
        required_fields = ['name:', 'description:', 'tools:', 'model:']
        for field in required_fields:
            self.assertIn(
                field, content,
                f"Agent missing required field: {field}"
            )

    def test_agent_name_correct(self):
        """Test agent has correct name."""
        if not self.agent_path.exists():
            self.skipTest("Agent file not found")

        content = self.agent_path.read_text()
        self.assertIn('name: memory-context-agent', content)

class TestCommandDefinition(unittest.TestCase):
    """Test command definition file."""

    def setUp(self):
        """Load command definition."""
        self.command_path = SCRIPT_DIR.parent / 'commands' / 'recall.md'

    def test_command_file_exists(self):
        """Test command definition file exists."""
        self.assertTrue(
            self.command_path.exists(),
            f"Command file not found: {self.command_path}"
        )

    def test_command_has_required_fields(self):
        """Test command has required frontmatter fields."""
        if not self.command_path.exists():
            self.skipTest("Command file not found")

        content = self.command_path.read_text()

        required_fields = ['name:', 'description:', 'category:']
        for field in required_fields:
            self.assertIn(
                field, content,
                f"Command missing required field: {field}"
            )

class TestSkillDefinition(unittest.TestCase):
    """Test skill definition file."""

    def setUp(self):
        """Load skill definition."""
        self.skill_path = SCRIPT_DIR.parent / 'skills' / 'memory-retrieval' / 'SKILL.md'

    def test_skill_file_exists(self):
        """Test skill definition file exists."""
        self.assertTrue(
            self.skill_path.exists(),
            f"Skill file not found: {self.skill_path}"
        )

    def test_skill_has_required_fields(self):
        """Test skill has required frontmatter fields."""
        if not self.skill_path.exists():
            self.skipTest("Skill file not found")

        content = self.skill_path.read_text()

        required_fields = ['name:', 'description:', 'allowed-tools:']
        for field in required_fields:
            self.assertIn(
                field, content,
                f"Skill missing required field: {field}"
            )

def run_tests(verbose: bool = False, integration: bool = False) -> int:
    """Run the test suite.

    Args:
        verbose: Use unittest verbosity 2 instead of 1.
        integration: Also include integration tests (require the database).

    Returns:
        0 if all tests passed, 1 otherwise.
    """
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()

    # Always run unit tests
    suite.addTests(loader.loadTestsFromTestCase(TestMemoryRetrievalUnit))
    suite.addTests(loader.loadTestsFromTestCase(TestMemoryRetrievalCLI))
    suite.addTests(loader.loadTestsFromTestCase(TestHookConfiguration))
    suite.addTests(loader.loadTestsFromTestCase(TestAgentDefinition))
    suite.addTests(loader.loadTestsFromTestCase(TestCommandDefinition))
    suite.addTests(loader.loadTestsFromTestCase(TestSkillDefinition))

    # Optionally run integration tests
    if integration:
        suite.addTests(loader.loadTestsFromTestCase(TestMemoryRetrievalIntegration))

    # Run tests
    verbosity = 2 if verbose else 1
    runner = unittest.TextTestRunner(verbosity=verbosity)
    result = runner.run(suite)

    # Return exit code
    return 0 if result.wasSuccessful() else 1

def main():
    """CLI entry point: parse arguments, run the suite, exit with its status."""
    parser = argparse.ArgumentParser(description='Test memory retrieval system')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose output')
    parser.add_argument('--integration', action='store_true',
                        help='Include integration tests (requires database)')

    args = parser.parse_args()

    print("=" * 60)
    print("Memory Retrieval System - Test Suite")
    print("=" * 60)
    print()

    exit_code = run_tests(args.verbose, args.integration)

    print()
    print("=" * 60)
    if exit_code == 0:
        print("All tests passed!")
    else:
        print("Some tests failed.")
    print("=" * 60)

    sys.exit(exit_code)

if __name__ == '__main__':
    main()