#!/usr/bin/env python3
"""
ADR-151 Context Graph LangGraph Checkpointer (Phase 5: CP-34, CP-35)

Provides a LangGraph-style checkpointer interface for context graphs,
enabling state persistence and retrieval during agent orchestration.

Interface:
- ContextGraphCheckpointer: Abstract base class
- SQLiteContextGraphCheckpointer: SQLite-backed implementation

LangGraph Integration:
- Modeled on the langgraph.checkpoint.base.BaseCheckpointSaver pattern
  (not a subclass; method signatures differ)
- Stores context graphs as checkpoints for agent state
- Enables per-turn context refresh in multi-agent workflows

Created: 2026-02-03
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.25.5 (CP-34, CP-35)
"""

import json
import logging
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

try:
    from scripts.context_graph.algorithms import ContextGraph, GraphNode, GraphEdge
except ImportError:
    # Allow import when running standalone
    from algorithms import ContextGraph, GraphNode, GraphEdge

# Module-level logger named after this module (``__name__``), so log
# records can be filtered/configured per module.
logger = logging.getLogger(__name__)

# =============================================================================
# CP-34: Checkpointer Interface
# =============================================================================

class ContextGraphCheckpointer(ABC):
    """
    Abstract interface for persisting and restoring context graphs.

    The method set follows the LangGraph checkpoint pattern, so that
    orchestration code can save a graph per turn and reload it later.

    A checkpoint is addressed by:
    - thread_id: identifier of the conversation/workflow
    - checkpoint_id: identifier of the checkpoint within that thread

    and it carries:
    - context_graph: the (serialized) ContextGraph
    - metadata: optional extra information about the checkpoint
    """

    @abstractmethod
    def put(
        self,
        thread_id: str,
        checkpoint_id: str,
        context_graph: ContextGraph,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Persist ``context_graph`` under ``(thread_id, checkpoint_id)``.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Identifier for this checkpoint.
            context_graph: Graph to persist.
            metadata: Optional extra data (e.g. agent_id, turn_number).

        Returns:
            The checkpoint_id under which the graph was stored.
        """
        ...

    @abstractmethod
    def get(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None,
    ) -> Optional[ContextGraph]:
        """
        Load a stored context graph.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Checkpoint to load; ``None`` selects the latest.

        Returns:
            The ContextGraph, or ``None`` when no matching checkpoint exists.
        """
        ...

    @abstractmethod
    def list(
        self,
        thread_id: str,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Describe the checkpoints stored for a thread.

        Args:
            thread_id: Conversation/workflow identifier.
            limit: Upper bound on the number of entries returned.

        Returns:
            One metadata dict per checkpoint.
        """
        ...

    @abstractmethod
    def delete(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None,
    ) -> int:
        """
        Remove stored checkpoints.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Checkpoint to remove; ``None`` removes every
                checkpoint of the thread.

        Returns:
            How many checkpoints were removed.
        """
        ...

# =============================================================================
# CP-35: SQLite Checkpointer Adapter
# =============================================================================

class SQLiteContextGraphCheckpointer(ContextGraphCheckpointer):
    """
    SQLite-backed context graph checkpointer.

    Stores checkpoints in a dedicated table within sessions.db,
    compatible with ADR-118 four-tier architecture.

    Table: context_graph_checkpoints
    - thread_id, checkpoint_id (composite primary key)
    - context_graph_json (serialized ContextGraph)
    - metadata_json (optional metadata; NULL when none/empty)
    - created_at (UTC timestamp formatted as 'YYYY-MM-DDTHH:MM:SSZ')

    A single connection is opened lazily and kept until close() is called.
    The instance can also be used as a context manager (``with ... as cp:``),
    which closes the connection on exit.
    """

    SCHEMA = """
    CREATE TABLE IF NOT EXISTS context_graph_checkpoints (
        thread_id TEXT NOT NULL,
        checkpoint_id TEXT NOT NULL,
        context_graph_json TEXT NOT NULL,
        metadata_json TEXT,
        created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
        PRIMARY KEY (thread_id, checkpoint_id)
    );

    CREATE INDEX IF NOT EXISTS idx_cg_checkpoints_thread
    ON context_graph_checkpoints(thread_id, created_at DESC);
    """

    # Timestamp format shared by put(), the schema default and cleanup_old().
    # Every comparison on created_at is a string comparison, so all sides must
    # use exactly this format. Both Python and SQLite strftime accept it.
    _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    # created_at has one-second resolution. rowid breaks ties so that "latest"
    # means "most recently written": INSERT OR REPLACE gives a replaced row a
    # new, highest rowid.
    _LATEST_FIRST = "ORDER BY created_at DESC, rowid DESC"

    def __init__(
        self,
        db_path: Path,
        auto_create_table: bool = True,
    ):
        """
        Initialize SQLite checkpointer.

        Args:
            db_path: Path to sessions.db
            auto_create_table: Create table if not exists
        """
        self.db_path = db_path
        self._conn: Optional[sqlite3.Connection] = None

        if auto_create_table:
            self._ensure_table()

    def __enter__(self) -> "SQLiteContextGraphCheckpointer":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Returning None means exceptions raised in the with-body propagate.
        self.close()

    def _get_conn(self) -> sqlite3.Connection:
        """Return the cached connection, opening it on first use."""
        if self._conn is None:
            self._conn = sqlite3.connect(str(self.db_path))
            self._conn.row_factory = sqlite3.Row
        return self._conn

    def _ensure_table(self):
        """Create the checkpoints table and index if they do not exist."""
        conn = self._get_conn()
        conn.executescript(self.SCHEMA)
        conn.commit()
        logger.debug("Ensured context_graph_checkpoints table exists")

    def close(self):
        """Close the database connection, if one is open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def _fetch_checkpoint_row(
        self,
        columns: str,
        thread_id: str,
        checkpoint_id: Optional[str],
    ) -> Optional[sqlite3.Row]:
        """Return *columns* for one checkpoint: the given id, or the latest when None."""
        conn = self._get_conn()
        # ``columns`` is always an internal constant of this class, never
        # caller input, so interpolating it into the SQL text is safe.
        if checkpoint_id is not None:
            sql = (
                f"SELECT {columns} FROM context_graph_checkpoints "
                "WHERE thread_id = ? AND checkpoint_id = ?"
            )
            params: Tuple[Any, ...] = (thread_id, checkpoint_id)
        else:
            sql = (
                f"SELECT {columns} FROM context_graph_checkpoints "
                f"WHERE thread_id = ? {self._LATEST_FIRST} LIMIT 1"
            )
            params = (thread_id,)
        return conn.execute(sql, params).fetchone()

    def put(
        self,
        thread_id: str,
        checkpoint_id: str,
        context_graph: ContextGraph,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Store (or overwrite) a context graph checkpoint.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Checkpoint identifier; an existing checkpoint with
                the same (thread_id, checkpoint_id) is replaced.
            context_graph: Graph to store, serialized via ``to_dict()``.
            metadata: Optional JSON-serializable metadata. An empty dict is
                stored as NULL.

        Returns:
            The checkpoint_id used for storage.

        Raises:
            sqlite3.Error: If the write fails. The transaction is rolled back.
        """
        conn = self._get_conn()

        graph_json = json.dumps(context_graph.to_dict(), ensure_ascii=False)
        metadata_json = json.dumps(metadata, ensure_ascii=False) if metadata else None
        created_at = datetime.now(timezone.utc).strftime(self._TIMESTAMP_FORMAT)

        try:
            # ``with conn`` commits on success and rolls back on error, so a
            # failed write never leaves a transaction open on the shared
            # connection.
            with conn:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO context_graph_checkpoints
                    (thread_id, checkpoint_id, context_graph_json, metadata_json, created_at)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (thread_id, checkpoint_id, graph_json, metadata_json, created_at),
                )
        except sqlite3.Error as e:
            logger.error("Failed to store checkpoint: %s", e)
            raise

        logger.info("Stored checkpoint: %s/%s", thread_id, checkpoint_id)
        return checkpoint_id

    def get(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None,
    ) -> Optional[ContextGraph]:
        """
        Retrieve a context graph checkpoint.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Specific checkpoint; None selects the latest.

        Returns:
            The deserialized ContextGraph, or None if not found.
        """
        row = self._fetch_checkpoint_row("context_graph_json", thread_id, checkpoint_id)
        if row is None:
            return None
        return self._deserialize_graph(row[0])

    def get_with_metadata(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None,
    ) -> Optional[Tuple[ContextGraph, Dict[str, Any]]]:
        """
        Retrieve a checkpoint together with its metadata.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Specific checkpoint; None selects the latest.

        Returns:
            ``(graph, metadata)``, where metadata always contains the stored
            ``created_at``. Returns None if not found.
        """
        row = self._fetch_checkpoint_row(
            "context_graph_json, metadata_json, created_at", thread_id, checkpoint_id
        )
        if row is None:
            return None

        graph = self._deserialize_graph(row[0])
        metadata = json.loads(row[1]) if row[1] else {}
        # The stored timestamp is authoritative over any user-supplied key.
        metadata["created_at"] = row[2]
        return graph, metadata

    def list(
        self,
        thread_id: str,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        List checkpoints for a thread, newest first.

        Args:
            thread_id: Conversation/workflow identifier.
            limit: Maximum number of entries to return.

        Returns:
            One dict per checkpoint, holding ``checkpoint_id``, ``created_at``
            and the stored metadata keys. The stored ``checkpoint_id`` and
            ``created_at`` take precedence over metadata keys of the same name.
        """
        conn = self._get_conn()

        cursor = conn.execute(
            f"""
            SELECT checkpoint_id, metadata_json, created_at
            FROM context_graph_checkpoints
            WHERE thread_id = ?
            {self._LATEST_FIRST}
            LIMIT ?
            """,
            (thread_id, limit),
        )

        checkpoints = []
        for row in cursor:
            metadata = json.loads(row[1]) if row[1] else {}
            entry: Dict[str, Any] = {"checkpoint_id": row[0], "created_at": row[2]}
            entry.update({k: v for k, v in metadata.items() if k not in entry})
            checkpoints.append(entry)

        return checkpoints

    def list_threads(
        self,
        limit: int = 100,
    ) -> List[str]:
        """
        List thread IDs that have checkpoints.

        Threads are ordered by their most recent checkpoint, newest first.
        """
        conn = self._get_conn()

        # GROUP BY is required for MAX() to be evaluated per thread. Without
        # it, an aggregate in ORDER BY is invalid.
        cursor = conn.execute(
            """
            SELECT thread_id
            FROM context_graph_checkpoints
            GROUP BY thread_id
            ORDER BY MAX(created_at) DESC
            LIMIT ?
            """,
            (limit,),
        )

        return [row[0] for row in cursor]

    def delete(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None,
    ) -> int:
        """
        Delete checkpoints.

        Args:
            thread_id: Conversation/workflow identifier.
            checkpoint_id: Specific checkpoint. Only ``None`` means "all
                checkpoints for the thread"; an empty string is treated as an
                ordinary id.

        Returns:
            Number of checkpoints deleted.
        """
        conn = self._get_conn()

        with conn:
            if checkpoint_id is not None:
                cursor = conn.execute(
                    """
                    DELETE FROM context_graph_checkpoints
                    WHERE thread_id = ? AND checkpoint_id = ?
                    """,
                    (thread_id, checkpoint_id),
                )
            else:
                cursor = conn.execute(
                    """
                    DELETE FROM context_graph_checkpoints
                    WHERE thread_id = ?
                    """,
                    (thread_id,),
                )

        deleted = cursor.rowcount
        logger.info("Deleted %d checkpoints for thread %s", deleted, thread_id)
        return deleted

    def cleanup_old(
        self,
        max_age_hours: int = 24,
    ) -> int:
        """
        Clean up old checkpoints.

        Args:
            max_age_hours: Delete checkpoints older than this

        Returns:
            Number of checkpoints deleted
        """
        conn = self._get_conn()

        # The cutoff must use the same text format as created_at. With
        # datetime('now', ...) ('YYYY-MM-DD HH:MM:SS'), rows on the cutoff date
        # compared greater ('T' > ' ') and were never deleted.
        with conn:
            cursor = conn.execute(
                """
                DELETE FROM context_graph_checkpoints
                WHERE created_at < strftime(?, 'now', ?)
                """,
                (self._TIMESTAMP_FORMAT, f"-{max_age_hours} hours"),
            )

        deleted = cursor.rowcount
        if deleted > 0:
            logger.info("Cleaned up %d old checkpoints", deleted)
        return deleted

    def _deserialize_graph(self, graph_json: str) -> ContextGraph:
        """Rebuild a ContextGraph from the JSON written by put()."""
        data = json.loads(graph_json)

        # Missing top-level fields fall back to the defaults below.
        graph = ContextGraph(
            id=data.get("id", ""),
            task_description=data.get("task_description", ""),
            seed_nodes=data.get("seed_nodes", []),
            seed_strategy=data.get("seed_strategy", "anchor"),
            token_budget=data.get("token_budget", 4000),
            max_depth=data.get("max_depth", 3),
            max_nodes=data.get("max_nodes", 128),
        )

        # Nodes are serialized as a mapping of node_id -> node fields.
        for node_id, node_data in data.get("nodes", {}).items():
            graph.nodes[node_id] = GraphNode(
                id=node_data.get("id", node_id),
                node_type=node_data.get("node_type", "unknown"),
                name=node_data.get("name", ""),
                subtype=node_data.get("subtype"),
                properties=node_data.get("properties"),
                relevance_score=node_data.get("relevance_score", 1.0),
                depth=node_data.get("depth", 0),
                is_seed=node_data.get("is_seed", False),
                token_estimate=node_data.get("token_estimate", 0),
            )

        # Edges are serialized as a list of edge-field dicts.
        for edge_data in data.get("edges", []):
            graph.edges.append(GraphEdge(
                from_node=edge_data.get("from_node", ""),
                to_node=edge_data.get("to_node", ""),
                edge_type=edge_data.get("edge_type", ""),
                properties=edge_data.get("properties"),
                weight=edge_data.get("weight", 1.0),
            ))

        return graph

# =============================================================================
# Factory Function
# =============================================================================

def get_checkpointer(
    db_path: Optional[Path] = None,
    backend: str = "sqlite",
) -> ContextGraphCheckpointer:
    """
    Build a checkpointer for the requested storage backend.

    Args:
        db_path: Database file. When omitted, the path is taken from
            ``scripts.core.paths.get_sessions_db_path()`` if that can be
            imported. Otherwise it is a fixed location under the user's home
            directory.
        backend: Storage backend name. Only ``"sqlite"`` is supported.

    Returns:
        A ContextGraphCheckpointer instance.

    Raises:
        ValueError: If ``backend`` is not a known backend name.
    """
    if db_path is None:
        try:
            from scripts.core.paths import get_sessions_db_path
            db_path = get_sessions_db_path()
        except ImportError:
            # Fallback when the project path helpers are unavailable.
            db_path = Path.home().joinpath(
                "PROJECTS", ".coditect-data", "context-storage", "sessions.db"
            )

    if backend != "sqlite":
        raise ValueError(f"Unknown backend: {backend}")

    return SQLiteContextGraphCheckpointer(db_path)