Skip to main content

scripts-cusf-formatter

#!/usr/bin/env python3
"""
title: "CUSF Formatter"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Normalize session data to CODITECT Universal Session Format"
keywords: ['cusf', 'formatter', 'normalize', 'canonical', 'session']
tokens: ~250
created: 2026-01-28
updated: 2026-01-28
script_name: "cusf_formatter.py"
language: python
executable: true
usage: "from scripts.core.cusf_formatter import CUSFFormatter"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

CUSF Formatter for CODITECT /sx command.

Normalizes all LLM session formats to CODITECT Universal Session Format (CUSF).
Ensures consistent output regardless of source LLM (Claude, Codex, Gemini).

Track: J.13 (Memory - Generic Session Export)
Task: J.13.1.5
"""

from __future__ import annotations

import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional

from scripts.core.session_extractor import (
    ExtractedEntry,
    ExtractionResult,
    SessionMetadata,
)

# Version of the CUSF format; written to the "version" field of the _meta header.
CUSF_VERSION = "1.0.0"
# Identifier of this exporter; written to the "exporter" field of the _meta header.
EXPORTER_ID = "coditect-sx/1.0.0"

class CUSFFormatter:
    """
    Formats extracted session data to CUSF canonical format.

    CUSF (CODITECT Universal Session Format) is a JSONL format with:
    - _meta header with format version and export metadata
    - session_start entry with session metadata
    - message entries for user/assistant messages
    - tool_use entries for tool invocations
    - tool_result entries for tool outputs
    - session_end entry with summary statistics
    """

    # Optional SessionMetadata attributes copied into session_start when truthy,
    # in the order they are emitted.
    _OPTIONAL_START_FIELDS = (
        "llm_model",
        "project_path",
        "git_branch",
        "cwd",
        "machine_id",
        "tenant_id",
        "user_id",
    )

    def __init__(
        self,
        include_raw: bool = False,
        include_thinking: bool = True,
        max_tool_result_length: int = 50000,
    ) -> None:
        """
        Initialize CUSFFormatter.

        Args:
            include_raw: Include raw source data in entries (for debugging)
            include_thinking: Include extended thinking content
            max_tool_result_length: Max length for tool results before truncation
        """
        self.include_raw = include_raw
        self.include_thinking = include_thinking
        self.max_tool_result_length = max_tool_result_length

    def format(self, result: ExtractionResult) -> Iterator[Dict[str, Any]]:
        """
        Format extraction result to CUSF entries.

        Args:
            result: ExtractionResult from a SessionExtractor

        Yields:
            CUSF-formatted dictionary entries (for JSONL output), in the order:
            _meta header, session_start, every formatted entry, session_end.
        """
        # 1. Meta header
        yield self._create_meta()

        # 2. Session start
        yield self._format_session_start(result.metadata)

        # 3. All entries in order; entries that format to None (or to an
        #    empty dict) are skipped.
        for entry in result.iter_entries():
            formatted = self._format_entry(entry)
            if formatted:
                yield formatted

        # 4. Session end
        yield self._format_session_end(result.metadata)

    def format_single(self, entry: ExtractedEntry) -> Optional[Dict[str, Any]]:
        """Format a single entry to CUSF format (None for session_start/session_end)."""
        return self._format_entry(entry)

    def _create_meta(self) -> Dict[str, Any]:
        """Create CUSF meta header stamped with the current UTC time."""
        return {
            "_meta": {
                "format": "cusf",
                "version": CUSF_VERSION,
                "exported_at": datetime.now(timezone.utc).isoformat(),
                "exporter": EXPORTER_ID,
            }
        }

    def _format_session_start(self, metadata: SessionMetadata) -> Dict[str, Any]:
        """
        Format session_start entry.

        Falls back to the current UTC time when ``metadata.started_at`` is
        falsy. Optional metadata fields are included only when truthy.
        """
        entry: Dict[str, Any] = {
            "type": "session_start",
            "session_id": metadata.session_id,
            "llm_source": metadata.llm_source,
            "started_at": (
                metadata.started_at.isoformat()
                if metadata.started_at
                else datetime.now(timezone.utc).isoformat()
            ),
        }

        # Optional fields
        for field_name in self._OPTIONAL_START_FIELDS:
            value = getattr(metadata, field_name)
            if value:
                entry[field_name] = value

        return entry

    def _format_session_end(self, metadata: SessionMetadata) -> Dict[str, Any]:
        """
        Format session_end entry.

        Falls back to the current UTC time when ``metadata.ended_at`` is falsy.
        """
        return {
            "type": "session_end",
            "session_id": metadata.session_id,
            "ended_at": (
                metadata.ended_at.isoformat()
                if metadata.ended_at
                else datetime.now(timezone.utc).isoformat()
            ),
            "total_messages": metadata.total_messages,
            "total_tokens": {
                "input": metadata.total_tokens_input,
                "output": metadata.total_tokens_output,
            },
            "end_reason": "export",
        }

    def _format_entry(self, entry: ExtractedEntry) -> Optional[Dict[str, Any]]:
        """
        Format a single extracted entry to CUSF format.

        Returns:
            The formatted dictionary, or None for session_start/session_end
            entries (those are emitted separately by ``format``).
        """
        if entry.type in ("session_start", "session_end"):
            # These are handled separately
            return None

        result = entry.to_dict()

        # Apply formatting rules based on type; other types pass through as-is.
        if entry.type == "message":
            result = self._format_message(result)
        elif entry.type == "tool_use":
            result = self._format_tool_use(result)
        elif entry.type == "tool_result":
            result = self._format_tool_result(result)

        # Optionally include raw data
        if self.include_raw and entry.raw:
            result["_raw"] = entry.raw

        return result

    def _format_message(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply message-specific formatting (mutates and returns ``data``).

        Drops ``thinking`` when thinking output is disabled and fills in a
        content-derived ``message_id`` when the entry has none.
        """
        # Remove thinking if not included
        if not self.include_thinking:
            data.pop("thinking", None)

        # Ensure required fields
        if "message_id" not in data:
            data["message_id"] = self._derive_message_id(data.get("content", ""))

        return data

    @staticmethod
    def _derive_message_id(content: Any) -> str:
        """
        Build a deterministic fallback message id from message content.

        A SHA-256 digest is used instead of the builtin ``hash()``: string
        hashes are salted per process (so ids would differ between exports),
        can be negative, and ``hash()`` raises TypeError for unhashable
        content such as a list of content blocks.
        """
        if isinstance(content, str):
            canonical = content
        else:
            try:
                canonical = json.dumps(
                    content, sort_keys=True, ensure_ascii=False, default=str
                )
            except (TypeError, ValueError):
                # e.g. mixed-type dict keys or circular references
                canonical = repr(content)

        # backslashreplace keeps lone surrogates from raising on encode
        digest = hashlib.sha256(
            canonical.encode("utf-8", errors="backslashreplace")
        ).hexdigest()
        return f"msg_{digest[:16]}"

    def _format_tool_use(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply tool_use-specific formatting (mutates and returns ``data``).

        If ``tool_input`` cannot be JSON-encoded it is replaced by its
        ``str()`` representation so the entry can still be written out.
        """
        # Ensure tool_input is serializable
        if "tool_input" in data:
            try:
                json.dumps(data["tool_input"])
            except (TypeError, ValueError):
                data["tool_input"] = str(data["tool_input"])

        return data

    def _format_tool_result(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply tool_result-specific formatting with truncation.

        String results longer than ``max_tool_result_length`` are cut to that
        length and flagged with ``truncated: True``; non-string results are
        left untouched.
        """
        result = data.get("result", "")

        if isinstance(result, str) and len(result) > self.max_tool_result_length:
            data["result"] = result[:self.max_tool_result_length]
            data["truncated"] = True

        return data

    def to_jsonl(self, result: ExtractionResult) -> str:
        """
        Format extraction result to complete JSONL string.

        Args:
            result: ExtractionResult from a SessionExtractor

        Returns:
            JSONL-formatted string: one JSON object per line, joined with
            newlines and without a trailing newline.
        """
        return '\n'.join(
            json.dumps(entry, ensure_ascii=False) for entry in self.format(result)
        )

    def to_json(self, result: ExtractionResult) -> str:
        """
        Format extraction result to JSON array string.

        Args:
            result: ExtractionResult from a SessionExtractor

        Returns:
            JSON array string (2-space indented, non-ASCII preserved)
        """
        entries = list(self.format(result))
        return json.dumps(entries, indent=2, ensure_ascii=False)

def format_to_cusf(result: ExtractionResult, **kwargs) -> Iterator[Dict[str, Any]]:
    """
    Yield the CUSF entries for ``result`` using a one-off formatter.

    Keyword arguments are forwarded unchanged to the ``CUSFFormatter``
    constructor.
    """
    yield from CUSFFormatter(**kwargs).format(result)

def cusf_to_jsonl(result: ExtractionResult, **kwargs) -> str:
    """
    Return ``result`` rendered as a JSONL string using a one-off formatter.

    Keyword arguments are forwarded unchanged to the ``CUSFFormatter``
    constructor.
    """
    return CUSFFormatter(**kwargs).to_jsonl(result)

if __name__ == "__main__":
    # Demo/test: print the format constants and a sample _meta header.
    print("CUSFFormatter - CODITECT Universal Session Format")
    print(f"Version: {CUSF_VERSION}")
    print(f"Exporter: {EXPORTER_ID}")

    # Create sample meta entry
    formatter = CUSFFormatter()
    meta = formatter._create_meta()
    print("\nSample meta entry:")
    print(json.dumps(meta, indent=2))