# scripts-kimi-extractor
#!/usr/bin/env python3
"""
title: "KIMI Session Extractor"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Extract session data from KIMI CLI JSONL files"
keywords: ['kimi', 'extractor', 'session', 'jsonl', 'moonshot']
tokens: ~500
created: 2026-01-28
updated: 2026-01-28
script_name: "kimi_extractor.py"
language: python
executable: true
usage: "from scripts.extractors.kimi_extractor import KimiExtractor"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

KIMI Session Extractor for CODITECT /sx command.

Extracts session data from KIMI CLI (Moonshot AI) native JSONL files.

Session file locations:
- Native: ~/.kimi/sessions/<workdir_hash>/<session_uuid>/context.jsonl
- Wire Log: ~/.kimi/sessions/<workdir_hash>/<session_uuid>/wire.jsonl
- User History: ~/.kimi/user-history/<workdir_hash>.jsonl

Track: KIMI.1 (KIMI Integration - Session Export)
Task: KIMI.1.1
"""
from __future__ import annotations

import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

# Add parent paths for imports so `core.*` resolves when this file is run
# directly rather than as part of the package.
_script_dir = Path(__file__).resolve().parent
_scripts_dir = _script_dir.parent
_coditect_root = _scripts_dir.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))
if str(_scripts_dir) not in sys.path:
    sys.path.insert(0, str(_scripts_dir))

from core.session_extractor import (
    SessionExtractor,
    SessionMetadata,
    ExtractedEntry,
    ExtractionResult,
)
class KimiExtractor(SessionExtractor):
    """Session extractor for KIMI CLI (Moonshot AI).

    Supported inputs:
      - native session transcripts: ~/.kimi/sessions/<hash>/<uuid>/context.jsonl
      - wire protocol logs:         ~/.kimi/sessions/<hash>/<uuid>/wire.jsonl
      - user input history:         ~/.kimi/user-history/<hash>.jsonl

    Entry types extracted: user messages, assistant responses, and system
    entries (checkpoint and usage markers are included on request).
    """

    # Role values KIMI writes into the JSONL "role" field.
    ROLE_USER = "user"
    ROLE_ASSISTANT = "assistant"
    ROLE_SYSTEM = "system"
    ROLE_CHECKPOINT = "_checkpoint"
    ROLE_USAGE = "_usage"
@property
def llm_name(self) -> str:
return "kimi"
def can_extract(self, source: Path) -> bool:
"""Check if this extractor can handle the source."""
if not source.exists():
return False
# Context JSONL file
if source.name == "context.jsonl":
try:
with open(source, 'r', encoding='utf-8') as f:
first_line = f.readline()
if first_line:
data = json.loads(first_line)
# KIMI uses "role" field with specific values
role = data.get("role", "")
return role in ("user", "assistant", "system", "_checkpoint", "_usage")
except (json.JSONDecodeError, IOError):
pass
# User history JSONL
if source.suffix == ".jsonl" and "user-history" in str(source):
try:
with open(source, 'r', encoding='utf-8') as f:
first_line = f.readline()
if first_line:
data = json.loads(first_line)
# User history has simple "content" field
return "content" in data and "role" not in data
except (json.JSONDecodeError, IOError):
pass
# Wire JSONL
if source.name == "wire.jsonl":
try:
with open(source, 'r', encoding='utf-8') as f:
first_line = f.readline()
if first_line:
data = json.loads(first_line)
# Wire format has "type" and "protocol_version"
return data.get("type") == "metadata" and "protocol_version" in data
except (json.JSONDecodeError, IOError):
pass
return False
def extract(
self,
source: Path,
session_id: Optional[str] = None,
include_system: bool = True,
include_checkpoints: bool = False,
**kwargs
) -> ExtractionResult:
"""
Extract session data from KIMI source.
Args:
source: Path to context.jsonl, wire.jsonl, or user-history.jsonl
session_id: Session ID (extracted from path if not provided)
include_system: Include system messages and checkpoints
include_checkpoints: Include _checkpoint entries (metadata)
Returns:
ExtractionResult with all extracted data
"""
if not source.exists():
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=session_id or "unknown",
llm_source=self.llm_name
),
errors=[f"Source file not found: {source}"]
)
# Determine format and extract
if source.name == "context.jsonl":
return self._extract_context_jsonl(source, session_id, include_system, include_checkpoints)
elif source.name == "wire.jsonl":
# wire.jsonl is ALWAYS extracted losslessly - preserves ALL protocol data
return self._extract_wire_jsonl(source, session_id)
elif "user-history" in str(source):
return self._extract_user_history(source, session_id)
else:
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=session_id or "unknown",
llm_source=self.llm_name
),
errors=[f"Unsupported file format: {source.name}"]
)
    def _extract_context_jsonl(
        self,
        source: Path,
        session_id: Optional[str],
        include_system: bool,
        include_checkpoints: bool
    ) -> ExtractionResult:
        """Extract from native context.jsonl session file.

        Walks the file line by line, converting user/assistant/system records
        into entries, counting checkpoints, and tracking token usage plus the
        first/last timestamps seen. Malformed lines become warnings rather
        than errors, so one bad record does not abort the extraction.
        """
        entries: List[ExtractedEntry] = []
        errors: List[str] = []
        warnings: List[str] = []
        # Session ID from parent directory name (the session UUID).
        if session_id is None:
            session_id = source.parent.name
        # Initialize metadata
        metadata = SessionMetadata(
            session_id=session_id,
            llm_source=self.llm_name,
            source_path=source
        )
        first_timestamp: Optional[datetime] = None
        last_timestamp: Optional[datetime] = None
        message_count = 0
        checkpoint_count = 0
        # ID of the previous message, used to parent-link the conversation.
        last_message_id: Optional[str] = None
        try:
            with open(source, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError as e:
                        # Corrupt line: record a warning and keep going.
                        warnings.append(f"Line {line_num}: JSON parse error: {e}")
                        continue
                    # Extract timestamp from content if present
                    timestamp = self._parse_timestamp(data)
                    if timestamp:
                        if first_timestamp is None:
                            first_timestamp = timestamp
                        last_timestamp = timestamp
                    # Process by role type
                    role = data.get("role", "")
                    raw_content = data.get("content", "")
                    # Normalize content - handle list of content parts
                    if isinstance(raw_content, list):
                        content = self._extract_text_from_parts(raw_content)
                    else:
                        content = str(raw_content) if raw_content else ""
                    if role == self.ROLE_USER:
                        entry = self._create_message(
                            role="user",
                            content=content,
                            # Fall back to "now" when the record has no timestamp.
                            timestamp=timestamp or datetime.now(timezone.utc),
                            parent_id=last_message_id,
                            raw=data
                        )
                        entries.append(entry)
                        message_count += 1
                        last_message_id = entry.data.get("message_id")
                    elif role == self.ROLE_ASSISTANT:
                        entry = self._create_message(
                            role="assistant",
                            content=content,
                            timestamp=timestamp or datetime.now(timezone.utc),
                            parent_id=last_message_id,
                            raw=data
                        )
                        entries.append(entry)
                        message_count += 1
                        last_message_id = entry.data.get("message_id")
                    elif role == self.ROLE_SYSTEM and include_system:
                        # System messages are emitted but not parent-linked.
                        entry = self._create_message(
                            role="system",
                            content=content,
                            timestamp=timestamp or datetime.now(timezone.utc),
                            raw=data
                        )
                        entries.append(entry)
                    elif role == self.ROLE_CHECKPOINT:
                        # Checkpoints are always counted but only emitted as
                        # entries when explicitly requested.
                        checkpoint_count += 1
                        if include_checkpoints:
                            entry = self._create_message(
                                role="system",
                                content=f"[Checkpoint {data.get('id', 'unknown')}]",
                                timestamp=timestamp or datetime.now(timezone.utc),
                                raw=data
                            )
                            entries.append(entry)
                    elif role == self.ROLE_USAGE:
                        # Token usage marker - update metadata
                        # NOTE(review): this assignment overwrites rather than
                        # accumulates, so the last _usage record wins — confirm
                        # that is the intended semantics.
                        token_count = data.get("token_count", 0)
                        if token_count:
                            metadata.total_tokens_output = token_count
        except IOError as e:
            errors.append(f"File read error: {e}")
            return ExtractionResult(
                success=False,
                metadata=metadata,
                errors=errors
            )
        # Update metadata
        metadata.started_at = first_timestamp
        metadata.ended_at = last_timestamp
        metadata.total_messages = message_count
        # Extract project path from directory structure
        self._extract_path_metadata(source, metadata)
        return ExtractionResult(
            success=True,
            metadata=metadata,
            entries=entries,
            errors=errors,
            warnings=warnings
        )
def _extract_user_history(
self,
source: Path,
session_id: Optional[str]
) -> ExtractionResult:
"""Extract from user history JSONL (simple input log)."""
entries: List[ExtractedEntry] = []
errors: List[str] = []
if session_id is None:
session_id = source.stem
metadata = SessionMetadata(
session_id=session_id,
llm_source=self.llm_name,
source_path=source
)
try:
with open(source, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
if not line.strip():
continue
try:
data = json.loads(line)
except json.JSONDecodeError as e:
continue
content = data.get("content", "")
if content:
entry = self._create_message(
role="user",
content=content,
timestamp=datetime.now(timezone.utc),
raw=data
)
entries.append(entry)
except IOError as e:
errors.append(f"File read error: {e}")
metadata.total_messages = len(entries)
return ExtractionResult(
success=len(entries) > 0,
metadata=metadata,
entries=entries,
errors=errors
)
    def _extract_wire_jsonl(
        self,
        source: Path,
        session_id: Optional[str]
    ) -> ExtractionResult:
        """Extract from wire protocol log - preserves ALL entries without consolidation.

        Every protocol record becomes its own entry; nothing is merged, so
        the output is a lossless view of the session. Token usage is summed
        from StatusUpdate records, and the leading metadata record is stored
        on the session metadata rather than emitted as an entry.
        """
        entries: List[ExtractedEntry] = []
        errors: List[str] = []
        warnings: List[str] = []
        total_input_tokens = 0
        total_output_tokens = 0
        first_timestamp: Optional[datetime] = None
        last_timestamp: Optional[datetime] = None
        # Session ID defaults to the session directory (parent of wire.jsonl).
        if session_id is None:
            session_id = source.parent.name
        metadata = SessionMetadata(
            session_id=session_id,
            llm_source=self.llm_name,
            source_path=source
        )
        try:
            with open(source, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError as e:
                        # Corrupt line: warn and continue.
                        warnings.append(f"Line {line_num}: JSON parse error: {e}")
                        continue
                    timestamp = self._parse_timestamp(data)
                    if timestamp:
                        if first_timestamp is None:
                            first_timestamp = timestamp
                        last_timestamp = timestamp
                    # Wire records wrap the payload in a "message" envelope.
                    message = data.get("message", {})
                    msg_type = message.get("type", "")
                    payload = message.get("payload", {})
                    if data.get("type") == "metadata":
                        # File-level metadata record: keep raw, emit no entry.
                        metadata.raw_metadata = data
                        continue
                    # Build entry data
                    entry_data = {
                        "wire_type": msg_type,
                        "timestamp": timestamp.isoformat() if timestamp else None,
                        "line_number": line_num,
                    }
                    # Type-specific extraction - determine entry_type for CUSF compliance
                    entry_type = "message"  # Default
                    if msg_type == "TurnBegin":
                        entry_data["role"] = "user"
                        user_input = payload.get("user_input", "")
                        # Handle list of content parts (e.g., [{"type": "text", "text": "..."}])
                        if isinstance(user_input, list):
                            entry_data["content"] = self._extract_text_from_parts(user_input)
                        else:
                            entry_data["content"] = user_input
                        entry_data["turn_id"] = payload.get("turn_id")
                    elif msg_type == "ContentPart":
                        entry_data["role"] = "assistant"
                        entry_data["content_type"] = payload.get("type")
                        # Text lives in "text" or "think"; fall back to the raw payload.
                        content = payload.get("text") or payload.get("think") or payload
                        # Ensure content is a string
                        if isinstance(content, (dict, list)):
                            entry_data["content"] = self._extract_text_from_parts(content if isinstance(content, list) else [content])
                        else:
                            entry_data["content"] = str(content) if content else ""
                        entry_data["encrypted"] = payload.get("encrypted")
                    elif msg_type == "StatusUpdate":
                        entry_data["role"] = "system"
                        entry_data["context_usage"] = payload.get("context_usage")
                        entry_data["message_id"] = payload.get("message_id")
                        token_usage = payload.get("token_usage", {})
                        if token_usage:
                            entry_data["token_usage"] = token_usage
                            # Normalized usage view: cache reads count as input.
                            entry_data["usage"] = {
                                "input_tokens": token_usage.get("input_other", 0) + token_usage.get("input_cache_read", 0),
                                "output_tokens": token_usage.get("output", 0),
                                "cache_read": token_usage.get("input_cache_read", 0),
                                "cache_creation": token_usage.get("input_cache_creation", 0),
                            }
                            total_input_tokens += entry_data["usage"]["input_tokens"]
                            total_output_tokens += entry_data["usage"]["output_tokens"]
                    elif msg_type == "ToolCall":
                        entry_type = "tool_use"  # CUSF: tool_use for tool calls
                        entry_data["tool_type"] = payload.get("type")
                        entry_data["tool_id"] = payload.get("id")
                        entry_data["tool_name"] = payload.get("function", {}).get("name")
                        entry_data["tool_input"] = payload.get("function", {}).get("arguments")
                    elif msg_type == "ToolResult":
                        entry_type = "tool_result"  # CUSF: tool_result for results
                        entry_data["tool_id"] = payload.get("tool_call_id")
                        return_value = payload.get("return_value", {})
                        entry_data["is_error"] = return_value.get("is_error", False)
                        entry_data["result"] = return_value.get("output", "")
                    elif msg_type == "ToolCallPart":
                        entry_type = "tool_use"  # Streaming tool call - also tool_use
                        entry_data["tool_id"] = payload.get("id")
                        entry_data["partial"] = True
                        entry_data["tool_input"] = payload
                    elif msg_type == "TurnEnd":
                        entry_data["role"] = "system"
                        entry_data["content"] = "[Turn End]"
                    elif msg_type == "StepBegin":
                        entry_data["role"] = "system"
                        entry_data["step_number"] = payload.get("n")
                        entry_data["content"] = f"[Step {payload.get('n', '?')} Begin]"
                    elif msg_type == "AssistantMessage":
                        entry_data["role"] = "assistant"
                        entry_data["content"] = payload.get("content", "")
                    else:
                        # Unknown record type: preserve a truncated payload for
                        # forward compatibility instead of dropping the record.
                        entry_data["role"] = "unknown"
                        entry_data["content"] = str(payload)[:1000]
                        entry_data["_unknown_type"] = msg_type
                    entry = ExtractedEntry(
                        type=entry_type,
                        timestamp=timestamp or datetime.now(timezone.utc),
                        data=entry_data,
                        raw=data
                    )
                    entries.append(entry)
        except IOError as e:
            errors.append(f"File read error: {e}")
        metadata.started_at = first_timestamp
        metadata.ended_at = last_timestamp
        metadata.total_messages = len(entries)
        metadata.total_tokens_input = total_input_tokens
        metadata.total_tokens_output = total_output_tokens
        self._extract_path_metadata(source, metadata)
        return ExtractionResult(
            success=len(entries) > 0,
            metadata=metadata,
            entries=entries,
            errors=errors,
            warnings=warnings
        )
def _extract_text_from_parts(self, parts: List[Any]) -> str:
"""Extract text content from a list of content parts."""
texts = []
for part in parts:
if isinstance(part, str):
texts.append(part)
elif isinstance(part, dict):
# Try common text fields
text = part.get("text") or part.get("content") or part.get("think") or ""
if text:
texts.append(str(text))
return "\n".join(texts) if texts else ""
def _parse_timestamp(self, data: Dict[str, Any]) -> Optional[datetime]:
"""Parse timestamp from entry data."""
for field in ("timestamp", "ts", "time", "created_at"):
ts = data.get(field)
if ts:
try:
if isinstance(ts, (int, float)):
return datetime.fromtimestamp(ts, tz=timezone.utc)
elif isinstance(ts, str):
return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except (ValueError, OSError):
pass
return None
def _extract_path_metadata(self, source: Path, metadata: SessionMetadata) -> None:
"""Extract project/cwd from session file path."""
# KIMI sessions are in ~/.kimi/sessions/<workdir_hash>/<session_uuid>/
parts = source.parts
if "sessions" in parts:
idx = parts.index("sessions")
if idx + 1 < len(parts):
workdir_hash = parts[idx + 1]
metadata.project_path = workdir_hash
# Try to resolve workdir from kimi.json
try:
kimi_json = Path.home() / ".kimi" / "kimi.json"
if kimi_json.exists():
with open(kimi_json, 'r') as f:
config = json.load(f)
for work_dir in config.get("work_dirs", []):
# Simple hash match (would need actual MD5 check)
if workdir_hash in str(work_dir.get("path", "")):
metadata.cwd = work_dir.get("path")
break
except (IOError, json.JSONDecodeError):
pass
def list_sessions(self, source: Path) -> List[SessionMetadata]:
"""List available sessions in source directory."""
sessions: List[SessionMetadata] = []
if source.is_file():
result = self.extract(source)
if result.success:
sessions.append(result.metadata)
elif source.is_dir():
# Look for session directories (UUIDs)
for item in source.iterdir():
if item.is_dir():
context_file = item / "context.jsonl"
if context_file.exists():
try:
stat = context_file.stat()
sessions.append(SessionMetadata(
session_id=item.name,
llm_source=self.llm_name,
source_path=context_file,
started_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
))
except OSError:
pass
return sessions
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="KIMI Session Extractor")
    parser.add_argument("source", help="Session file or directory")
    parser.add_argument("--list", action="store_true", help="List sessions only")
    parser.add_argument("--include-checkpoints", action="store_true",
                        help="Include checkpoint entries")
    args = parser.parse_args()

    extractor = KimiExtractor()
    source = Path(args.source)

    if args.list:
        # Listing mode: one "session_id: path" line per discovered session.
        sessions = extractor.list_sessions(source)
        for s in sessions:
            print(f"{s.session_id}: {s.source_path}")
    else:
        result = extractor.extract(source, include_checkpoints=args.include_checkpoints)
        print(f"Success: {result.success}")
        print(f"Session: {result.metadata.session_id}")
        print(f"Entries: {result.entry_count}")
        # Token counts are only meaningful for wire/context extractions.
        if result.metadata.total_tokens_input or result.metadata.total_tokens_output:
            print(f"Input tokens: {result.metadata.total_tokens_input:,}")
            print(f"Output tokens: {result.metadata.total_tokens_output:,}")
        if result.errors:
            print(f"Errors: {result.errors}")