#!/usr/bin/env python3 """ Session Exporter - Export Claude Code sessions to /cx-compatible format.
Creates export files that match Claude Code's /export format:
- ASCII banner with version
- User markers (> )
- Assistant markers (⏺ )
- Standard naming: YYYY-MM-DD-
.txt
Usage:
python3 session-exporter.py # Interactive picker
python3 session-exporter.py
import argparse import json import os import random import string import sys from datetime import datetime from pathlib import Path
ADR-114: Use centralized path discovery
_script_dir = Path(file).resolve().parent _coditect_root = _script_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root))
try: from scripts.core.paths import get_context_storage_dir CONTEXT_STORAGE = get_context_storage_dir() except ImportError: _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" CONTEXT_STORAGE = _new_location if _new_location.exists() else Path.home() / ".coditect" / "context-storage"
def get_sessions_dir() -> Path: """Get the Claude Code sessions directory for the current project.""" # Try to find project-specific sessions cwd = os.getcwd() project_path = cwd.replace("/", "-").replace(" ", "-") if project_path.startswith("-"): project_path = project_path[1:]
sessions_base = Path.home() / ".claude" / "projects"
project_sessions = sessions_base / f"-{project_path}"
if project_sessions.exists():
return project_sessions
# Fallback to searching for matching directories
for d in sessions_base.iterdir():
if d.is_dir() and cwd.replace("/", "-") in d.name:
return d
return sessions_base
def list_sessions(sessions_dir: Path, limit: int = 10) -> list: """List recent sessions with metadata.""" sessions = []
for f in sessions_dir.glob("*.jsonl"):
if f.name.startswith("agent-"):
continue # Skip agent sessions
stat = f.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime)
# Get first assistant message for topic hint
topic = get_session_topic(f)
sessions.append({
"id": f.stem,
"path": f,
"size": size,
"mtime": mtime,
"topic": topic[:50] if topic else "Unknown"
})
# Sort by modification time, newest first
sessions.sort(key=lambda x: x["mtime"], reverse=True)
return sessions[:limit]
def get_session_topic(session_path: Path) -> str: """Extract topic from first meaningful assistant message.""" try: with open(session_path, 'r') as f: for line in f: try: obj = json.loads(line) if obj.get('type') == 'assistant': content = obj.get('message', {}).get('content', []) if isinstance(content, list): for item in content: if isinstance(item, dict) and item.get('type') == 'text': text = item.get('text', '') if len(text) > 20 and not text.startswith('I'): return text[:100] except: pass except: pass return ""
def get_session_metadata(session_path: Path) -> dict: """Extract version and model from session.""" version = "2.1.4" model = "claude-opus-4-5-20251101"
try:
with open(session_path, 'r') as f:
for line in f:
try:
obj = json.loads(line)
if obj.get('version'):
version = obj.get('version')
if obj.get('message', {}).get('model'):
model = obj.get('message', {}).get('model')
break
except:
pass
except:
pass
return {"version": version, "model": model}
def extract_messages(session_path: Path) -> list: """Extract all user and assistant messages from session.""" messages = []
with open(session_path, 'r') as f:
for line in f:
try:
obj = json.loads(line)
msg_type = obj.get('type')
if msg_type == 'user':
content = obj.get('message', {}).get('content', '')
text = ''
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get('type') == 'text':
text += item.get('text', '')
elif isinstance(content, str):
text = content
if text:
messages.append(('user', text))
elif msg_type == 'assistant':
content = obj.get('message', {}).get('content', [])
text = ''
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get('type') == 'text':
text += item.get('text', '')
if text:
messages.append(('assistant', text))
except:
pass
return messages
def generate_filename() -> str: """Generate filename matching Claude Code /export format.
Uses .txt extension since write_export() creates human-readable text format,
NOT JSONL format. The J.10 change (2026-01-11) incorrectly used .jsonl
extension which broke /cx processing - fixed in v2.1 (2026-01-20).
"""
date_str = datetime.now().strftime("%Y-%m-%d")
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16))
return f"{date_str}-{random_str}.txt"
def write_export(messages: list, metadata: dict, output_path: Path) -> dict: """Write messages to /cx-compatible export format.""" version = metadata.get('version', '2.1.4') model = metadata.get('model', 'claude-opus-4-5-20251101')
with open(output_path, 'w') as f:
# Header matching Claude Code export format
f.write(f"""
▐▛███▜▌ Claude Code v{version} Model: {model}
─────────────────────────────────────────────────────────────────
""")
for role, text in messages:
if role == 'user':
f.write(f"> {text}\n\n")
else:
f.write(f"⏺ {text}\n\n")
f.write("─" * 60 + "\n\n")
# Verify format
with open(output_path, 'r') as f:
content = f.read()
user_count = sum(1 for r, _ in messages if r == 'user')
asst_count = sum(1 for r, _ in messages if r == 'assistant')
return {
"path": output_path,
"size": len(content),
"messages": len(messages),
"user_count": user_count,
"assistant_count": asst_count,
"has_banner": '▐▛███▜▌' in content,
"has_version": 'Claude Code v' in content,
"has_user_marker": '> ' in content,
"has_assistant_marker": '⏺' in content
}
def interactive_picker(sessions: list) -> list: """Show interactive session picker and return selected sessions.""" if not sessions: print("No sessions found.") return []
print("\n┌─────────────────────────────────────────────────────────────┐")
print("│ Select Session(s) to Export │")
print("├─────────────────────────────────────────────────────────────┤")
for i, s in enumerate(sessions, 1):
mtime_str = s['mtime'].strftime("%Y-%m-%d %H:%M")
size_str = format_size(s['size'])
topic = s['topic'][:35] if s['topic'] else "Unknown"
current = " (current)" if i == 1 else ""
print(f"│ [{i}] {mtime_str} │ {size_str:>6} │ {topic}{current:<10}│")
print("├─────────────────────────────────────────────────────────────┤")
print("│ Enter selection (1-N, 'all', or 'q' to quit): │")
print("└─────────────────────────────────────────────────────────────┘")
try:
choice = input("\nSelection: ").strip().lower()
except (EOFError, KeyboardInterrupt):
return []
if choice == 'q':
return []
elif choice == 'all':
return sessions
else:
try:
indices = [int(x.strip()) for x in choice.split(',')]
return [sessions[i-1] for i in indices if 1 <= i <= len(sessions)]
except:
try:
idx = int(choice)
if 1 <= idx <= len(sessions):
return [sessions[idx-1]]
except:
pass
return []
def format_size(size: int) -> str: """Format file size for display.""" if size > 1024 * 1024: return f"{size / (1024 * 1024):.1f}M" elif size > 1024: return f"{size / 1024:.1f}K" else: return f"{size}B"
def main(): # Default export location: exports-pending (where /cx finds unprocessed exports) # ADR-114: Use centralized path discovery default_output = CONTEXT_STORAGE / "exports-pending" default_output.mkdir(parents=True, exist_ok=True)
parser = argparse.ArgumentParser(
description="Export Claude Code sessions to /cx-compatible format"
)
parser.add_argument("session_id", nargs="?", help="Session UUID to export")
parser.add_argument("--current", action="store_true", help="Export current session")
parser.add_argument("--last", type=int, help="Export last N sessions")
parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
parser.add_argument("--output", "-o", type=Path, default=default_output,
help="Output directory (default: ~/PROJECTS/.coditect-data/context-storage/exports-pending)")
parser.add_argument("--list", "-l", action="store_true", help="List available sessions")
args = parser.parse_args()
sessions_dir = get_sessions_dir()
if not sessions_dir.exists():
print(f"❌ Sessions directory not found: {sessions_dir}")
sys.exit(1)
# List mode
if args.list:
sessions = list_sessions(sessions_dir, limit=20)
print(f"\nRecent sessions in {sessions_dir}:\n")
for s in sessions:
mtime_str = s['mtime'].strftime("%Y-%m-%d %H:%M")
size_str = format_size(s['size'])
print(f" {s['id'][:36]} {mtime_str} {size_str:>6} {s['topic'][:40]}")
return
# Determine which sessions to export
sessions_to_export = []
if args.session_id:
# Specific session
session_path = sessions_dir / f"{args.session_id}.jsonl"
if not session_path.exists():
print(f"❌ Session not found: {args.session_id}")
sys.exit(1)
sessions_to_export = [{"id": args.session_id, "path": session_path}]
elif args.current:
# Most recent session
sessions = list_sessions(sessions_dir, limit=1)
if sessions:
sessions_to_export = sessions
else:
print("❌ No sessions found")
sys.exit(1)
elif args.last:
# Last N sessions
sessions = list_sessions(sessions_dir, limit=args.last)
sessions_to_export = sessions
else:
# Interactive picker
sessions = list_sessions(sessions_dir, limit=10)
sessions_to_export = interactive_picker(sessions)
if not sessions_to_export:
print("No sessions selected.")
return
# Export each session
results = []
for session in sessions_to_export:
session_path = session['path']
print(f"\n📦 Exporting: {session['id'][:36]}...")
# Extract messages
messages = extract_messages(session_path)
if not messages:
print(f" ⚠️ No messages found in session")
continue
# Get metadata
metadata = get_session_metadata(session_path)
if args.dry_run:
user_count = sum(1 for r, _ in messages if r == 'user')
asst_count = sum(1 for r, _ in messages if r == 'assistant')
print(f" [DRY RUN] Would export {len(messages)} messages ({user_count} user, {asst_count} assistant)")
continue
# Generate output path
output_path = args.output / generate_filename()
# Write export
result = write_export(messages, metadata, output_path)
results.append(result)
# Report
print(f" ✅ Exported to: {result['path'].name}")
print(f" Messages: {result['messages']} ({result['user_count']} user, {result['assistant_count']} assistant)")
print(f" Size: {format_size(result['size'])}")
# Verify format
verifications = [
("Banner", result['has_banner']),
("Version", result['has_version']),
("User markers", result['has_user_marker']),
("Assistant markers", result['has_assistant_marker'])
]
all_ok = all(v[1] for v in verifications)
if all_ok:
print(" Format: ✓ /cx-compatible")
else:
print(" Format verification:")
for name, ok in verifications:
print(f" {'✓' if ok else '✗'} {name}")
if results:
print(f"\n✅ Exported {len(results)} session(s). Ready for /cx processing.")
if name == "main": main()