# scripts/extractors/gemini_extractor.py
#!/usr/bin/env python3
"""
---
title: "Gemini Session Extractor"
component_type: script
version: "1.1.0"
audience: contributor
status: stable
summary: "Extract session data from Google Gemini CLI JSONL and JSON files"
keywords: ['gemini', 'extractor', 'session', 'jsonl', 'json', 'google']
tokens: ~400
created: 2026-01-28
updated: 2026-02-04
script_name: "gemini_extractor.py"
language: python
executable: true
usage: "from scripts.extractors.gemini_extractor import GeminiExtractor"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Gemini Session Extractor for CODITECT /sx command.

Extracts session data from Google Gemini CLI session files.

Supported formats:
- JSONL: ~/.gemini/sessions/*.jsonl (native Gemini CLI format)
- JSON: Web UI exports and LOSSLESS session copies (messages array format)

Session file locations:
- Sessions: ~/.gemini/tmp/<workdir_hash>/chats/session-*.json (Gemini CLI 2025+)
- Logs: ~/.gemini/tmp/<workdir_hash>/logs.json
- Pending: ~/PROJECTS/.coditect-data/sessions-export-pending-gemini/*.json

Track: J.13 (Memory - Generic Session Export)
Task: J.13.2.3, J.23.6.1
"""
from future import annotations
import json import os import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional
Add parent paths for imports
_script_dir = Path(file).resolve().parent _scripts_dir = _script_dir.parent _coditect_root = _scripts_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root)) if str(_scripts_dir) not in sys.path: sys.path.insert(0, str(_scripts_dir))
from core.session_extractor import ( SessionExtractor, SessionMetadata, ExtractedEntry, ExtractionResult )
class GeminiExtractor(SessionExtractor):
    """Extract session data from Google Gemini CLI sessions.

    Supports:
    - Native JSONL session files (~/.gemini/sessions/*.jsonl)
    - Export files (~/.gemini/exports/*.jsonl)
    - JSON session files (Web UI exports, LOSSLESS copies)

    JSONL entry format (Gemini CLI):
    - role: user/model
    - parts: array of content parts
    - metadata: optional metadata including citations

    JSON entry format (Web UI / LOSSLESS exports):
    - sessionId: session UUID
    - messages: array of message objects
        - type: user/gemini
        - content: message text
        - thoughts: optional thinking/reasoning (for gemini messages)
        - toolCalls: optional tool invocations
    """

    @property
    def llm_name(self) -> str:
        """Canonical lowercase identifier for this LLM source."""
        return "gemini"

    def can_extract(self, source: Path) -> bool:
        """Return True if this extractor can handle ``source``.

        Checks, in order:
        1. JSONL whose first record uses Gemini's role/parts schema.
        2. JSON with a top-level ``messages`` list.
        3. Any path containing ``.gemini`` (fallback by location).
        """
        if not source.exists():
            return False

        # JSONL: peek at the first record only -- cheap even for large files.
        if source.suffix == ".jsonl":
            try:
                with open(source, 'r', encoding='utf-8') as f:
                    first_line = f.readline()
                if first_line:
                    data = json.loads(first_line)
                    # Gemini uses 'role' with 'user'/'model' and 'parts'.
                    role = data.get("role", "")
                    return role in ("user", "model") and "parts" in data
            except (json.JSONDecodeError, IOError):
                pass

        # JSON (Web UI / other CLI format): a full parse is required to see
        # the structure; session files are small (~1MB) so loading is fine.
        if source.suffix == ".json":
            try:
                with open(source, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return (
                    isinstance(data, dict)
                    and "messages" in data
                    and isinstance(data["messages"], list)
                )
            except (json.JSONDecodeError, IOError):
                pass

        # Fallback: trust the Gemini home-directory path pattern.
        if ".gemini" in str(source):
            return True
        return False

    def extract(
        self,
        source: Path,
        session_id: Optional[str] = None,
        include_tool_results: bool = True,
        include_thinking: bool = True,
        **kwargs
    ) -> ExtractionResult:
        """
        Extract session data from a Gemini source file.

        Args:
            source: Path to JSONL or JSON file
            session_id: Session ID (extracted from filename if not provided)
            include_tool_results: Include tool result content
            include_thinking: Include thinking content

        Returns:
            ExtractionResult with all extracted data
        """
        if not source.exists():
            return ExtractionResult(
                success=False,
                metadata=SessionMetadata(
                    session_id=session_id or "unknown",
                    llm_source=self.llm_name
                ),
                errors=[f"Source file not found: {source}"]
            )
        if source.suffix == ".json":
            return self._extract_json(source, session_id, include_tool_results, include_thinking)
        return self._extract_jsonl(source, session_id, include_tool_results)

    def _extract_json(
        self,
        source: Path,
        session_id: Optional[str],
        include_tool_results: bool,
        include_thinking: bool
    ) -> ExtractionResult:
        """Extract from a Gemini JSON session file (Web UI / LOSSLESS export).

        Args:
            source: Path to the JSON file.
            session_id: Explicit session ID, or None to use the file's
                ``sessionId`` field (falling back to the file stem).
            include_tool_results: When False, tool-result entries are skipped
                (tool *calls* are always recorded, matching the JSONL path).
            include_thinking: When False, ``thoughts`` content is dropped.
        """
        entries: List[ExtractedEntry] = []
        errors: List[str] = []
        warnings: List[str] = []

        try:
            with open(source, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (IOError, json.JSONDecodeError) as e:
            return ExtractionResult(
                success=False,
                metadata=SessionMetadata(session_id=session_id or "unknown", llm_source=self.llm_name),
                errors=[f"File read error: {e}"]
            )

        # Session metadata: prefer the file's own sessionId over the filename.
        if session_id is None:
            session_id = data.get("sessionId") or source.stem
        metadata = SessionMetadata(
            session_id=session_id,
            llm_source=self.llm_name,
            source_path=source
        )

        messages = data.get("messages", [])
        first_timestamp: Optional[datetime] = None
        last_timestamp: Optional[datetime] = None
        total_input = 0
        total_output = 0
        last_message_id: Optional[str] = None

        for msg in messages:
            timestamp = self._parse_timestamp(msg)
            if timestamp:
                if first_timestamp is None:
                    first_timestamp = timestamp
                last_timestamp = timestamp

            msg_type = msg.get("type", "")
            content = msg.get("content", "")
            # Map Gemini's 'gemini' role to the standard 'assistant' role.
            role = msg_type
            if msg_type == "gemini":
                role = "assistant"

            message_id = msg.get("id") or self.generate_message_id()

            # Per-message token usage, accumulated for session totals.
            tokens = msg.get("tokens", {})
            usage = None
            if tokens:
                input_tokens = tokens.get("input", 0)
                output_tokens = tokens.get("output", 0)
                usage = {
                    "input": input_tokens,
                    "output": output_tokens
                }
                total_input += input_tokens
                total_output += output_tokens

            # Thinking/thoughts: each thought has optional subject/description.
            thinking = None
            if include_thinking:
                thoughts = msg.get("thoughts", [])
                if thoughts:
                    thinking_parts = []
                    for t in thoughts:
                        desc = t.get("description", "")
                        subject = t.get("subject", "")
                        if subject and desc:
                            thinking_parts.append(f"Subject: {subject}\n{desc}")
                        elif desc:
                            thinking_parts.append(desc)
                    if thinking_parts:
                        thinking = "\n\n".join(thinking_parts)

            entry = self._create_message(
                role=role,
                content=content,
                timestamp=timestamp or datetime.now(timezone.utc),
                message_id=message_id,
                parent_id=last_message_id,
                model=msg.get("model"),
                usage=usage,
                thinking=thinking,
                raw=msg
            )
            entries.append(entry)
            last_message_id = message_id

            # Tool calls. Observed shape:
            #   "toolCalls": [{"id": ..., "name": ..., "args": {...},
            #                  "result": [{"functionResponse": {"response": {...}}}]}]
            tool_calls = msg.get("toolCalls", [])
            for tc in tool_calls:
                tool_id = tc.get("id") or self.generate_message_id()
                tool_name = tc.get("name", "")
                tool_args = tc.get("args", {})
                tool_entry = self._create_tool_use(
                    tool_name=tool_name,
                    tool_input=tool_args,
                    tool_id=tool_id,
                    timestamp=timestamp or datetime.now(timezone.utc),
                    parent_id=message_id,
                    raw=tc
                )
                entries.append(tool_entry)

                # Tool results are optional; honor include_tool_results here
                # to stay consistent with the JSONL extraction path.
                if not include_tool_results:
                    continue
                results = tc.get("result", [])
                for res_wrapper in results:
                    func_resp_wrapper = res_wrapper.get("functionResponse", {})
                    if "response" in func_resp_wrapper:
                        tool_result = func_resp_wrapper["response"]
                        # Result is linked to its call via the call's tool_id.
                        result_entry = self._create_tool_result(
                            tool_id=tool_id,
                            result=json.dumps(tool_result),
                            timestamp=timestamp or datetime.now(timezone.utc),
                            is_error=False,  # no explicit error field observed in this format
                            raw=res_wrapper
                        )
                        entries.append(result_entry)

        metadata.started_at = first_timestamp
        metadata.ended_at = last_timestamp
        metadata.total_messages = len([e for e in entries if e.type == "message"])
        metadata.total_tokens_input = total_input
        metadata.total_tokens_output = total_output

        return ExtractionResult(
            success=True,
            metadata=metadata,
            entries=entries,
            errors=errors,
            warnings=warnings
        )

    def _extract_jsonl(
        self,
        source: Path,
        session_id: Optional[str],
        include_tool_results: bool
    ) -> ExtractionResult:
        """Extract from a native Gemini CLI JSONL file.

        Each line is an independent JSON record with ``role`` (user/model)
        and ``parts`` (text, functionCall, or functionResponse objects).
        Malformed lines are recorded as warnings and skipped.
        """
        entries: List[ExtractedEntry] = []
        errors: List[str] = []
        warnings: List[str] = []

        # Session ID falls back to the filename stem.
        if session_id is None:
            session_id = source.stem

        metadata = SessionMetadata(
            session_id=session_id,
            llm_source=self.llm_name,
            source_path=source
        )

        first_timestamp: Optional[datetime] = None
        last_timestamp: Optional[datetime] = None
        total_input = 0
        total_output = 0
        last_message_id: Optional[str] = None

        try:
            with open(source, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError as e:
                        warnings.append(f"Line {line_num}: JSON parse error: {e}")
                        continue

                    timestamp = self._parse_timestamp(data)
                    if timestamp:
                        if first_timestamp is None:
                            first_timestamp = timestamp
                        last_timestamp = timestamp

                    role = data.get("role", "")
                    parts = data.get("parts", [])
                    # Map Gemini's 'model' role to the standard 'assistant'.
                    if role == "model":
                        role = "assistant"

                    # Collect text parts; emit tool entries for function parts.
                    content_parts = []
                    for part in parts:
                        if isinstance(part, str):
                            content_parts.append(part)
                        elif isinstance(part, dict):
                            if "text" in part:
                                content_parts.append(part["text"])
                            elif "functionCall" in part:
                                fc = part["functionCall"]
                                tool_id = fc.get("id", self.generate_message_id())
                                entries.append(self._create_tool_use(
                                    tool_name=fc.get("name", ""),
                                    tool_input=fc.get("args", {}),
                                    tool_id=tool_id,
                                    timestamp=timestamp or datetime.now(timezone.utc),
                                    parent_id=last_message_id,
                                    raw=part
                                ))
                            elif "functionResponse" in part and include_tool_results:
                                fr = part["functionResponse"]
                                entries.append(self._create_tool_result(
                                    tool_id=fr.get("id", ""),
                                    result=json.dumps(fr.get("response", {})),
                                    timestamp=timestamp or datetime.now(timezone.utc),
                                    is_error=fr.get("error") is not None,
                                    error_message=fr.get("error"),
                                    raw=part
                                ))

                    # Only user/assistant records with text become messages.
                    if content_parts and role in ("user", "assistant"):
                        message_id = data.get("id") or self.generate_message_id()
                        usage = data.get("usageMetadata", {})
                        entry = self._create_message(
                            role=role,
                            content="\n".join(content_parts),
                            timestamp=timestamp or datetime.now(timezone.utc),
                            message_id=message_id,
                            parent_id=last_message_id,
                            model=data.get("model") or data.get("modelVersion"),
                            usage={
                                "input": usage.get("promptTokenCount", 0),
                                "output": usage.get("candidatesTokenCount", 0)
                            } if usage else None,
                            raw=data
                        )
                        entries.append(entry)
                        last_message_id = message_id
                        total_input += usage.get("promptTokenCount", 0)
                        total_output += usage.get("candidatesTokenCount", 0)

                    # First record that names a model wins.
                    if metadata.llm_model is None:
                        metadata.llm_model = data.get("model") or data.get("modelVersion")
        except IOError as e:
            errors.append(f"File read error: {e}")
            return ExtractionResult(
                success=False,
                metadata=metadata,
                errors=errors
            )

        metadata.started_at = first_timestamp
        metadata.ended_at = last_timestamp
        metadata.total_messages = len([e for e in entries if e.type == "message"])
        metadata.total_tokens_input = total_input
        metadata.total_tokens_output = total_output

        return ExtractionResult(
            success=True,
            metadata=metadata,
            entries=entries,
            errors=errors,
            warnings=warnings
        )

    def _parse_timestamp(self, data: Dict[str, Any]) -> Optional[datetime]:
        """Parse a timestamp from an entry, trying several field names.

        Accepts epoch numbers (auto-detecting milliseconds) or ISO-8601
        strings (with 'Z' suffix normalized). Returns None if no field
        parses.
        """
        for field in ("timestamp", "createTime", "created_at"):
            ts = data.get(field)
            if ts:
                try:
                    if isinstance(ts, (int, float)):
                        # Gemini often uses milliseconds; values above ~1e12
                        # cannot be plausible epoch seconds.
                        if ts > 1e12:
                            ts = ts / 1000
                        return datetime.fromtimestamp(ts, tz=timezone.utc)
                    elif isinstance(ts, str):
                        return datetime.fromisoformat(ts.replace('Z', '+00:00'))
                except (ValueError, OSError):
                    pass
        return None

    def list_sessions(self, source: Path) -> List[SessionMetadata]:
        """List available sessions in a file or directory.

        Discovers both JSONL and JSON session files (J.23.6.1). For
        directories, metadata is built cheaply from the filename and mtime
        rather than a full extraction.
        """
        sessions: List[SessionMetadata] = []
        if source.is_file():
            result = self.extract(source)
            if result.success:
                sessions.append(result.metadata)
        elif source.is_dir():
            session_files: List[Path] = []
            session_files.extend(source.glob("**/*.jsonl"))
            session_files.extend(source.glob("**/*.json"))
            for session_file in session_files:
                # Skip non-Gemini JSON files by checking if extractable.
                if session_file.suffix == ".json" and not self.can_extract(session_file):
                    continue
                try:
                    stat = session_file.stat()
                    sessions.append(SessionMetadata(
                        session_id=session_file.stem,
                        llm_source=self.llm_name,
                        source_path=session_file,
                        # mtime is the best available proxy for session start.
                        started_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
                    ))
                except OSError:
                    pass
        return sessions
if __name__ == "__main__":
    # Minimal CLI for ad-hoc inspection of Gemini session files.
    import argparse

    parser = argparse.ArgumentParser(description="Gemini Session Extractor")
    parser.add_argument("source", help="Session file or directory")
    parser.add_argument("--list", action="store_true", help="List sessions only")
    args = parser.parse_args()

    extractor = GeminiExtractor()
    source = Path(args.source)

    if args.list:
        sessions = extractor.list_sessions(source)
        for s in sessions:
            print(f"{s.session_id}: {s.source_path}")
    else:
        result = extractor.extract(source)
        print(f"Success: {result.success}")
        print(f"Session: {result.metadata.session_id}")
        print(f"Model: {result.metadata.llm_model}")
        print(f"Messages: {result.metadata.total_messages}")
        print(f"Entries: {result.entry_count}")
        if result.errors:
            print(f"Errors: {result.errors}")