# scripts/extractors/codex_extractor.py
#!/usr/bin/env python3 """​
title: "Codex Session Extractor" component_type: script version: "1.0.0" audience: contributor status: stable summary: "Extract session data from OpenAI Codex CLI JSONL files" keywords: ['codex', 'extractor', 'session', 'jsonl', 'openai'] tokens: ~400 created: 2026-01-28 updated: 2026-01-28 script_name: "codex_extractor.py" language: python executable: true usage: "from scripts.extractors.codex_extractor import CodexExtractor" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false​
Codex Session Extractor for CODITECT /sx command.
Extracts session data from OpenAI Codex CLI JSONL files.
Session file locations:
- History: ~/.codex/history.jsonl (flat file with session_id)
- Sessions: ~/.codex/sessions/YYYY/MM/DD/*.jsonl (date hierarchy)
Track: J.13 (Memory - Generic Session Export) Task: J.13.2.2 """
from future import annotations
import json import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterator, List, Optional
Add parent paths for imports
_script_dir = Path(file).resolve().parent _scripts_dir = _script_dir.parent _coditect_root = _scripts_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root)) if str(_scripts_dir) not in sys.path: sys.path.insert(0, str(_scripts_dir))
from core.session_extractor import ( SessionExtractor, SessionMetadata, ExtractedEntry, ExtractionResult )
class CodexExtractor(SessionExtractor): """ Extracts session data from OpenAI Codex CLI sessions.
Supports:
- Flat history.jsonl with session_id grouping
- Date-organized session directories
Entry format (Codex CLI):
- session_id: UUID for session grouping
- role: user/assistant
- content: Message content
- timestamp: ISO timestamp
- model: Model identifier (e.g., gpt-4)
- tool_calls: Array of tool invocations
"""
@property
def llm_name(self) -> str:
return "codex"
def can_extract(self, source: Path) -> bool:
"""Check if this extractor can handle the source."""
if not source.exists():
return False
# JSONL file
if source.suffix == ".jsonl":
try:
with open(source, 'r', encoding='utf-8') as f:
first_line = f.readline()
if first_line:
data = json.loads(first_line)
# Codex CLI format: type + payload structure
if data.get("type") == "session_meta":
return True
# Legacy format: session_id + role at top level
if "session_id" in data and "role" in data:
return True
except (json.JSONDecodeError, IOError):
pass
# Check for codex path pattern
if ".codex" in str(source):
return True
return False
def extract(
self,
source: Path,
session_id: Optional[str] = None,
include_tool_results: bool = True,
include_thinking: bool = True,
**kwargs
) -> ExtractionResult:
"""
Extract session data from Codex source.
Args:
source: Path to history.jsonl or session file
session_id: Specific session ID to extract (for history.jsonl)
include_tool_results: Include tool result content
include_thinking: Include thinking content (Codex doesn't have this)
Returns:
ExtractionResult with all extracted data
"""
if not source.exists():
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=session_id or "unknown",
llm_source=self.llm_name
),
errors=[f"Source file not found: {source}"]
)
return self._extract_jsonl(source, session_id, include_tool_results)
def _extract_jsonl(
self,
source: Path,
session_id: Optional[str],
include_tool_results: bool
) -> ExtractionResult:
"""Extract from Codex JSONL file."""
entries: List[ExtractedEntry] = []
errors: List[str] = []
warnings: List[str] = []
# Detect format and collect entries
raw_entries: List[Dict[str, Any]] = []
is_cli_format = False
session_meta: Optional[Dict[str, Any]] = None
try:
with open(source, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
if not line.strip():
continue
try:
data = json.loads(line)
except json.JSONDecodeError as e:
warnings.append(f"Line {line_num}: JSON parse error: {e}")
continue
# Detect format from first entry
if line_num == 1:
is_cli_format = data.get("type") == "session_meta"
raw_entries.append(data)
# Capture session_meta
if data.get("type") == "session_meta":
session_meta = data.get("payload", {})
except IOError as e:
errors.append(f"File read error: {e}")
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=session_id or "unknown",
llm_source=self.llm_name
),
errors=errors
)
# Process based on format
if is_cli_format:
return self._extract_cli_format(
raw_entries, session_meta, source, session_id, include_tool_results, warnings
)
else:
return self._extract_legacy_format(
raw_entries, source, session_id, include_tool_results, warnings
)
def _extract_cli_format(
self,
raw_entries: List[Dict[str, Any]],
session_meta: Optional[Dict[str, Any]],
source: Path,
session_id: Optional[str],
include_tool_results: bool,
warnings: List[str]
) -> ExtractionResult:
"""Extract from Codex CLI format (type + payload structure)."""
entries: List[ExtractedEntry] = []
# Get session ID from metadata
target_session = session_id or (session_meta.get("id") if session_meta else None) or source.stem
# Initialize metadata
metadata = SessionMetadata(
session_id=target_session,
llm_source=self.llm_name,
source_path=source
)
# Extract model from session_meta
if session_meta:
metadata.llm_model = session_meta.get("model") or session_meta.get("model_provider")
if session_meta.get("cwd"):
metadata.project_path = session_meta.get("cwd")
first_timestamp: Optional[datetime] = None
last_timestamp: Optional[datetime] = None
total_input = 0
total_output = 0
last_message_id: Optional[str] = None
for data in raw_entries:
entry_type = data.get("type", "")
timestamp = self._parse_timestamp(data)
if timestamp:
if first_timestamp is None:
first_timestamp = timestamp
last_timestamp = timestamp
payload = data.get("payload", {})
# Handle response_item (messages, function calls, reasoning, etc.)
if entry_type == "response_item":
msg_type = payload.get("type", "")
role = payload.get("role", "")
content_blocks = payload.get("content", [])
# Map roles: developer -> system
if role == "developer":
role = "system"
# Extract text content from content blocks
content_parts = []
for block in content_blocks if content_blocks else []:
if isinstance(block, dict):
# Check for text in various keys
text = block.get("text") or block.get("input_text") or block.get("output_text", "")
if text:
content_parts.append(text)
elif isinstance(block, str):
content_parts.append(block)
content = "\n".join(content_parts)
# Handle message type
if msg_type == "message" and role in ("user", "assistant", "system"):
message_id = payload.get("id") or self.generate_message_id()
# Extract usage if present
usage = payload.get("usage", {})
entry = self._create_message(
role=role,
content=content,
timestamp=timestamp or datetime.now(timezone.utc),
message_id=message_id,
parent_id=last_message_id,
model=payload.get("model") or metadata.llm_model,
usage={
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0)
} if usage else None,
raw=data
)
entries.append(entry)
last_message_id = message_id
total_input += usage.get("input_tokens", 0)
total_output += usage.get("output_tokens", 0)
# Handle function_call (tool use) - in response_item
elif msg_type == "function_call":
tool_id = payload.get("call_id") or self.generate_message_id()
tool_name = payload.get("name", "")
tool_args = self._parse_tool_arguments(payload.get("arguments", "{}"))
entries.append(self._create_tool_use(
tool_name=tool_name,
tool_input=tool_args,
tool_id=tool_id,
timestamp=timestamp or datetime.now(timezone.utc),
parent_id=last_message_id,
raw=data
))
# Handle function_call_output (tool result) - in response_item
elif msg_type == "function_call_output" and include_tool_results:
tool_id = payload.get("call_id") or ""
output = payload.get("output", "")
entries.append(self._create_tool_result(
tool_id=tool_id,
result=output if isinstance(output, str) else json.dumps(output),
timestamp=timestamp or datetime.now(timezone.utc),
is_error=payload.get("is_error", False) or "error" in str(output).lower()[:50],
raw=data
))
# Handle web_search_call
elif msg_type == "web_search_call":
tool_id = payload.get("call_id") or self.generate_message_id()
query = payload.get("query", "")
entries.append(self._create_tool_use(
tool_name="web_search",
tool_input={"query": query},
tool_id=tool_id,
timestamp=timestamp or datetime.now(timezone.utc),
parent_id=last_message_id,
raw=data
))
# Handle custom_tool_call
elif msg_type == "custom_tool_call":
tool_id = payload.get("call_id") or self.generate_message_id()
tool_name = payload.get("name", "custom_tool")
tool_args = self._parse_tool_arguments(payload.get("arguments", "{}"))
entries.append(self._create_tool_use(
tool_name=tool_name,
tool_input=tool_args,
tool_id=tool_id,
timestamp=timestamp or datetime.now(timezone.utc),
parent_id=last_message_id,
raw=data
))
# Handle custom_tool_call_output
elif msg_type == "custom_tool_call_output" and include_tool_results:
tool_id = payload.get("call_id") or ""
output = payload.get("output", "")
entries.append(self._create_tool_result(
tool_id=tool_id,
result=output if isinstance(output, str) else json.dumps(output),
timestamp=timestamp or datetime.now(timezone.utc),
is_error=payload.get("is_error", False),
raw=data
))
# Handle reasoning (thinking) - store as message with thinking flag
elif msg_type == "reasoning":
reasoning_content = payload.get("content", "")
if reasoning_content:
# Note: reasoning doesn't count as a message in total_messages
pass # Could store if needed for future analysis
# Handle event_msg (legacy format or additional events)
elif entry_type == "event_msg":
event_type = payload.get("type", "")
if event_type == "function_call":
tool_id = payload.get("call_id") or payload.get("id") or self.generate_message_id()
tool_name = payload.get("name", "")
tool_args = self._parse_tool_arguments(payload.get("arguments", "{}"))
entries.append(self._create_tool_use(
tool_name=tool_name,
tool_input=tool_args,
tool_id=tool_id,
timestamp=timestamp or datetime.now(timezone.utc),
parent_id=last_message_id,
raw=data
))
elif event_type == "function_call_output" and include_tool_results:
tool_id = payload.get("call_id") or payload.get("id", "")
output = payload.get("output", "")
entries.append(self._create_tool_result(
tool_id=tool_id,
result=output if isinstance(output, str) else json.dumps(output),
timestamp=timestamp or datetime.now(timezone.utc),
is_error=payload.get("is_error", False),
raw=data
))
# Update metadata
metadata.started_at = first_timestamp
metadata.ended_at = last_timestamp
metadata.total_messages = len([e for e in entries if e.type == "message"])
metadata.total_tokens_input = total_input
metadata.total_tokens_output = total_output
return ExtractionResult(
success=True,
metadata=metadata,
entries=entries,
warnings=warnings
)
def _extract_legacy_format(
self,
raw_entries: List[Dict[str, Any]],
source: Path,
session_id: Optional[str],
include_tool_results: bool,
warnings: List[str]
) -> ExtractionResult:
"""Extract from legacy Codex format (session_id + role at top level)."""
entries: List[ExtractedEntry] = []
# Track sessions found
sessions: Dict[str, List[Dict[str, Any]]] = {}
target_session = session_id
for data in raw_entries:
sid = data.get("session_id", "default")
# Filter by session_id if specified
if target_session and sid != target_session:
continue
if sid not in sessions:
sessions[sid] = []
sessions[sid].append(data)
# If no target session, use first/only session
if not target_session:
if len(sessions) == 1:
target_session = list(sessions.keys())[0]
elif len(sessions) > 1:
target_session = list(sessions.keys())[-1]
warnings.append(f"Multiple sessions found, using: {target_session}")
if not target_session or target_session not in sessions:
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=target_session or "unknown",
llm_source=self.llm_name
),
errors=["No session data found"]
)
session_data = sessions[target_session]
# Initialize metadata
metadata = SessionMetadata(
session_id=target_session,
llm_source=self.llm_name,
source_path=source
)
first_timestamp: Optional[datetime] = None
last_timestamp: Optional[datetime] = None
total_input = 0
total_output = 0
last_message_id: Optional[str] = None
for data in session_data:
timestamp = self._parse_timestamp(data)
if timestamp:
if first_timestamp is None:
first_timestamp = timestamp
last_timestamp = timestamp
role = data.get("role", "")
content = data.get("content", "")
# Extract model
if metadata.llm_model is None:
metadata.llm_model = data.get("model")
# Create message entry
message_id = data.get("id") or self.generate_message_id()
if role in ("user", "assistant", "system"):
usage = data.get("usage", {})
entry = self._create_message(
role=role,
content=content,
timestamp=timestamp or datetime.now(timezone.utc),
message_id=message_id,
parent_id=last_message_id,
model=data.get("model"),
usage={
"input": usage.get("prompt_tokens", 0),
"output": usage.get("completion_tokens", 0)
} if usage else None,
raw=data
)
entries.append(entry)
last_message_id = message_id
# Track usage
total_input += usage.get("prompt_tokens", 0)
total_output += usage.get("completion_tokens", 0)
# Extract tool calls
tool_calls = data.get("tool_calls", [])
for tc in tool_calls:
tool_id = tc.get("id", self.generate_message_id())
entries.append(self._create_tool_use(
tool_name=tc.get("function", {}).get("name", ""),
tool_input=self._parse_tool_arguments(tc.get("function", {}).get("arguments", "{}")),
tool_id=tool_id,
timestamp=timestamp or datetime.now(timezone.utc),
parent_id=message_id,
raw=tc
))
# Extract tool results (in content for tool role)
if role == "tool" and include_tool_results:
tool_call_id = data.get("tool_call_id", "")
entries.append(self._create_tool_result(
tool_id=tool_call_id,
result=content,
timestamp=timestamp or datetime.now(timezone.utc),
is_error=data.get("is_error", False),
raw=data
))
# Update metadata
metadata.started_at = first_timestamp
metadata.ended_at = last_timestamp
metadata.total_messages = len([e for e in entries if e.type == "message"])
metadata.total_tokens_input = total_input
metadata.total_tokens_output = total_output
return ExtractionResult(
success=True,
metadata=metadata,
entries=entries,
warnings=warnings
)
def _parse_timestamp(self, data: Dict[str, Any]) -> Optional[datetime]:
"""Parse timestamp from entry data."""
for field in ("timestamp", "created_at", "ts"):
ts = data.get(field)
if ts:
try:
if isinstance(ts, (int, float)):
return datetime.fromtimestamp(ts, tz=timezone.utc)
elif isinstance(ts, str):
return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except (ValueError, OSError):
pass
return None
def _parse_tool_arguments(self, args: str) -> Dict[str, Any]:
"""Parse tool arguments from JSON string."""
if isinstance(args, dict):
return args
try:
return json.loads(args)
except (json.JSONDecodeError, TypeError):
return {"raw": args}
def list_sessions(self, source: Path) -> List[SessionMetadata]:
"""List available sessions in source."""
sessions: List[SessionMetadata] = []
if source.is_file():
# Scan history.jsonl for session IDs
session_ids: Dict[str, datetime] = {}
try:
with open(source, 'r', encoding='utf-8') as f:
for line in f:
if not line.strip():
continue
try:
data = json.loads(line)
sid = data.get("session_id")
if sid:
ts = self._parse_timestamp(data)
if sid not in session_ids or (ts and ts > session_ids.get(sid, datetime.min.replace(tzinfo=timezone.utc))):
session_ids[sid] = ts or datetime.now(timezone.utc)
except json.JSONDecodeError:
pass
for sid, ts in session_ids.items():
sessions.append(SessionMetadata(
session_id=sid,
llm_source=self.llm_name,
source_path=source,
started_at=ts
))
except IOError:
pass
elif source.is_dir():
# Scan for JSONL files
for jsonl_file in source.glob("**/*.jsonl"):
try:
stat = jsonl_file.stat()
sessions.append(SessionMetadata(
session_id=jsonl_file.stem,
llm_source=self.llm_name,
source_path=jsonl_file,
started_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
))
except OSError:
pass
return sessions
if name == "main": import argparse
parser = argparse.ArgumentParser(description="Codex Session Extractor")
parser.add_argument("source", help="Session file or directory")
parser.add_argument("--session-id", help="Specific session ID to extract")
parser.add_argument("--list", action="store_true", help="List sessions only")
args = parser.parse_args()
extractor = CodexExtractor()
source = Path(args.source)
if args.list:
sessions = extractor.list_sessions(source)
for s in sessions:
print(f"{s.session_id}: {s.source_path}")
else:
result = extractor.extract(source, session_id=args.session_id)
print(f"Success: {result.success}")
print(f"Session: {result.metadata.session_id}")
print(f"Model: {result.metadata.llm_model}")
print(f"Messages: {result.metadata.total_messages}")
print(f"Entries: {result.entry_count}")
if result.errors:
print(f"Errors: {result.errors}")