#!/usr/bin/env python3
"""
CODITECT Ralph Wiggum Context Integration (H.8.1.7)

Integrates the checkpoint protocol with the /cx context extraction pipeline.
Three integration surfaces:

1. Context Enrichment: Populate ContextSummary from org.db when creating
   checkpoints. Pulls recent decisions, learnings, and error_solutions
   relevant to the task.

2. Continuation Prompt Enhancement: Embed a compact context snapshot into the
   recovery.continuation_prompt so the next agent iteration starts with
   indexed knowledge.

3. Checkpoint Provenance Tagging: Tag /cx extracted messages with the
   checkpoint ID that was active when they were generated, enabling
   traceability.

Usage:
    from scripts.core.ralph_wiggum.context_integration import ContextIntegration

    ci = ContextIntegration()

    # Enrich a ContextSummary from org.db
    enriched = ci.enrich_context_summary(task_id="H.8.1.7", project_id="PILOT")

    # Enhance continuation prompt with context snapshot
    enhanced_prompt = ci.enhance_continuation_prompt(checkpoint)

    # Tag messages with checkpoint provenance
    ci.tag_checkpoint_provenance(checkpoint_id, message_ids)

Author: CODITECT Framework
Version: 1.0.0
Created: February 17, 2026
Task: H.8.1.7
ADR Reference: ADR-108, ADR-114, ADR-118
"""

import json
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from .checkpoint_protocol import (
    Checkpoint,
    CheckpointService,
    ContextSummary,
    HandoffProtocol,
)

# Module-level logger; __name__ (not the undefined `name`) keys it to this module.
logger = logging.getLogger(__name__)

class ContextIntegration:
    """
    Bridges Ralph Wiggum checkpoints with the /cx context extraction pipeline.

    Provides three integration surfaces:
    - enrich_context_summary(): Pull decisions/learnings from org.db into ContextSummary
    - enhance_continuation_prompt(): Add context snapshot to handoff prompts
    - tag_checkpoint_provenance(): Mark /cx messages with checkpoint reference
    """

    def __init__(
        self,
        org_db_path: Optional[Path] = None,
        sessions_db_path: Optional[Path] = None,
    ):
        """
        Initialize context integration.

        Args:
            org_db_path: Path to org.db (decisions, learnings). Auto-discovered if None.
            sessions_db_path: Path to sessions.db (messages). Auto-discovered if None.
        """
        self._org_db_path = org_db_path
        self._sessions_db_path = sessions_db_path

        # Lazy-resolve paths on first use so construction never touches the filesystem.
        self._org_db_resolved = False
        self._sessions_db_resolved = False

    def _resolve_org_db(self) -> Optional[Path]:
        """Resolve org.db path using ADR-118 path discovery (result cached after first call)."""
        if self._org_db_resolved:
            return self._org_db_path
        self._org_db_resolved = True

        # An explicitly supplied, existing path wins over discovery.
        if self._org_db_path and self._org_db_path.exists():
            return self._org_db_path

        try:
            import sys
            scripts_core = Path(__file__).parent.parent
            if str(scripts_core) not in sys.path:
                sys.path.insert(0, str(scripts_core))
            from paths import get_org_db_path
            self._org_db_path = get_org_db_path()
        except Exception as e:
            # ImportError is a subclass of Exception; any discovery failure falls back.
            logger.debug(f"Path discovery unavailable: {e}")
            fallback = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "org.db"
            if fallback.exists():
                self._org_db_path = fallback

        return self._org_db_path

    def _resolve_sessions_db(self) -> Optional[Path]:
        """Resolve sessions.db path using ADR-118 path discovery (result cached after first call)."""
        if self._sessions_db_resolved:
            return self._sessions_db_path
        self._sessions_db_resolved = True

        if self._sessions_db_path and self._sessions_db_path.exists():
            return self._sessions_db_path

        try:
            import sys
            scripts_core = Path(__file__).parent.parent
            if str(scripts_core) not in sys.path:
                sys.path.insert(0, str(scripts_core))
            from paths import get_sessions_db_path
            self._sessions_db_path = get_sessions_db_path()
        except Exception as e:
            logger.debug(f"Path discovery unavailable: {e}")
            fallback = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"
            if fallback.exists():
                self._sessions_db_path = fallback

        return self._sessions_db_path

    # =========================================================================
    # 1. CONTEXT ENRICHMENT
    # =========================================================================

    def enrich_context_summary(
        self,
        task_id: str = "",
        project_id: str = "",
        limit: int = 10,
        existing_summary: Optional["ContextSummary"] = None,
    ) -> "ContextSummary":
        """
        Enrich a ContextSummary by querying org.db for relevant decisions,
        learnings, and error solutions.

        Args:
            task_id: Task ID to filter relevance (e.g., "H.8.1.7")
            project_id: Project ID for scoping (e.g., "PILOT")
            limit: Max items per category
            existing_summary: Existing summary to extend (preserves existing items)

        Returns:
            ContextSummary populated with decisions as key_decisions,
            learnings as assumptions, and error_solutions as constraints.
        """
        summary = existing_summary or ContextSummary()
        org_db = self._resolve_org_db()

        if not org_db or not org_db.exists():
            logger.warning("org.db not available — returning summary as-is")
            return summary

        try:
            conn = sqlite3.connect(str(org_db))
            conn.row_factory = sqlite3.Row
            try:
                cursor = conn.cursor()

                # Pull recent decisions; de-dupe against any existing entries.
                decisions = self._query_decisions(cursor, task_id, project_id, limit)
                for d in decisions:
                    entry = f"{d['title']}: {d['rationale']}" if d.get('rationale') else d['title']
                    if entry not in summary.key_decisions:
                        summary.key_decisions.append(entry)

                # Pull learnings as assumptions/knowledge.
                learnings = self._query_learnings(cursor, task_id, project_id, limit)
                for row in learnings:
                    entry = row.get('learning', row.get('title', ''))
                    if entry and entry not in summary.assumptions:
                        summary.assumptions.append(entry)

                # Pull error solutions as constraints (things to avoid).
                errors = self._query_error_solutions(cursor, task_id, project_id, limit)
                for err in errors:
                    entry = f"Avoid: {err['error_type']} — Solution: {err['solution']}"
                    if entry not in summary.constraints:
                        summary.constraints.append(entry)

                logger.info(
                    f"Enriched context: {len(decisions)} decisions, "
                    f"{len(learnings)} learnings, {len(errors)} error solutions"
                )
            finally:
                # Close even when a query raises (the original leaked on error).
                conn.close()

        except sqlite3.Error as e:
            logger.warning(f"Failed to query org.db: {e}")

        return summary

    def _query_decisions(
        self, cursor: sqlite3.Cursor, task_id: str, project_id: str, limit: int
    ) -> List[Dict[str, Any]]:
        """Query recent decisions from org.db. (task_id reserved for future relevance filtering.)"""
        try:
            # Try project-scoped first, fall back to global
            if project_id:
                cursor.execute(
                    """SELECT title, rationale, created_at
                       FROM decisions
                       WHERE (project_id = ? OR scope = 'global')
                       ORDER BY created_at DESC LIMIT ?""",
                    (project_id, limit),
                )
            else:
                cursor.execute(
                    """SELECT title, rationale, created_at
                       FROM decisions
                       ORDER BY created_at DESC LIMIT ?""",
                    (limit,),
                )
            return [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            # Table may not exist
            return []

    def _query_learnings(
        self, cursor: sqlite3.Cursor, task_id: str, project_id: str, limit: int
    ) -> List[Dict[str, Any]]:
        """Query recent learnings from org.db. (task_id reserved for future relevance filtering.)"""
        try:
            if project_id:
                cursor.execute(
                    """SELECT learning, category, created_at
                       FROM skill_learnings
                       WHERE (project_id = ? OR scope = 'global')
                       ORDER BY created_at DESC LIMIT ?""",
                    (project_id, limit),
                )
            else:
                cursor.execute(
                    """SELECT learning, category, created_at
                       FROM skill_learnings
                       ORDER BY created_at DESC LIMIT ?""",
                    (limit,),
                )
            return [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            return []

    def _query_error_solutions(
        self, cursor: sqlite3.Cursor, task_id: str, project_id: str, limit: int
    ) -> List[Dict[str, Any]]:
        """Query recent error solutions from org.db, most-successful first."""
        try:
            if project_id:
                cursor.execute(
                    """SELECT error_type, solution, success_count
                       FROM error_solutions
                       WHERE (project_id = ? OR scope = 'global')
                       ORDER BY success_count DESC LIMIT ?""",
                    (project_id, limit),
                )
            else:
                cursor.execute(
                    """SELECT error_type, solution, success_count
                       FROM error_solutions
                       ORDER BY success_count DESC LIMIT ?""",
                    (limit,),
                )
            return [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            return []

    # =========================================================================
    # 2. CONTINUATION PROMPT ENHANCEMENT
    # =========================================================================

    def enhance_continuation_prompt(
        self,
        checkpoint: "Checkpoint",
        include_snapshot: bool = True,
        include_db_context: bool = True,
        project_id: str = "",
        snapshot_token_budget: int = 200,
    ) -> str:
        """
        Generate an enhanced continuation prompt that includes:
        - Standard HandoffProtocol prompt (completed/pending/blocked work)
        - Context snapshot with track progress and priorities
        - Relevant decisions and learnings from org.db

        Args:
            checkpoint: The checkpoint to generate prompt for
            include_snapshot: Whether to include context snapshot section
            include_db_context: Whether to include org.db decisions/learnings
            project_id: Project ID for context scoping
            snapshot_token_budget: Token budget for snapshot section

        Returns:
            Enhanced continuation prompt string
        """
        # Start with the standard continuation prompt
        base_prompt = HandoffProtocol.generate_continuation_prompt(checkpoint)

        sections = [base_prompt]

        # Add context snapshot (empty sections are silently skipped).
        if include_snapshot:
            snapshot_section = self._generate_snapshot_section(snapshot_token_budget)
            if snapshot_section:
                sections.append(snapshot_section)

        # Add org.db context
        if include_db_context:
            db_section = self._generate_db_context_section(
                checkpoint.metadata.task_id, project_id
            )
            if db_section:
                sections.append(db_section)

        return "\n\n".join(sections)

    def _generate_snapshot_section(self, token_budget: int = 200) -> str:
        """Generate compact context snapshot section; returns "" if unavailable."""
        try:
            # Import context_snapshot module from the scripts directory.
            import sys
            scripts_dir = Path(__file__).parent.parent.parent
            if str(scripts_dir) not in sys.path:
                sys.path.insert(0, str(scripts_dir))
            from context_snapshot import generate_snapshot

            snapshot = generate_snapshot(token_budget)

            parts = ["### Project Context Snapshot"]

            tracks = snapshot.get("tracks", {})
            if tracks:
                track_line = ", ".join(
                    f"{t}:{d['pct']}%" for t, d in sorted(tracks.items())
                )
                parts.append(f"**Tracks:** {track_line}")

            recent = snapshot.get("recent", [])
            if recent:
                parts.append("**Recent:** " + "; ".join(recent[:3]))

            next_items = snapshot.get("next", [])
            if next_items:
                parts.append("**Next:** " + "; ".join(next_items[:3]))

            # Only the header with no content is treated as "nothing to add".
            return "\n".join(parts) if len(parts) > 1 else ""

        except Exception as e:
            # Covers ImportError and any snapshot failure — this section is best-effort.
            logger.debug(f"Context snapshot unavailable: {e}")
            return ""

    def _generate_db_context_section(
        self, task_id: str, project_id: str
    ) -> str:
        """Generate org.db context section for continuation prompt; "" if unavailable."""
        org_db = self._resolve_org_db()
        if not org_db or not org_db.exists():
            return ""

        try:
            conn = sqlite3.connect(str(org_db))
            conn.row_factory = sqlite3.Row
            try:
                cursor = conn.cursor()

                parts = ["### Relevant Knowledge (from org.db)"]

                # Recent decisions (compact)
                decisions = self._query_decisions(cursor, task_id, project_id, 5)
                if decisions:
                    parts.append("**Decisions:**")
                    for d in decisions[:3]:
                        parts.append(f"- {d['title']}")

                # Recent error solutions (compact)
                errors = self._query_error_solutions(cursor, task_id, project_id, 5)
                if errors:
                    parts.append("**Known Issues:**")
                    for e in errors[:3]:
                        parts.append(f"- {e['error_type']}: {e['solution']}")
            finally:
                conn.close()

            return "\n".join(parts) if len(parts) > 1 else ""

        except sqlite3.Error as e:
            logger.debug(f"Failed to query org.db for prompt: {e}")
            return ""

    # =========================================================================
    # 3. CHECKPOINT PROVENANCE TAGGING
    # =========================================================================

    def tag_checkpoint_provenance(
        self,
        checkpoint_id: str,
        task_id: str,
        session_id: str = "",
    ) -> int:
        """
        Tag /cx extracted messages with checkpoint provenance.

        Updates messages in sessions.db to include the checkpoint_id in their
        provenance metadata, enabling traceability from checkpoint to messages.

        Args:
            checkpoint_id: The checkpoint ID to tag with
            task_id: The task ID associated with the checkpoint
            session_id: Optional session ID to scope the tagging

        Returns:
            Number of messages tagged (0 on any failure)
        """
        sessions_db = self._resolve_sessions_db()
        if not sessions_db or not sessions_db.exists():
            logger.warning("sessions.db not available — cannot tag provenance")
            return 0

        try:
            conn = sqlite3.connect(str(sessions_db))
            try:
                cursor = conn.cursor()

                # Check if messages table has a provenance/metadata column
                cursor.execute("PRAGMA table_info(messages)")
                columns = {row[1] for row in cursor.fetchall()}

                if "metadata" not in columns and "provenance" not in columns:
                    logger.info("messages table lacks metadata/provenance column — skipping")
                    return 0

                # Column name is interpolated into SQL, but only from this fixed
                # two-value set — not user input.
                meta_col = "provenance" if "provenance" in columns else "metadata"

                now = datetime.now(timezone.utc).isoformat()
                provenance = json.dumps({
                    "checkpoint_id": checkpoint_id,
                    "task_id": task_id,
                    "tagged_at": now,
                })

                if session_id:
                    # Scope to one session; only untagged rows are touched.
                    cursor.execute(
                        f"""UPDATE messages
                            SET {meta_col} = ?
                            WHERE session_id = ?
                            AND ({meta_col} IS NULL OR {meta_col} = '')""",
                        (provenance, session_id),
                    )
                else:
                    # Tag untagged messages from the last 5 minutes
                    cursor.execute(
                        f"""UPDATE messages
                            SET {meta_col} = ?
                            WHERE ({meta_col} IS NULL OR {meta_col} = '')
                            AND datetime(created_at) >= datetime('now', '-5 minutes')""",
                        (provenance,),
                    )

                tagged = cursor.rowcount
                conn.commit()
            finally:
                conn.close()

            logger.info(f"Tagged {tagged} messages with checkpoint {checkpoint_id}")
            return tagged

        except sqlite3.Error as e:
            logger.warning(f"Failed to tag provenance: {e}")
            return 0

    # =========================================================================
    # 4. CHECKPOINT INDEXING FOR /cx SEARCH (H.8.1.7)
    # =========================================================================

    def index_checkpoints_for_search(
        self,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """
        Index all un-indexed checkpoints into checkpoint_index + FTS5 tables
        in sessions.db, making them searchable via /cxq.

        Also inserts lightweight cross-references into the messages table so
        bare /cxq searches surface checkpoint data.

        Args:
            dry_run: If True, count checkpoints but don't index.

        Returns:
            Dict with success, checkpoints_found, checkpoints_indexed,
            checkpoints_total, error.
        """
        result = {
            "success": False,
            "checkpoints_found": 0,
            "checkpoints_indexed": 0,
            "checkpoints_total": 0,
            "error": None,
        }

        sessions_db = self._resolve_sessions_db()
        if not sessions_db or not sessions_db.exists():
            result["error"] = "sessions.db not available"
            return result

        try:
            conn = sqlite3.connect(str(sessions_db), timeout=30)
            try:
                conn.execute("PRAGMA journal_mode=WAL")
                conn.execute("PRAGMA busy_timeout=30000")
                cursor = conn.cursor()

                # No checkpoints table — nothing to index.
                cursor.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='checkpoints'"
                )
                if not cursor.fetchone():
                    result["success"] = True
                    return result

                self._ensure_index_schema(cursor)
                conn.commit()

                # Find un-indexed checkpoints via anti-join.
                cursor.execute("""
                    SELECT c.id, c.task_id, c.agent_id, c.agent_type, c.phase,
                           c.iteration, c.completed_items, c.pending_items,
                           c.blocked_items, c.current_focus, c.context_summary,
                           c.continuation_prompt, c.files_modified,
                           c.tokens_consumed, c.tools_invoked, c.created_at
                    FROM checkpoints c
                    LEFT JOIN checkpoint_index ci ON ci.checkpoint_id = c.id
                    WHERE ci.checkpoint_id IS NULL
                """)
                unindexed = cursor.fetchall()
                result["checkpoints_found"] = len(unindexed)

                cursor.execute("SELECT COUNT(*) FROM checkpoints")
                result["checkpoints_total"] = cursor.fetchone()[0]

                if dry_run:
                    result["success"] = True
                    return result

                now_iso = datetime.now(timezone.utc).isoformat()
                for row in unindexed:
                    self._index_checkpoint_row(cursor, row, now_iso)
                indexed_count = len(unindexed)

                conn.commit()

                # Rebuild FTS5 index if entries were added
                if indexed_count > 0:
                    try:
                        cursor.execute(
                            "INSERT INTO checkpoint_fts(checkpoint_fts) VALUES('rebuild')"
                        )
                        conn.commit()
                    except sqlite3.OperationalError:
                        pass

                result["success"] = True
                result["checkpoints_indexed"] = indexed_count
                logger.info(
                    f"Indexed {indexed_count} checkpoints "
                    f"({result['checkpoints_total']} total)"
                )
            finally:
                # Close on every path, including dry_run / early returns and errors.
                conn.close()

        except sqlite3.Error as e:
            result["error"] = str(e)
            logger.warning(f"Checkpoint indexing failed: {e}")

        return result

    @staticmethod
    def _ensure_index_schema(cursor: sqlite3.Cursor) -> None:
        """Create checkpoint_index, its secondary indexes, and the FTS5 table if missing."""
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS checkpoint_index (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                checkpoint_id TEXT UNIQUE NOT NULL,
                task_id TEXT NOT NULL,
                agent_id TEXT,
                agent_type TEXT,
                phase TEXT,
                iteration INTEGER,
                completed_items_text TEXT,
                pending_items_text TEXT,
                blocked_items_text TEXT,
                current_focus TEXT,
                context_summary_text TEXT,
                continuation_prompt TEXT,
                files_modified_text TEXT,
                tokens_consumed INTEGER,
                tools_invoked INTEGER,
                created_at TEXT,
                indexed_at TEXT NOT NULL
            )
        """)

        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_ci_task ON checkpoint_index(task_id)"
        )
        cursor.execute(
            "CREATE INDEX IF NOT EXISTS idx_ci_phase ON checkpoint_index(phase)"
        )

        # External-content FTS5 table mirroring checkpoint_index.
        try:
            cursor.execute("""
                CREATE VIRTUAL TABLE IF NOT EXISTS checkpoint_fts
                USING fts5(
                    task_id, phase, completed_items_text, pending_items_text,
                    blocked_items_text, context_summary_text, continuation_prompt,
                    content=checkpoint_index, content_rowid=id
                )
            """)
        except sqlite3.OperationalError:
            pass  # Already exists with different schema

    def _index_checkpoint_row(
        self, cursor: sqlite3.Cursor, row: tuple, now_iso: str
    ) -> None:
        """Insert one checkpoint row into checkpoint_index plus a messages cross-reference."""
        import hashlib  # function-scope import keeps the module's top-level imports unchanged

        (cp_id, task_id, agent_id, agent_type, phase,
         iteration, completed_json, pending_json,
         blocked_json, current_focus, summary_json,
         cont_prompt, files_json,
         tokens, tools, created_at) = row

        # Flatten JSON arrays to searchable text
        completed_text = self._flatten_json_array(completed_json)
        pending_text = self._flatten_json_array(pending_json)
        blocked_text = self._flatten_json_array(blocked_json)
        files_text = self._flatten_json_array(files_json)
        summary_text = self._flatten_context_summary(summary_json)

        cursor.execute("""
            INSERT OR IGNORE INTO checkpoint_index
            (checkpoint_id, task_id, agent_id, agent_type, phase,
             iteration, completed_items_text, pending_items_text,
             blocked_items_text, current_focus, context_summary_text,
             continuation_prompt, files_modified_text,
             tokens_consumed, tools_invoked, created_at, indexed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            cp_id, task_id, agent_id, agent_type, phase,
            iteration, completed_text, pending_text,
            blocked_text, current_focus, summary_text,
            cont_prompt or '', files_text,
            tokens or 0, tools or 0, created_at, now_iso,
        ))

        # Deterministic hash keyed on checkpoint ID keeps the cross-reference idempotent.
        msg_hash = hashlib.sha256(
            f"checkpoint:{cp_id}".encode()
        ).hexdigest()
        phase_str = phase or "unknown"
        msg_content = (
            f"[Checkpoint] {task_id} iteration {iteration or 1} "
            f"phase={phase_str} agent={agent_id}\n"
            f"Completed: {completed_text[:300]}\n"
            f"Pending: {pending_text[:300]}\n"
            f"Focus: {current_focus or 'none'}"
        )
        cursor.execute("""
            INSERT OR IGNORE INTO messages
            (hash, content, role, source_type, source_file,
             session_id, checkpoint, timestamp, extracted_at,
             content_length, has_code, has_markdown)
            VALUES (?, ?, 'checkpoint', 'checkpoint', '',
                    '', ?, ?, ?, ?, 0, 0)
        """, (
            msg_hash,
            msg_content,
            cp_id,
            created_at or '',
            now_iso,
            len(msg_content),
        ))

    @staticmethod
    def _flatten_json_array(json_str: Optional[str]) -> str:
        """Flatten a JSON array to a newline-separated text string."""
        if not json_str:
            return ""
        try:
            data = json.loads(json_str)
            if isinstance(data, list):
                return "\n".join(str(item) for item in data)
            return str(data)
        except (json.JSONDecodeError, TypeError):
            # Not JSON — index the raw text as-is.
            return str(json_str) if json_str else ""

    @staticmethod
    def _flatten_context_summary(json_str: Optional[str]) -> str:
        """Flatten a ContextSummary JSON to searchable text."""
        if not json_str:
            return ""
        try:
            data = json.loads(json_str)
            if isinstance(data, dict):
                parts = []
                for key in ("key_decisions", "assumptions", "constraints",
                            "external_dependencies"):
                    items = data.get(key, [])
                    if items:
                        parts.append(f"{key}: {'; '.join(str(i) for i in items)}")
                return "\n".join(parts) if parts else str(data)
            return str(data)
        except (json.JSONDecodeError, TypeError):
            return str(json_str) if json_str else ""

    # =========================================================================
    # CONVENIENCE: ENRICHED CHECKPOINT CREATION
    # =========================================================================

    def create_enriched_checkpoint(
        self,
        service: "CheckpointService",
        task_id: str,
        agent_id: str,
        execution_state: "ExecutionState",
        project_id: str = "",
        metrics: Optional["CheckpointMetrics"] = None,
        agent_type: str = "implementation",
        iteration: int = 1,
    ) -> "Checkpoint":
        """
        Create a checkpoint with context-enriched summary and enhanced prompt.

        Combines:
        1. Standard CheckpointService.create_checkpoint()
        2. Context enrichment from org.db
        3. Enhanced continuation prompt with snapshot + knowledge

        Args:
            service: CheckpointService instance
            task_id: Task identifier
            agent_id: Agent identifier
            execution_state: Current execution state
            project_id: Project ID for context scoping
            metrics: Optional checkpoint metrics
            agent_type: Type of agent
            iteration: Loop iteration number

        Returns:
            Checkpoint with enriched context and enhanced prompt
        """
        # Step 1: Enrich context summary from org.db
        enriched_summary = self.enrich_context_summary(
            task_id=task_id,
            project_id=project_id,
        )

        # Step 2: Create checkpoint with enriched summary
        checkpoint = service.create_checkpoint(
            task_id=task_id,
            agent_id=agent_id,
            execution_state=execution_state,
            context_summary=enriched_summary,
            metrics=metrics,
            agent_type=agent_type,
            iteration=iteration,
        )

        # Step 3: Replace continuation prompt with enhanced version
        enhanced_prompt = self.enhance_continuation_prompt(
            checkpoint=checkpoint,
            project_id=project_id,
        )
        checkpoint.recovery.continuation_prompt = enhanced_prompt

        # Step 4: Recompute hash (prompt changed)
        checkpoint.compliance.hash = checkpoint.compute_hash()

        # Step 5: Re-save with updated prompt and hash.
        # NOTE(review): reaches into the service's private _save_checkpoint —
        # a public re-save API would be cleaner; confirm with the service owner.
        service._save_checkpoint(checkpoint)

        # Step 6: Tag recent messages with checkpoint provenance
        self.tag_checkpoint_provenance(
            checkpoint_id=checkpoint.metadata.checkpoint_id,
            task_id=task_id,
        )

        logger.info(
            f"Created enriched checkpoint {checkpoint.metadata.checkpoint_id} "
            f"for task {task_id}"
        )

        return checkpoint

# =============================================================================
# EXPORTS
# =============================================================================

# Explicit public API: `all` would shadow the builtin and do nothing;
# `__all__` is what `from module import *` honors.
__all__ = [
    "ContextIntegration",
]