Skip to main content

scripts-output-writer

#!/usr/bin/env python3
"""
title: "Output Writer"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Write CUSF data to various output formats"
keywords: ['output', 'writer', 'jsonl', 'json', 'sqlite', 'export']
tokens: ~300
created: 2026-01-28
updated: 2026-01-28
script_name: "output_writer.py"
language: python
executable: true
usage: "from scripts.core.output_writer import OutputWriter"
python_version: "3.10+"
dependencies: []
modifies_files: true
network_access: false
requires_auth: false

Output Writer for CODITECT /sx command.

Writes CUSF-formatted session data to various output formats:

  - JSONL (default, streaming-friendly)
  - JSON (single file, human-readable)
  - SQLite (portable database for reconstruction)

Track: J.13 (Memory - Generic Session Export)
Task: J.13.1.6
"""

from __future__ import annotations

import gzip
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, Union

class OutputWriter:
    """
    Writes CUSF data to various output formats.

    Supported formats:
    - jsonl: Line-delimited JSON (default, streaming-friendly)
    - json: JSON array (human-readable); entries are buffered in memory
      and written in a single pass by close()
    - sqlite: SQLite database (portable, queryable)

    Use as a context manager, or call open() and close() explicitly.
    """

    _SUPPORTED_FORMATS = ("jsonl", "json", "sqlite")

    def __init__(
        self,
        output_path: Union[str, Path],
        format: str = "jsonl",
        compress: bool = False,
        append: bool = False
    ):
        """
        Initialize OutputWriter.

        Args:
            output_path: Output file path
            format: Output format (jsonl, json, sqlite); case-insensitive
            compress: Apply gzip compression (for jsonl/json)
            append: Append to existing file (jsonl only)

        Raises:
            ValueError: If format is not one of the supported formats.
        """
        self.output_path = Path(output_path)
        self.format = format.lower()
        self.compress = compress
        self.append = append

        self._entries: list = []  # Buffer for JSON format
        self._file_handle = None
        self._db_conn: Optional[sqlite3.Connection] = None
        # True while the JSON buffer still has to be flushed by close().
        self._json_pending = False
        # session_id of the most recent session_start written by this writer.
        self._active_session_id: Optional[str] = None

        if self.format not in self._SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported format: {self.format}")

    def __enter__(self) -> "OutputWriter":
        """Context manager entry: open the output and return the writer."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: finalize the output (exceptions propagate)."""
        self.close()

    def open(self) -> None:
        """Open output for writing, creating parent directories as needed."""
        # Ensure parent directory exists
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

        if self.format == "jsonl":
            # Binary mode in both branches so write() can emit UTF-8 bytes
            # regardless of whether gzip is in use.
            mode = "ab" if self.append else "wb"
            if self.compress:
                self._file_handle = gzip.open(self.output_path, mode)
            else:
                self._file_handle = open(self.output_path, mode)

        elif self.format == "json":
            self._entries = []
            # An opened JSON writer always produces a file, even if empty.
            self._json_pending = True

        elif self.format == "sqlite":
            self._db_conn = sqlite3.connect(self.output_path)
            self._active_session_id = None
            self._init_sqlite_schema()

    def close(self) -> None:
        """
        Close output and finalize.

        Safe to call more than once: a repeated call is a no-op and does not
        overwrite a JSON file that has already been written.
        """
        if self.format == "jsonl":
            if self._file_handle is not None:
                self._file_handle.close()
                self._file_handle = None

        elif self.format == "json":
            if not self._json_pending:
                # Nothing buffered since the last flush; rewriting here would
                # replace the finished file with an empty array.
                return
            # Write all entries as JSON array
            content = json.dumps(self._entries, indent=2, ensure_ascii=False)
            if self.compress:
                with gzip.open(self.output_path, "wt", encoding="utf-8") as f:
                    f.write(content)
            else:
                with open(self.output_path, "w", encoding="utf-8") as f:
                    f.write(content)
            self._entries = []
            self._json_pending = False

        elif self.format == "sqlite" and self._db_conn:
            try:
                self._db_conn.commit()
            finally:
                # Release the connection even when the commit fails.
                self._db_conn.close()
                self._db_conn = None

    def write(self, entry: Dict[str, Any]) -> None:
        """
        Write a single CUSF entry.

        Args:
            entry: CUSF-formatted dictionary

        Raises:
            RuntimeError: If the format is jsonl or sqlite and the writer has
                not been opened.
        """
        if self.format == "jsonl":
            if self._file_handle is None:
                raise RuntimeError("OutputWriter not opened")
            line = json.dumps(entry, ensure_ascii=False) + "\n"
            self._file_handle.write(line.encode("utf-8"))

        elif self.format == "json":
            self._entries.append(entry)
            self._json_pending = True

        elif self.format == "sqlite":
            self._write_sqlite(entry)

    def write_all(self, entries: Iterator[Dict[str, Any]]) -> int:
        """
        Write all entries from iterator.

        Args:
            entries: Iterator of CUSF-formatted dictionaries

        Returns:
            Number of entries written
        """
        count = 0
        for entry in entries:
            self.write(entry)
            count += 1
        return count

    def _init_sqlite_schema(self) -> None:
        """Create the CUSF tables and indexes if they do not exist yet."""
        if self._db_conn is None:
            return

        statements = (
            # Sessions table
            """
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                llm_source TEXT NOT NULL,
                llm_model TEXT,
                started_at TEXT,
                ended_at TEXT,
                project_path TEXT,
                git_branch TEXT,
                cwd TEXT,
                machine_id TEXT,
                tenant_id TEXT,
                user_id TEXT,
                total_messages INTEGER,
                total_tokens_input INTEGER,
                total_tokens_output INTEGER,
                end_reason TEXT
            )
            """,
            # Messages table
            """
            CREATE TABLE IF NOT EXISTS messages (
                message_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT,
                timestamp TEXT,
                parent_id TEXT,
                model TEXT,
                tokens_input INTEGER,
                tokens_output INTEGER,
                thinking TEXT,
                stop_reason TEXT,
                raw_json TEXT,
                FOREIGN KEY (session_id) REFERENCES sessions(session_id)
            )
            """,
            # Tool calls table
            """
            CREATE TABLE IF NOT EXISTS tool_calls (
                tool_id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                tool_name TEXT NOT NULL,
                tool_input TEXT,
                timestamp TEXT,
                parent_id TEXT,
                result TEXT,
                is_error INTEGER DEFAULT 0,
                error_message TEXT,
                truncated INTEGER DEFAULT 0,
                result_timestamp TEXT,
                FOREIGN KEY (session_id) REFERENCES sessions(session_id)
            )
            """,
            # Export metadata table
            """
            CREATE TABLE IF NOT EXISTS export_meta (
                id INTEGER PRIMARY KEY,
                format TEXT,
                version TEXT,
                exported_at TEXT,
                exporter TEXT
            )
            """,
            # Indexes
            """
            CREATE INDEX IF NOT EXISTS idx_messages_session
            ON messages(session_id)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_messages_timestamp
            ON messages(timestamp)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_tool_calls_session
            ON tool_calls(session_id)
            """,
        )

        cursor = self._db_conn.cursor()
        for statement in statements:
            cursor.execute(statement)

        self._db_conn.commit()

    @staticmethod
    def _to_sql_text(value: Any) -> Any:
        """Return value unchanged unless it is a container, which is JSON-encoded.

        sqlite3 cannot bind dict/list/tuple parameters, so structured payloads
        are stored as JSON text instead of aborting the export.
        """
        if isinstance(value, (dict, list, tuple)):
            return json.dumps(value, ensure_ascii=False)
        return value

    def _write_sqlite(self, entry: Dict[str, Any]) -> None:
        """
        Write entry to SQLite database.

        Entries are routed by shape: a "_meta" key goes to export_meta;
        otherwise the "type" field selects the target table. Unknown types
        are ignored. Rows are committed when the writer is closed.

        Raises:
            RuntimeError: If the writer has not been opened.
        """
        if self._db_conn is None:
            # Match the jsonl behavior instead of silently dropping data.
            raise RuntimeError("OutputWriter not opened")

        cursor = self._db_conn.cursor()

        # Handle meta entry
        if "_meta" in entry:
            meta = entry["_meta"] or {}
            cursor.execute("""
                INSERT OR REPLACE INTO export_meta
                (id, format, version, exported_at, exporter)
                VALUES (1, ?, ?, ?, ?)
            """, (
                meta.get("format"),
                meta.get("version"),
                meta.get("exported_at"),
                meta.get("exporter")
            ))
            return

        entry_type = entry.get("type")

        if entry_type == "session_start":
            cursor.execute("""
                INSERT OR REPLACE INTO sessions
                (session_id, llm_source, llm_model, started_at,
                 project_path, git_branch, cwd, machine_id, tenant_id, user_id)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                entry.get("session_id"),
                entry.get("llm_source"),
                entry.get("llm_model"),
                entry.get("started_at"),
                entry.get("project_path"),
                entry.get("git_branch"),
                entry.get("cwd"),
                entry.get("machine_id"),
                entry.get("tenant_id"),
                entry.get("user_id")
            ))
            # Entries that follow without their own session_id belong here.
            self._active_session_id = entry.get("session_id")

        elif entry_type == "session_end":
            # "or {}" also covers an explicit None value, not just a missing key.
            tokens = entry.get("total_tokens") or {}
            cursor.execute("""
                UPDATE sessions SET
                    ended_at = ?,
                    total_messages = ?,
                    total_tokens_input = ?,
                    total_tokens_output = ?,
                    end_reason = ?
                WHERE session_id = ?
            """, (
                entry.get("ended_at"),
                entry.get("total_messages"),
                tokens.get("input"),
                tokens.get("output"),
                entry.get("end_reason"),
                entry.get("session_id")
            ))

        elif entry_type == "message":
            usage = entry.get("usage") or {}
            cursor.execute("""
                INSERT OR REPLACE INTO messages
                (message_id, session_id, role, content, timestamp,
                 parent_id, model, tokens_input, tokens_output,
                 thinking, stop_reason, raw_json)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                entry.get("message_id"),
                self._resolve_session_id(entry),
                entry.get("role"),
                self._to_sql_text(entry.get("content")),
                entry.get("timestamp"),
                entry.get("parent_id"),
                entry.get("model"),
                usage.get("input"),
                usage.get("output"),
                entry.get("thinking"),
                entry.get("stop_reason"),
                json.dumps(entry)
            ))

        elif entry_type == "tool_use":
            cursor.execute("""
                INSERT OR REPLACE INTO tool_calls
                (tool_id, session_id, tool_name, tool_input,
                 timestamp, parent_id)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                entry.get("tool_id"),
                self._resolve_session_id(entry),
                entry.get("tool_name"),
                json.dumps(entry.get("tool_input", {})),
                entry.get("timestamp"),
                entry.get("parent_id")
            ))

        elif entry_type == "tool_result":
            # Updates the row created by the matching tool_use; if none
            # exists, the UPDATE matches no rows.
            cursor.execute("""
                UPDATE tool_calls SET
                    result = ?,
                    is_error = ?,
                    error_message = ?,
                    truncated = ?,
                    result_timestamp = ?
                WHERE tool_id = ?
            """, (
                self._to_sql_text(entry.get("result")),
                1 if entry.get("is_error") else 0,
                entry.get("error_message"),
                1 if entry.get("truncated") else 0,
                entry.get("timestamp"),
                entry.get("tool_id")
            ))

    def _resolve_session_id(self, entry: Dict[str, Any]) -> Optional[str]:
        """Return the entry's own session_id, else the current session's."""
        return entry.get("session_id") or self._current_session_id

    @property
    def _current_session_id(self) -> Optional[str]:
        """
        Session ID that entries without their own session_id are attached to.

        Prefers the last session_start written through this writer. When none
        has been written, falls back to the stored session with the greatest
        started_at value.
        """
        if self._active_session_id is not None:
            return self._active_session_id
        if self.format == "sqlite" and self._db_conn:
            cursor = self._db_conn.cursor()
            cursor.execute("SELECT session_id FROM sessions ORDER BY started_at DESC LIMIT 1")
            row = cursor.fetchone()
            return row[0] if row else None
        return None

def write_cusf(
    entries: Iterator[Dict[str, Any]],
    output_path: Union[str, Path],
    format: str = "jsonl",
    compress: bool = False
) -> int:
    """
    Write a stream of CUSF entries to a file in one call.

    Opens an OutputWriter for the target, writes every entry, and closes
    the writer before returning.

    Args:
        entries: Iterator of CUSF-formatted dictionaries
        output_path: Destination file path
        format: Output format (jsonl, json, sqlite)
        compress: Apply compression

    Returns:
        Number of entries written
    """
    writer = OutputWriter(output_path, format=format, compress=compress)
    with writer:
        written = writer.write_all(entries)
    return written

def main() -> None:
    """Write a small synthetic CUSF session to exercise the writer from the CLI.

    Reads --format and --output from the command line, writes the sample
    entries to "<output>.<format>", and prints how many were written.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Output Writer Test")
    parser.add_argument("--format", choices=["jsonl", "json", "sqlite"], default="jsonl")
    parser.add_argument("--output", default="/tmp/test-output")
    args = parser.parse_args()

    # Create test data
    test_entries = [
        {"_meta": {"format": "cusf", "version": "1.0.0", "exported_at": datetime.now().isoformat(), "exporter": "test"}},
        {"type": "session_start", "session_id": "test-123", "llm_source": "claude", "started_at": datetime.now().isoformat()},
        {"type": "message", "role": "user", "content": "Hello", "timestamp": datetime.now().isoformat(), "message_id": "msg-1"},
        {"type": "message", "role": "assistant", "content": "Hi there!", "timestamp": datetime.now().isoformat(), "message_id": "msg-2"},
        {"type": "session_end", "session_id": "test-123", "ended_at": datetime.now().isoformat(), "total_messages": 2, "total_tokens": {"input": 10, "output": 20}}
    ]

    output_path = f"{args.output}.{args.format}"
    count = write_cusf(iter(test_entries), output_path, format=args.format)
    print(f"Wrote {count} entries to {output_path}")


# The dunder names are required: a bare `name` is undefined and raises NameError.
if __name__ == "__main__":
    main()