Skip to main content

scripts-session-import

#!/usr/bin/env python3
"""
---
title: "Session Import"
component_type: script
version: "1.1.0"
audience: contributor
status: stable
summary: "Import CUSF session data to sessions.db (ADR-118 Tier 3) for reconstruction"
keywords: ['import', 'session', 'cusf', 'reconstruction', 'database', 'adr-118']
tokens: ~400
created: 2026-01-28
updated: 2026-01-28
script_name: "session-import.py"
language: python
executable: true
usage: "python3 scripts/session-import.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: true
network_access: false
requires_auth: false
---

Session Import - CUSF to Context Database

Imports CUSF-formatted session exports into the CODITECT context database.
Enables full reconstruction of session history from exports.

Features:
  - Import from JSONL, JSON, or SQLite CUSF files
  - Deduplication of existing messages
  - Validation against CUSF schema
  - Merge with existing sessions

Usage:
    python3 session-import.py export.jsonl
    python3 session-import.py sessions.db --validate
    python3 session-import.py --merge backup.jsonl

Track: J.13 (Memory - Generic Session Export)
Task: J.13.3.3
"""

from __future__ import annotations

import argparse
import hashlib
import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

# Add parent paths for imports so `core.paths` resolves when run as a script.
_script_dir = Path(__file__).resolve().parent
_coditect_root = _script_dir.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))
if str(_script_dir) not in sys.path:
    sys.path.insert(0, str(_script_dir))

def get_sessions_db_path() -> Path:
    """Get path to sessions database (ADR-118 Tier 3).

    Session import data (messages, sessions, tool_analytics) is Tier 3
    regenerable data that belongs in sessions.db.

    Returns:
        Path to sessions.db. Prefers the project paths module, then an
        existing ADR-114 standard location, then the first standard
        location as a default (parent may not exist yet).
    """
    # Try the project-provided paths module first; fall through when this
    # script runs outside the repository.
    try:
        from core.paths import get_sessions_db_path as _get_sessions_db
        return _get_sessions_db()
    except ImportError:
        pass

    # ADR-114 standard locations, in preference order.
    candidates = [
        Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db",
        Path.home() / ".coditect-data" / "context-storage" / "sessions.db",
    ]

    # Use the first candidate whose parent directory already exists.
    for candidate in candidates:
        if candidate.parent.exists():
            return candidate

    # Default to new ADR-114 location
    return candidates[0]

# Legacy alias for backward compatibility.
def get_context_db_path() -> Path:
    """DEPRECATED: Use get_sessions_db_path() instead."""
    return get_sessions_db_path()

def iter_cusf_jsonl(path: Path) -> Iterator[Dict[str, Any]]:
    """Yield one parsed CUSF entry per non-blank line of a JSONL file.

    Args:
        path: Path to a CUSF JSONL export.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            # Skip blank/whitespace-only lines (trailing newlines, padding).
            if line.strip():
                yield json.loads(line)

def iter_cusf_json(path: Path) -> Iterator[Dict[str, Any]]:
    """Yield CUSF entries from a JSON file containing a top-level array.

    Non-list top-level values yield nothing (silently), matching the
    lenient behavior of the other format iterators.
    """
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
        if isinstance(data, list):
            yield from data

def iter_cusf_sqlite(path: Path) -> Iterator[Dict[str, Any]]:
    """Yield CUSF entries from a SQLite export.

    Emission order: one ``{"_meta": ...}`` entry (if export_meta exists),
    then ``session_start`` entries, then ``message`` entries, then
    ``tool_use``/``tool_result`` entries derived from tool_calls rows.

    Fixes vs. prior version: connection is closed even if iteration raises;
    NULL tool_input columns no longer crash json.loads; a NULL
    result_timestamp now falls back to the call timestamp as intended.
    """
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.cursor()

        # Export metadata (single well-known row).
        cursor.execute("SELECT * FROM export_meta WHERE id = 1")
        meta_row = cursor.fetchone()
        if meta_row:
            yield {"_meta": dict(meta_row)}

        # Sessions, oldest first.
        cursor.execute("SELECT * FROM sessions ORDER BY started_at")
        for row in cursor:
            session = dict(row)
            session["type"] = "session_start"
            yield session

        # Messages, in timestamp order.
        cursor.execute("SELECT * FROM messages ORDER BY timestamp")
        for row in cursor:
            msg = dict(row)
            msg["type"] = "message"
            if msg.get("raw_json"):
                # Overlay the original JSON payload onto the columnar
                # fields; malformed payloads fall back to columns only.
                try:
                    msg.update(json.loads(msg["raw_json"]))
                except (json.JSONDecodeError, TypeError):
                    pass
            yield msg

        # Tool calls: always emit tool_use; emit tool_result when a
        # result was recorded.
        cursor.execute("SELECT * FROM tool_calls ORDER BY timestamp")
        for row in cursor:
            tc = dict(row)
            yield {
                "type": "tool_use",
                "tool_id": tc["tool_id"],
                "tool_name": tc["tool_name"],
                # NULL tool_input means the key exists with value None,
                # so `or "{}"` (not a .get default) is required here.
                "tool_input": json.loads(tc.get("tool_input") or "{}"),
                "timestamp": tc["timestamp"],
                "parent_id": tc.get("parent_id")
            }
            if tc.get("result"):
                yield {
                    "type": "tool_result",
                    "tool_id": tc["tool_id"],
                    "result": tc["result"],
                    "is_error": bool(tc.get("is_error")),
                    # Fall back to the call timestamp when the result
                    # timestamp is missing or NULL.
                    "timestamp": tc.get("result_timestamp") or tc["timestamp"]
                }
    finally:
        conn.close()

def detect_format(path: Path) -> str:
    """Detect CUSF file format from suffix, sniffing content as a fallback.

    Returns:
        One of "jsonl", "json", or "sqlite"; defaults to "jsonl" when
        neither the suffix nor the content identifies the format.
    """
    suffix = path.suffix.lower()
    if suffix == ".jsonl":
        return "jsonl"
    if suffix == ".json":
        return "json"
    if suffix in (".db", ".sqlite", ".sqlite3"):
        return "sqlite"
    # Unknown suffix: sniff for the 16-byte SQLite magic header.
    try:
        with open(path, 'rb') as f:
            if f.read(16).startswith(b'SQLite format'):
                return "sqlite"
    except OSError:
        # Unreadable/missing file: fall through to the default.
        pass
    return "jsonl"  # Default

def iter_cusf(path: Path) -> Iterator[Dict[str, Any]]:
    """Iterate over a CUSF file, dispatching on auto-detected format."""
    fmt = detect_format(path)
    if fmt == "jsonl":
        yield from iter_cusf_jsonl(path)
    elif fmt == "json":
        yield from iter_cusf_json(path)
    elif fmt == "sqlite":
        yield from iter_cusf_sqlite(path)

def compute_message_hash(entry: Dict[str, Any]) -> str:
    """Compute a deduplication hash for a message entry.

    Keyed on role + first 1000 chars of content + timestamp. MD5 is used
    purely as a dedup fingerprint, not for security. None-valued fields
    are treated as empty strings instead of crashing.
    """
    key_parts = [
        entry.get("role") or "",
        (entry.get("content") or "")[:1000],
        entry.get("timestamp") or ""
    ]
    return hashlib.md5("|".join(key_parts).encode()).hexdigest()

class CUSFImporter:
    """Imports CUSF data into the sessions database (ADR-118 Tier 3).

    Usage:
        with CUSFImporter(db_path=...) as importer:
            imported, skipped, errors = importer.import_file(path)
    """

    def __init__(
        self,
        db_path: Optional[Path] = None,
        deduplicate: bool = True,
        validate: bool = False
    ):
        """Configure the importer.

        Args:
            db_path: Target database; defaults to the ADR-118 location.
            deduplicate: Skip messages whose content hash already exists.
            validate: Check CUSF metadata (version) during import.
        """
        self.db_path = db_path or get_sessions_db_path()
        self.deduplicate = deduplicate
        self.validate = validate
        self._conn: Optional[sqlite3.Connection] = None
        # Hashes of messages already in the DB, for deduplication.
        self._existing_hashes: set = set()

    def __enter__(self) -> "CUSFImporter":
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    def open(self) -> None:
        """Open the database connection and ensure the schema exists."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(self.db_path)
        self._init_schema()

        if self.deduplicate:
            self._load_existing_hashes()

    def close(self) -> None:
        """Commit pending work and close the connection (idempotent)."""
        if self._conn:
            self._conn.commit()
            self._conn.close()
            self._conn = None

    def _init_schema(self) -> None:
        """Ensure required tables and indexes exist (idempotent)."""
        cursor = self._conn.cursor()

        # Sessions table (if not exists)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                llm_source TEXT,
                llm_model TEXT,
                started_at TEXT,
                ended_at TEXT,
                project_path TEXT,
                cwd TEXT,
                total_messages INTEGER,
                total_tokens_input INTEGER,
                total_tokens_output INTEGER,
                imported_at TEXT
            )
        """)

        # Messages table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                message_id TEXT UNIQUE,
                role TEXT,
                content TEXT,
                timestamp TEXT,
                model TEXT,
                tokens_input INTEGER,
                tokens_output INTEGER,
                hash TEXT,
                imported_at TEXT
            )
        """)

        # Tool calls (unified)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tool_analytics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                tool_id TEXT,
                tool_name TEXT,
                tool_input TEXT,
                result TEXT,
                is_error INTEGER,
                timestamp TEXT,
                imported_at TEXT
            )
        """)

        # Index for deduplication
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_messages_hash ON messages(hash)
        """)

        self._conn.commit()

    def _load_existing_hashes(self) -> None:
        """Load existing message hashes for deduplication."""
        cursor = self._conn.cursor()
        cursor.execute("SELECT hash FROM messages WHERE hash IS NOT NULL")
        self._existing_hashes = {row[0] for row in cursor}

    def import_file(self, path: Path) -> Tuple[int, int, List[str]]:
        """Import a CUSF file into the database.

        Args:
            path: CUSF file (JSONL, JSON, or SQLite; auto-detected).

        Returns:
            Tuple of (imported_count, skipped_count, errors). Per-entry
            failures are collected as error strings, not raised, so one
            bad entry does not abort the whole import.
        """
        imported = 0
        skipped = 0
        errors: List[str] = []
        # Messages/tool calls are attributed to the most recent
        # session_start entry seen in the stream.
        current_session_id: Optional[str] = None

        for entry in iter_cusf(path):
            try:
                entry_type = entry.get("type")

                if "_meta" in entry:
                    # Metadata entry - skip but validate version if asked.
                    if self.validate:
                        version = entry["_meta"].get("version", "")
                        if not version.startswith("1."):
                            errors.append(f"Unsupported CUSF version: {version}")
                    continue

                if entry_type == "session_start":
                    current_session_id = entry.get("session_id")
                    self._import_session(entry)
                    imported += 1

                elif entry_type == "session_end":
                    # Updates the existing session row; not counted as
                    # a new imported entry.
                    self._update_session_end(entry)

                elif entry_type == "message":
                    if self.deduplicate:
                        msg_hash = compute_message_hash(entry)
                        if msg_hash in self._existing_hashes:
                            skipped += 1
                            continue
                        self._existing_hashes.add(msg_hash)
                        entry["_hash"] = msg_hash

                    self._import_message(entry, current_session_id)
                    imported += 1

                elif entry_type == "tool_use":
                    self._import_tool_use(entry, current_session_id)
                    imported += 1

                elif entry_type == "tool_result":
                    # Fills in the result on an existing tool_use row.
                    self._import_tool_result(entry)

            except Exception as e:
                # Collect and continue: best-effort import by design.
                errors.append(f"Error importing entry: {e}")

        self._conn.commit()
        return imported, skipped, errors

    def _import_session(self, entry: Dict[str, Any]) -> None:
        """Insert (or replace) a session row from a session_start entry."""
        cursor = self._conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO sessions
            (session_id, llm_source, llm_model, started_at, project_path, cwd, imported_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            entry.get("session_id"),
            entry.get("llm_source"),
            entry.get("llm_model"),
            entry.get("started_at"),
            entry.get("project_path"),
            entry.get("cwd"),
            datetime.now().isoformat()
        ))

    def _update_session_end(self, entry: Dict[str, Any]) -> None:
        """Update a session row with end-of-session totals."""
        cursor = self._conn.cursor()
        tokens = entry.get("total_tokens", {})
        cursor.execute("""
            UPDATE sessions SET
                ended_at = ?,
                total_messages = ?,
                total_tokens_input = ?,
                total_tokens_output = ?
            WHERE session_id = ?
        """, (
            entry.get("ended_at"),
            entry.get("total_messages"),
            tokens.get("input"),
            tokens.get("output"),
            entry.get("session_id")
        ))

    def _import_message(self, entry: Dict[str, Any], session_id: Optional[str]) -> None:
        """Insert a message row; duplicates on message_id are ignored."""
        cursor = self._conn.cursor()
        usage = entry.get("usage", {})
        cursor.execute("""
            INSERT OR IGNORE INTO messages
            (session_id, message_id, role, content, timestamp, model,
             tokens_input, tokens_output, hash, imported_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            session_id,
            entry.get("message_id"),
            entry.get("role"),
            entry.get("content"),
            entry.get("timestamp"),
            entry.get("model"),
            usage.get("input"),
            usage.get("output"),
            entry.get("_hash"),
            datetime.now().isoformat()
        ))

    def _import_tool_use(self, entry: Dict[str, Any], session_id: Optional[str]) -> None:
        """Insert (or replace) a tool_analytics row for a tool_use entry."""
        cursor = self._conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO tool_analytics
            (session_id, tool_id, tool_name, tool_input, timestamp, imported_at)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            session_id,
            entry.get("tool_id"),
            entry.get("tool_name"),
            json.dumps(entry.get("tool_input", {})),
            entry.get("timestamp"),
            datetime.now().isoformat()
        ))

    def _import_tool_result(self, entry: Dict[str, Any]) -> None:
        """Attach a result (and error flag) to an existing tool_use row."""
        cursor = self._conn.cursor()
        cursor.execute("""
            UPDATE tool_analytics SET
                result = ?,
                is_error = ?
            WHERE tool_id = ?
        """, (
            entry.get("result"),
            1 if entry.get("is_error") else 0,
            entry.get("tool_id")
        ))

def main() -> int:
    """CLI entry point.

    Returns:
        0 on success (including dry runs), 1 on missing input file or
        when any import errors were collected.
    """
    parser = argparse.ArgumentParser(
        description="Import CUSF session data to context database"
    )
    parser.add_argument(
        "input",
        type=Path,
        help="CUSF file to import (JSONL, JSON, or SQLite)"
    )
    parser.add_argument(
        "--db",
        type=Path,
        help="Target database path (default: ~/.coditect-data/context-storage/sessions.db - ADR-118)"
    )
    parser.add_argument(
        "--no-dedupe",
        action="store_true",
        help="Disable deduplication"
    )
    parser.add_argument(
        "--validate",
        action="store_true",
        help="Validate CUSF format"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview import without writing"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "--json-output",
        action="store_true",
        help="Output results as JSON"
    )

    args = parser.parse_args()

    if not args.input.exists():
        print(f"Error: File not found: {args.input}", file=sys.stderr)
        return 1

    # Dry run - just count entries, never open the target database.
    if args.dry_run:
        entry_count = 0
        session_count = 0
        message_count = 0

        for entry in iter_cusf(args.input):
            entry_count += 1
            if entry.get("type") == "session_start":
                session_count += 1
            elif entry.get("type") == "message":
                message_count += 1

        if args.json_output:
            print(json.dumps({
                "would_import": True,
                "entries": entry_count,
                "sessions": session_count,
                "messages": message_count
            }))
        else:
            print(f"Would import from {args.input}:")
            print(f"  Total entries: {entry_count}")
            print(f"  Sessions: {session_count}")
            print(f"  Messages: {message_count}")

        return 0

    # Perform import
    with CUSFImporter(
        db_path=args.db,
        deduplicate=not args.no_dedupe,
        validate=args.validate
    ) as importer:
        imported, skipped, errors = importer.import_file(args.input)

    if args.json_output:
        print(json.dumps({
            "success": len(errors) == 0,
            "imported": imported,
            "skipped": skipped,
            "errors": errors
        }))
    else:
        print(f"✓ Import complete")
        print(f"  Imported: {imported}")
        print(f"  Skipped (duplicates): {skipped}")
        if errors:
            print(f"  Errors: {len(errors)}")
            if args.verbose:
                for err in errors[:10]:
                    print(f"    - {err}")

    return 0 if len(errors) == 0 else 1

if __name__ == "__main__":
    sys.exit(main())