
#!/usr/bin/env python3
"""Workflow State Manager - ADR-154 J.4.10

Manages workflow state persistence for long-running agentic workflows.
Enables cross-session context continuity and workflow resumption.

Part of: Track J (Memory Intelligence)
Task IDs: J.4.10.1-J.4.10.5
ADRs: ADR-154, ADR-118 (Four-Tier Database)
"""

import json
import sqlite3
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# Default paths (ADR-118 Tier 3)

DEFAULT_SESSIONS_DB = Path.home() / "PROJECTS/.coditect-data/context-storage/sessions.db"

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class WorkflowState:
    """Represents the state of a workflow.

    Mirrors one row of the ``workflow_states`` table; list-valued fields are
    stored as JSON text in SQLite and decoded on load.
    """

    workflow_id: str
    workflow_type: str
    status: str = "active"  # active, paused, completed, failed

    # Context accumulation
    anchor_nodes: List[str] = field(default_factory=list)
    accumulated_decisions: List[Dict[str, Any]] = field(default_factory=list)
    accumulated_errors: List[Dict[str, Any]] = field(default_factory=list)

    # Progress tracking
    current_step: Optional[str] = None
    completed_steps: List[str] = field(default_factory=list)

    # Metadata (ISO-8601 timestamp strings)
    started_at: Optional[str] = None
    updated_at: Optional[str] = None
    session_ids: List[str] = field(default_factory=list)

    # Multi-tenant (ADR-119)
    tenant_id: Optional[str] = None
    user_id: Optional[str] = None
    team_id: Optional[str] = None
    project_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return asdict(self)

@dataclass
class WorkflowContextSnapshot:
    """A point-in-time snapshot of workflow context.

    Mirrors one row of the ``workflow_context_snapshots`` table.
    """

    id: Optional[int] = None          # AUTOINCREMENT primary key, None until inserted
    workflow_id: str = ""
    session_id: str = ""
    snapshot_type: str = "checkpoint"  # checkpoint, error, decision
    context_graph_json: Optional[str] = None  # serialized context graph, opaque JSON
    active_policies: List[str] = field(default_factory=list)
    created_at: Optional[str] = None

@dataclass
class WorkflowDecision:
    """A decision made during a workflow."""

    decision_type: str   # e.g. "security", "architecture"
    decision: str        # the decision itself
    rationale: str       # why it was made
    affected_nodes: List[str] = field(default_factory=list)
    timestamp: Optional[str] = None  # ISO-8601 string
    step_id: Optional[str] = None    # workflow step where it was made

# =============================================================================
# Database Schema
# =============================================================================

# Schema for workflow persistence (ADR-154 J.4.10.1). Every statement uses
# IF NOT EXISTS, so the script is safe to run repeatedly via executescript().
WORKFLOW_TABLES_SQL = """
-- Workflow state table (ADR-154 J.4.10.1)
CREATE TABLE IF NOT EXISTS workflow_states (
    workflow_id TEXT PRIMARY KEY,
    workflow_type TEXT NOT NULL,
    status TEXT DEFAULT 'active',

    -- Context accumulation (JSON)
    anchor_nodes TEXT,
    accumulated_decisions TEXT,
    accumulated_errors TEXT,

    -- Progress
    current_step TEXT,
    completed_steps TEXT,

    -- Metadata
    started_at TEXT DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
    session_ids TEXT,

    -- Multi-tenant (ADR-119)
    tenant_id TEXT,
    user_id TEXT,
    team_id TEXT,
    project_id TEXT,
    cloud_id TEXT,
    synced_at TEXT,
    sync_status TEXT DEFAULT 'pending'
);

-- Indexes for workflow_states
CREATE INDEX IF NOT EXISTS idx_workflow_states_status ON workflow_states(status);
CREATE INDEX IF NOT EXISTS idx_workflow_states_type ON workflow_states(workflow_type);
CREATE INDEX IF NOT EXISTS idx_workflow_states_tenant ON workflow_states(tenant_id);
CREATE INDEX IF NOT EXISTS idx_workflow_states_user ON workflow_states(tenant_id, user_id);
CREATE INDEX IF NOT EXISTS idx_workflow_states_sync ON workflow_states(sync_status, synced_at);

-- Workflow context snapshots (for resumption)
CREATE TABLE IF NOT EXISTS workflow_context_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id TEXT NOT NULL,
    session_id TEXT NOT NULL,
    snapshot_type TEXT,

    -- Context at snapshot time
    context_graph_json TEXT,
    active_policies TEXT,

    created_at TEXT DEFAULT CURRENT_TIMESTAMP,

    -- Multi-tenant (ADR-119)
    tenant_id TEXT,
    user_id TEXT,

    FOREIGN KEY (workflow_id) REFERENCES workflow_states(workflow_id)
);

-- Indexes for snapshots
CREATE INDEX IF NOT EXISTS idx_snapshots_workflow ON workflow_context_snapshots(workflow_id);
CREATE INDEX IF NOT EXISTS idx_snapshots_session ON workflow_context_snapshots(session_id);
CREATE INDEX IF NOT EXISTS idx_snapshots_type ON workflow_context_snapshots(snapshot_type);
"""

# =============================================================================
# Workflow State Manager
# =============================================================================

class WorkflowStateManager:
    """
    Manages workflow state persistence for long-running agentic workflows.

    Features:
    - Cross-session state persistence
    - Context accumulation (decisions, errors)
    - Workflow checkpointing and resumption
    - Multi-tenant support (ADR-119)

    Usage:
        manager = WorkflowStateManager()

        # Start a workflow
        workflow = manager.start_workflow("security-audit", session_id="abc123")

        # Record decisions during workflow
        manager.record_decision(
            workflow.workflow_id,
            decision_type="security",
            decision="Use OAuth 2.0 for API auth",
            rationale="Industry standard, well-documented",
            affected_nodes=["auth-service", "api-gateway"]
        )

        # Get workflow context (for agent injection)
        context = manager.get_workflow_context(workflow.workflow_id)

        # Resume workflow in new session
        workflow = manager.resume_workflow(workflow.workflow_id, new_session_id="xyz789")
    """

def __init__(self, db_path: Optional[Path] = None):
    """
    Initialize WorkflowStateManager.

    Args:
        db_path: Path to sessions.db. Defaults to ADR-118 location.
    """
    # Fall back to the ADR-118 default location when no path is supplied.
    self.db_path = db_path if db_path else DEFAULT_SESSIONS_DB
    # Creating the manager guarantees the schema exists.
    self._ensure_tables()

def _get_connection(self) -> sqlite3.Connection:
"""Get database connection."""
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn

def _ensure_tables(self) -> None:
    """Create the workflow tables (and the DB's parent directory) if missing."""
    first_run = not self.db_path.exists()
    if first_run:
        # sqlite3.connect() creates the file but not intermediate directories.
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

    conn = self._get_connection()
    try:
        # Idempotent: the schema uses IF NOT EXISTS throughout.
        conn.executescript(WORKFLOW_TABLES_SQL)
        conn.commit()
    finally:
        conn.close()

# -------------------------------------------------------------------------
# Workflow Lifecycle
# -------------------------------------------------------------------------

def start_workflow(
    self,
    workflow_type: str,
    session_id: str,
    initial_step: Optional[str] = None,
    anchor_nodes: Optional[List[str]] = None,
    tenant_id: Optional[str] = None,
    user_id: Optional[str] = None,
    project_id: Optional[str] = None,
    team_id: Optional[str] = None
) -> WorkflowState:
    """
    Start a new workflow with initial state.

    Args:
        workflow_type: Type of workflow (e.g., "security-audit", "refactor")
        session_id: Current session ID
        initial_step: Starting step (e.g., "A.1.1")
        anchor_nodes: Initial anchor nodes for context
        tenant_id: Tenant ID for multi-tenant support
        user_id: User ID
        project_id: Project ID
        team_id: Team ID (ADR-119). The schema has always had this column,
            but it was previously never persisted on insert.

    Returns:
        WorkflowState with generated workflow_id
    """
    workflow_id = f"wf-{uuid.uuid4().hex[:12]}"
    now = datetime.now().isoformat()

    workflow = WorkflowState(
        workflow_id=workflow_id,
        workflow_type=workflow_type,
        status="active",
        current_step=initial_step,
        anchor_nodes=anchor_nodes or [],
        session_ids=[session_id],
        started_at=now,
        updated_at=now,
        tenant_id=tenant_id,
        user_id=user_id,
        team_id=team_id,
        project_id=project_id
    )

    conn = self._get_connection()
    try:
        # List fields are serialized to JSON text (see get_workflow for the
        # inverse). team_id is now persisted too -- the original INSERT
        # omitted it even though get_workflow() reads it back.
        conn.execute("""
            INSERT INTO workflow_states (
                workflow_id, workflow_type, status,
                anchor_nodes, accumulated_decisions, accumulated_errors,
                current_step, completed_steps,
                started_at, updated_at, session_ids,
                tenant_id, user_id, team_id, project_id
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            workflow.workflow_id,
            workflow.workflow_type,
            workflow.status,
            json.dumps(workflow.anchor_nodes),
            json.dumps(workflow.accumulated_decisions),
            json.dumps(workflow.accumulated_errors),
            workflow.current_step,
            json.dumps(workflow.completed_steps),
            workflow.started_at,
            workflow.updated_at,
            json.dumps(workflow.session_ids),
            workflow.tenant_id,
            workflow.user_id,
            workflow.team_id,
            workflow.project_id
        ))
        conn.commit()
    finally:
        conn.close()

    return workflow

def get_workflow(self, workflow_id: str) -> Optional[WorkflowState]:
    """
    Get workflow state by ID.

    Args:
        workflow_id: Workflow identifier

    Returns:
        WorkflowState or None if not found
    """
    conn = self._get_connection()
    try:
        row = conn.execute(
            "SELECT * FROM workflow_states WHERE workflow_id = ?",
            (workflow_id,)
        ).fetchone()

        if not row:
            return None

        # JSON-encoded list columns fall back to "[]" when NULL so rows
        # written before a column existed still load cleanly.
        return WorkflowState(
            workflow_id=row["workflow_id"],
            workflow_type=row["workflow_type"],
            status=row["status"],
            anchor_nodes=json.loads(row["anchor_nodes"] or "[]"),
            accumulated_decisions=json.loads(row["accumulated_decisions"] or "[]"),
            accumulated_errors=json.loads(row["accumulated_errors"] or "[]"),
            current_step=row["current_step"],
            completed_steps=json.loads(row["completed_steps"] or "[]"),
            started_at=row["started_at"],
            updated_at=row["updated_at"],
            session_ids=json.loads(row["session_ids"] or "[]"),
            tenant_id=row["tenant_id"],
            user_id=row["user_id"],
            team_id=row["team_id"],
            project_id=row["project_id"]
        )
    finally:
        conn.close()

def resume_workflow(
    self,
    workflow_id: str,
    new_session_id: str,
    create_snapshot: bool = True
) -> Optional[WorkflowState]:
    """
    Resume a paused or active workflow in a new session (J.4.10.5).

    This enhanced resume:
    1. Adds the new session to the workflow
    2. Updates status from paused to active if needed
    3. Optionally creates a resumption snapshot

    NOTE(review): completed/failed workflows are not rejected here — they
    gain the new session but keep their status. Confirm that is intended.

    Args:
        workflow_id: Workflow to resume
        new_session_id: New session ID
        create_snapshot: Create a resumption snapshot (default: True)

    Returns:
        Updated WorkflowState or None if not found
    """
    workflow = self.get_workflow(workflow_id)
    if not workflow:
        return None

    # Add new session to list (idempotent if the session already resumed once)
    if new_session_id not in workflow.session_ids:
        workflow.session_ids.append(new_session_id)

    # Track previous status so the snapshot records what we resumed from
    previous_status = workflow.status

    # Update status if paused
    if workflow.status == "paused":
        workflow.status = "active"

    workflow.updated_at = datetime.now().isoformat()

    conn = self._get_connection()
    try:
        conn.execute("""
            UPDATE workflow_states
            SET status = ?, session_ids = ?, updated_at = ?
            WHERE workflow_id = ?
        """, (
            workflow.status,
            json.dumps(workflow.session_ids),
            workflow.updated_at,
            workflow_id
        ))
        conn.commit()
    finally:
        conn.close()

    # Create resumption snapshot if requested (after the UPDATE is committed,
    # so the snapshot reflects the persisted state)
    if create_snapshot:
        self.create_snapshot(
            workflow_id=workflow_id,
            session_id=new_session_id,
            snapshot_type="resumption",
            context_graph_json=json.dumps({
                "resumed_from_status": previous_status,
                "session_count": len(workflow.session_ids),
                "completed_steps": workflow.completed_steps,
                "decision_count": len(workflow.accumulated_decisions),
                "error_count": len(workflow.accumulated_errors),
            })
        )

    return workflow

def pause_workflow(
    self,
    workflow_id: str,
    create_checkpoint: bool = True,
    reason: Optional[str] = None
) -> bool:
    """
    Pause an active workflow (J.4.10.5).

    Pausing a workflow:
    1. Sets status to 'paused'
    2. Optionally creates a checkpoint snapshot
    3. Records the pause reason (inside the checkpoint payload)

    Args:
        workflow_id: Workflow to pause
        create_checkpoint: Create checkpoint before pausing (default: True)
        reason: Optional reason for pausing

    Returns:
        True if paused successfully, False if not found or already paused
    """
    workflow = self.get_workflow(workflow_id)
    if not workflow:
        return False

    if workflow.status != "active":
        return False  # Can only pause active workflows

    now = datetime.now().isoformat()

    # Create checkpoint before pausing if requested. Skipped when the
    # workflow has no sessions yet, since a snapshot needs a session_id.
    if create_checkpoint and workflow.session_ids:
        checkpoint_data = {
            "pause_reason": reason,
            "paused_at_step": workflow.current_step,
            "completed_steps": workflow.completed_steps,
            "decision_count": len(workflow.accumulated_decisions),
            "error_count": len(workflow.accumulated_errors),
        }
        self.create_snapshot(
            workflow_id=workflow_id,
            session_id=workflow.session_ids[-1],  # Latest session
            snapshot_type="pause_checkpoint",
            context_graph_json=json.dumps(checkpoint_data)
        )

    conn = self._get_connection()
    try:
        conn.execute("""
            UPDATE workflow_states
            SET status = 'paused', updated_at = ?
            WHERE workflow_id = ?
        """, (now, workflow_id))
        conn.commit()
        return True
    finally:
        conn.close()

def get_resume_summary(
    self,
    workflow_id: str,
    max_decisions: int = 5,
    max_errors: int = 3
) -> Optional[str]:
    """
    Get a resume summary for a workflow (J.4.10.5).

    This is formatted specifically for agent context injection when
    resuming a workflow in a new session.

    Args:
        workflow_id: Workflow ID
        max_decisions: Maximum recent decisions to include
        max_errors: Maximum recent errors to include

    Returns:
        Formatted markdown summary for agent injection, or None if the
        workflow does not exist
    """
    workflow = self.get_workflow(workflow_id)
    if not workflow:
        return None

    # Get latest snapshot for context (any type)
    latest_snapshot = self.get_latest_snapshot(workflow_id)

    lines = [
        "## Workflow Resume Context",
        "",
        f"**Resuming:** `{workflow.workflow_id}` ({workflow.workflow_type})",
        f"**Status:** {workflow.status}",
        f"**Sessions:** {len(workflow.session_ids)} (including this one)",
        f"**Started:** {workflow.started_at}",
        ""
    ]

    # Progress
    if workflow.current_step or workflow.completed_steps:
        lines.append("### Progress")
        lines.append(f"- **Current Step:** {workflow.current_step or 'Not set'}")
        lines.append(f"- **Completed:** {len(workflow.completed_steps)} steps")
        if workflow.completed_steps:
            recent = workflow.completed_steps[-5:]  # Last 5
            for step in recent:
                lines.append(f"  - [x] {step}")
        lines.append("")

    # Key decisions (most recent max_decisions, oldest first)
    if workflow.accumulated_decisions:
        lines.append("### Key Decisions (for continuity)")
        for d in workflow.accumulated_decisions[-max_decisions:]:
            lines.append(f"- **{d['decision_type']}**: {d['decision']}")
        lines.append("")

    # Errors to avoid repeating
    if workflow.accumulated_errors:
        lines.append("### Previous Issues (avoid repeating)")
        for e in workflow.accumulated_errors[-max_errors:]:
            lines.append(f"- **{e['error_type']}**: {e['error_message']}")
            if e.get('resolution'):
                lines.append(f"  - Resolved: {e['resolution']}")
        lines.append("")

    # Snapshot info
    if latest_snapshot:
        lines.append("### Last Checkpoint")
        lines.append(f"- **Type:** {latest_snapshot.snapshot_type}")
        lines.append(f"- **Created:** {latest_snapshot.created_at}")
        lines.append("")

    lines.append("---")
    lines.append("*Continue from where we left off. Respect previous decisions.*")

    return "\n".join(lines)

def update_workflow_status(
    self,
    workflow_id: str,
    status: str,
    current_step: Optional[str] = None
) -> bool:
    """
    Update workflow status.

    Args:
        workflow_id: Workflow to update
        status: New status (active, paused, completed, failed)
        current_step: Current step to set; when None the stored step is kept
            (via SQL COALESCE)

    Returns:
        True if updated, False if not found
    """
    timestamp = datetime.now().isoformat()
    conn = self._get_connection()
    try:
        cursor = conn.execute("""
            UPDATE workflow_states
            SET status = ?, current_step = COALESCE(?, current_step), updated_at = ?
            WHERE workflow_id = ?
        """, (status, current_step, timestamp, workflow_id))
        conn.commit()
        # rowcount of 0 means no row matched the given workflow_id.
        return cursor.rowcount > 0
    finally:
        conn.close()

def complete_step(
    self,
    workflow_id: str,
    step_id: str,
    next_step: Optional[str] = None
) -> bool:
    """
    Mark a step as completed and optionally set next step.

    Args:
        workflow_id: Workflow ID
        step_id: Completed step ID (deduplicated if already recorded)
        next_step: Optional next step; None clears the current step

    Returns:
        True if updated, False if the workflow does not exist
    """
    workflow = self.get_workflow(workflow_id)
    if workflow is None:
        return False

    # Append only once, even if the same step is completed repeatedly.
    if step_id not in workflow.completed_steps:
        workflow.completed_steps.append(step_id)
    workflow.current_step = next_step
    workflow.updated_at = datetime.now().isoformat()

    conn = self._get_connection()
    try:
        conn.execute("""
            UPDATE workflow_states
            SET completed_steps = ?, current_step = ?, updated_at = ?
            WHERE workflow_id = ?
        """, (
            json.dumps(workflow.completed_steps),
            workflow.current_step,
            workflow.updated_at,
            workflow_id,
        ))
        conn.commit()
        return True
    finally:
        conn.close()

# -------------------------------------------------------------------------
# Decision Recording
# -------------------------------------------------------------------------

def record_decision(
    self,
    workflow_id: str,
    decision_type: str,
    decision: str,
    rationale: str,
    affected_nodes: Optional[List[str]] = None,
    step_id: Optional[str] = None
) -> bool:
    """
    Record a decision made during a workflow.

    Args:
        workflow_id: Workflow ID
        decision_type: Type of decision (e.g., "security", "architecture")
        decision: The decision made
        rationale: Why this decision was made
        affected_nodes: Nodes affected by this decision
        step_id: Step where decision was made; defaults to the workflow's
            current step when omitted

    Returns:
        True if recorded successfully, False if the workflow does not exist
    """
    workflow = self.get_workflow(workflow_id)
    if not workflow:
        return False

    decision_record = WorkflowDecision(
        decision_type=decision_type,
        decision=decision,
        rationale=rationale,
        affected_nodes=affected_nodes or [],
        timestamp=datetime.now().isoformat(),
        step_id=step_id or workflow.current_step
    )

    # Decisions are stored as plain dicts inside the JSON column.
    workflow.accumulated_decisions.append(asdict(decision_record))
    workflow.updated_at = datetime.now().isoformat()

    conn = self._get_connection()
    try:
        conn.execute("""
            UPDATE workflow_states
            SET accumulated_decisions = ?, updated_at = ?
            WHERE workflow_id = ?
        """, (
            json.dumps(workflow.accumulated_decisions),
            workflow.updated_at,
            workflow_id
        ))
        conn.commit()
        return True
    finally:
        conn.close()

def record_error(
    self,
    workflow_id: str,
    error_type: str,
    error_message: str,
    step_id: Optional[str] = None,
    resolution: Optional[str] = None
) -> bool:
    """
    Record an error encountered during a workflow.

    Args:
        workflow_id: Workflow ID
        error_type: Type of error
        error_message: Error message
        step_id: Step where error occurred; defaults to the workflow's
            current step when omitted
        resolution: How error was resolved (if resolved)

    Returns:
        True if recorded successfully, False if the workflow does not exist
    """
    workflow = self.get_workflow(workflow_id)
    if not workflow:
        return False

    # Unlike decisions, errors are built as plain dicts directly (no dataclass).
    error_record = {
        "error_type": error_type,
        "error_message": error_message,
        "step_id": step_id or workflow.current_step,
        "timestamp": datetime.now().isoformat(),
        "resolution": resolution
    }

    workflow.accumulated_errors.append(error_record)
    workflow.updated_at = datetime.now().isoformat()

    conn = self._get_connection()
    try:
        conn.execute("""
            UPDATE workflow_states
            SET accumulated_errors = ?, updated_at = ?
            WHERE workflow_id = ?
        """, (
            json.dumps(workflow.accumulated_errors),
            workflow.updated_at,
            workflow_id
        ))
        conn.commit()
        return True
    finally:
        conn.close()

# -------------------------------------------------------------------------
# Context Retrieval
# -------------------------------------------------------------------------

def get_workflow_context(
    self,
    workflow_id: str,
    include_history: bool = True
) -> Optional[Dict[str, Any]]:
    """
    Get accumulated context for a workflow.

    This is the main method for agent context injection.

    Args:
        workflow_id: Workflow ID
        include_history: Include decision/error history

    Returns:
        Dictionary with workflow context for agent injection, or None if
        the workflow does not exist
    """
    workflow = self.get_workflow(workflow_id)
    if workflow is None:
        return None

    context: Dict[str, Any] = {
        "workflow_id": workflow.workflow_id,
        "workflow_type": workflow.workflow_type,
        "status": workflow.status,
        "current_step": workflow.current_step,
        "completed_steps": workflow.completed_steps,
        "progress": f"{len(workflow.completed_steps)} steps completed",
        "anchor_nodes": workflow.anchor_nodes,
        "started_at": workflow.started_at,
        "updated_at": workflow.updated_at,
        "session_count": len(workflow.session_ids),
    }

    if include_history:
        # History doubles the payload size, so it is optional.
        context.update(
            accumulated_decisions=workflow.accumulated_decisions,
            accumulated_errors=workflow.accumulated_errors,
            decision_count=len(workflow.accumulated_decisions),
            error_count=len(workflow.accumulated_errors),
        )

    return context

def format_workflow_context_for_prompt(
    self,
    workflow_id: str,
    max_decisions: int = 10,
    max_errors: int = 5
) -> Optional[str]:
    """
    Format workflow context as markdown for prompt injection.

    Args:
        workflow_id: Workflow ID
        max_decisions: Max recent decisions to include
        max_errors: Max recent errors to include

    Returns:
        Formatted markdown string, or None if the workflow does not exist
    """
    context = self.get_workflow_context(workflow_id, include_history=True)
    if not context:
        return None

    lines = [
        f"## Workflow Context: {context['workflow_type']}",
        "",
        f"**Workflow ID:** `{context['workflow_id']}`",
        f"**Status:** {context['status']}",
        f"**Current Step:** {context['current_step'] or 'Not set'}",
        f"**Progress:** {context['progress']}",
        ""
    ]

    # Completed steps (capped at the 10 most recent)
    if context['completed_steps']:
        lines.append("### Completed Steps")
        for step in context['completed_steps'][-10:]:  # Last 10
            lines.append(f"- [x] {step}")
        lines.append("")

    # Recent decisions, oldest first within the window
    decisions = context.get('accumulated_decisions', [])
    if decisions:
        lines.append("### Recent Decisions")
        for d in decisions[-max_decisions:]:
            lines.append(f"- **{d['decision_type']}**: {d['decision']}")
            if d.get('rationale'):
                lines.append(f"  - Rationale: {d['rationale']}")
        lines.append("")

    # Recent errors, with resolution when recorded
    errors = context.get('accumulated_errors', [])
    if errors:
        lines.append("### Encountered Errors")
        for e in errors[-max_errors:]:
            lines.append(f"- **{e['error_type']}**: {e['error_message']}")
            if e.get('resolution'):
                lines.append(f"  - Resolution: {e['resolution']}")
        lines.append("")

    return "\n".join(lines)

# -------------------------------------------------------------------------
# Snapshots
# -------------------------------------------------------------------------

def create_snapshot(
    self,
    workflow_id: str,
    session_id: str,
    snapshot_type: str = "checkpoint",
    context_graph_json: Optional[str] = None,
    active_policies: Optional[List[str]] = None
) -> Optional[int]:
    """
    Create a context snapshot for workflow resumption.

    Args:
        workflow_id: Workflow ID (must exist; tenant/user are copied from it)
        session_id: Current session ID
        snapshot_type: Type: checkpoint, error, decision
        context_graph_json: Serialized context graph (already JSON-encoded)
        active_policies: Active policies at snapshot time

    Returns:
        Snapshot row ID, or None if the workflow does not exist
    """
    # Look up the workflow first so the snapshot inherits its tenant/user.
    workflow = self.get_workflow(workflow_id)
    if not workflow:
        return None

    conn = self._get_connection()
    try:
        cursor = conn.execute("""
            INSERT INTO workflow_context_snapshots (
                workflow_id, session_id, snapshot_type,
                context_graph_json, active_policies,
                tenant_id, user_id
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            workflow_id,
            session_id,
            snapshot_type,
            context_graph_json,
            json.dumps(active_policies or []),
            workflow.tenant_id,
            workflow.user_id
        ))
        conn.commit()
        # created_at is filled in by the column's CURRENT_TIMESTAMP default.
        return cursor.lastrowid
    finally:
        conn.close()

def get_latest_snapshot(
    self,
    workflow_id: str,
    snapshot_type: Optional[str] = None
) -> Optional[WorkflowContextSnapshot]:
    """
    Get the latest snapshot for a workflow.

    NOTE(review): "latest" is ordered by created_at, which has only
    one-second resolution under SQLite's CURRENT_TIMESTAMP — snapshots
    created within the same second may tie; confirm if ordering by id
    is needed.

    Args:
        workflow_id: Workflow ID
        snapshot_type: Optional filter by type

    Returns:
        WorkflowContextSnapshot or None
    """
    conn = self._get_connection()
    try:
        if snapshot_type:
            row = conn.execute("""
                SELECT * FROM workflow_context_snapshots
                WHERE workflow_id = ? AND snapshot_type = ?
                ORDER BY created_at DESC
                LIMIT 1
            """, (workflow_id, snapshot_type)).fetchone()
        else:
            row = conn.execute("""
                SELECT * FROM workflow_context_snapshots
                WHERE workflow_id = ?
                ORDER BY created_at DESC
                LIMIT 1
            """, (workflow_id,)).fetchone()

        if not row:
            return None

        return WorkflowContextSnapshot(
            id=row["id"],
            workflow_id=row["workflow_id"],
            session_id=row["session_id"],
            snapshot_type=row["snapshot_type"],
            context_graph_json=row["context_graph_json"],
            active_policies=json.loads(row["active_policies"] or "[]"),
            created_at=row["created_at"]
        )
    finally:
        conn.close()

# -------------------------------------------------------------------------
# Listing and Search
# -------------------------------------------------------------------------

def list_workflows(
    self,
    status: Optional[str] = None,
    workflow_type: Optional[str] = None,
    limit: int = 20
) -> List[WorkflowState]:
    """
    List workflows with optional filters, most recently updated first.

    Args:
        status: Filter by status
        workflow_type: Filter by type
        limit: Maximum results

    Returns:
        List of WorkflowState objects
    """
    conn = self._get_connection()
    try:
        # Build the WHERE clause incrementally; parameters stay bound (?)
        # so user-supplied filters are never interpolated into SQL.
        query = "SELECT * FROM workflow_states WHERE 1=1"
        params: List[Any] = []

        if status:
            query += " AND status = ?"
            params.append(status)

        if workflow_type:
            query += " AND workflow_type = ?"
            params.append(workflow_type)

        query += " ORDER BY updated_at DESC LIMIT ?"
        params.append(limit)

        rows = conn.execute(query, params).fetchall()

        workflows = []
        for row in rows:
            # Same row -> dataclass mapping as get_workflow, including
            # team_id, which the original version dropped here.
            workflows.append(WorkflowState(
                workflow_id=row["workflow_id"],
                workflow_type=row["workflow_type"],
                status=row["status"],
                anchor_nodes=json.loads(row["anchor_nodes"] or "[]"),
                accumulated_decisions=json.loads(row["accumulated_decisions"] or "[]"),
                accumulated_errors=json.loads(row["accumulated_errors"] or "[]"),
                current_step=row["current_step"],
                completed_steps=json.loads(row["completed_steps"] or "[]"),
                started_at=row["started_at"],
                updated_at=row["updated_at"],
                session_ids=json.loads(row["session_ids"] or "[]"),
                tenant_id=row["tenant_id"],
                user_id=row["user_id"],
                team_id=row["team_id"],
                project_id=row["project_id"]
            ))

        return workflows
    finally:
        conn.close()

def list_active_workflows(self, limit: int = 20) -> List[WorkflowState]:
    """List active workflows (convenience wrapper over list_workflows)."""
    return self.list_workflows(status="active", limit=limit)

# =============================================================================
# Convenience Functions
# =============================================================================

def get_workflow_context(workflow_id: str, db_path: Optional[Path] = None) -> Optional[Dict[str, Any]]:
    """
    Get workflow context for agent injection.

    Convenience function for MCP tool. Creates a throwaway manager per call.
    """
    manager = WorkflowStateManager(db_path)
    return manager.get_workflow_context(workflow_id, include_history=True)

def record_workflow_decision(
    workflow_id: str,
    decision_type: str,
    decision: str,
    rationale: str,
    affected_nodes: Optional[List[str]] = None,
    db_path: Optional[Path] = None
) -> bool:
    """
    Record a workflow decision.

    Convenience function for MCP tool. Returns False if the workflow
    does not exist.
    """
    manager = WorkflowStateManager(db_path)
    return manager.record_decision(
        workflow_id=workflow_id,
        decision_type=decision_type,
        decision=decision,
        rationale=rationale,
        affected_nodes=affected_nodes
    )

def resume_workflow(
    workflow_id: str,
    session_id: str,
    create_snapshot: bool = True,
    db_path: Optional[Path] = None
) -> Optional[Dict[str, Any]]:
    """
    Resume a workflow in a new session (J.4.10.5).

    Convenience function for MCP tool.

    Returns:
        Dictionary with workflow state and resume summary, or None if not found
    """
    manager = WorkflowStateManager(db_path)
    workflow = manager.resume_workflow(
        workflow_id=workflow_id,
        new_session_id=session_id,
        create_snapshot=create_snapshot
    )

    if not workflow:
        return None

    # Counts are returned instead of full lists to keep the MCP payload small.
    return {
        "workflow_id": workflow.workflow_id,
        "workflow_type": workflow.workflow_type,
        "status": workflow.status,
        "current_step": workflow.current_step,
        "completed_steps": len(workflow.completed_steps),
        "decision_count": len(workflow.accumulated_decisions),
        "session_count": len(workflow.session_ids),
        "resume_summary": manager.get_resume_summary(workflow_id)
    }

def pause_workflow(
    workflow_id: str,
    create_checkpoint: bool = True,
    reason: Optional[str] = None,
    db_path: Optional[Path] = None
) -> bool:
    """
    Pause an active workflow (J.4.10.5).

    Convenience function for MCP tool. Returns False if the workflow does
    not exist or is not currently active.
    """
    manager = WorkflowStateManager(db_path)
    return manager.pause_workflow(
        workflow_id=workflow_id,
        create_checkpoint=create_checkpoint,
        reason=reason
    )

# =============================================================================
# CLI Interface
# =============================================================================

def main():
    """CLI interface for workflow state management."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Workflow State Manager - ADR-154 J.4.10"
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # Start workflow
    start_parser = subparsers.add_parser("start", help="Start a new workflow")
    start_parser.add_argument("workflow_type", help="Workflow type")
    start_parser.add_argument("--session", required=True, help="Session ID")
    start_parser.add_argument("--step", help="Initial step")

    # Get workflow
    get_parser = subparsers.add_parser("get", help="Get workflow state")
    get_parser.add_argument("workflow_id", help="Workflow ID")
    get_parser.add_argument("--format", choices=["json", "markdown"], default="json")

    # List workflows
    list_parser = subparsers.add_parser("list", help="List workflows")
    list_parser.add_argument("--status", help="Filter by status")
    list_parser.add_argument("--type", help="Filter by type")
    list_parser.add_argument("--limit", type=int, default=20)

    # Record decision
    decision_parser = subparsers.add_parser("decision", help="Record a decision")
    decision_parser.add_argument("workflow_id", help="Workflow ID")
    decision_parser.add_argument("--type", required=True, help="Decision type")
    decision_parser.add_argument("--decision", required=True, help="The decision")
    decision_parser.add_argument("--rationale", required=True, help="Rationale")

    # Context
    context_parser = subparsers.add_parser("context", help="Get workflow context")
    context_parser.add_argument("workflow_id", help="Workflow ID")
    context_parser.add_argument("--format", choices=["json", "markdown"], default="markdown")

    # Resume workflow (J.4.10.5)
    resume_parser = subparsers.add_parser("resume", help="Resume a workflow in new session")
    resume_parser.add_argument("workflow_id", help="Workflow ID to resume")
    resume_parser.add_argument("--session", required=True, help="New session ID")
    resume_parser.add_argument("--no-snapshot", action="store_true", help="Skip resumption snapshot")

    # Pause workflow (J.4.10.5)
    pause_parser = subparsers.add_parser("pause", help="Pause an active workflow")
    pause_parser.add_argument("workflow_id", help="Workflow ID to pause")
    pause_parser.add_argument("--reason", help="Reason for pausing")
    pause_parser.add_argument("--no-checkpoint", action="store_true", help="Skip checkpoint snapshot")

    # Resume summary (J.4.10.5)
    summary_parser = subparsers.add_parser("resume-summary", help="Get resume summary")
    summary_parser.add_argument("workflow_id", help="Workflow ID")

    # Init tables
    subparsers.add_parser("init", help="Initialize workflow tables")

    args = parser.parse_args()

    # Constructing the manager already runs _ensure_tables(), so every
    # command (including "init") gets an initialized database.
    manager = WorkflowStateManager()

    if args.command == "init":
        print("Workflow tables initialized.")
        print(f"Database: {manager.db_path}")

    elif args.command == "start":
        workflow = manager.start_workflow(
            workflow_type=args.workflow_type,
            session_id=args.session,
            initial_step=args.step
        )
        print(f"Started workflow: {workflow.workflow_id}")
        print(f"Type: {workflow.workflow_type}")
        print(f"Status: {workflow.status}")

    elif args.command == "get":
        workflow = manager.get_workflow(args.workflow_id)
        if workflow:
            if args.format == "json":
                print(json.dumps(workflow.to_dict(), indent=2))
            else:
                print(manager.format_workflow_context_for_prompt(args.workflow_id))
        else:
            print(f"Workflow not found: {args.workflow_id}")

    elif args.command == "list":
        workflows = manager.list_workflows(
            status=args.status,
            workflow_type=args.type,
            limit=args.limit
        )
        print(f"Found {len(workflows)} workflow(s):\n")
        for w in workflows:
            print(f"  {w.workflow_id} | {w.workflow_type} | {w.status} | {w.current_step or '-'}")

    elif args.command == "decision":
        success = manager.record_decision(
            workflow_id=args.workflow_id,
            decision_type=args.type,
            decision=args.decision,
            rationale=args.rationale
        )
        if success:
            print("Decision recorded.")
        else:
            print("Failed to record decision (workflow not found).")

    elif args.command == "context":
        if args.format == "markdown":
            output = manager.format_workflow_context_for_prompt(args.workflow_id)
            if output:
                print(output)
            else:
                print(f"Workflow not found: {args.workflow_id}")
        else:
            context = manager.get_workflow_context(args.workflow_id)
            if context:
                print(json.dumps(context, indent=2))
            else:
                print(f"Workflow not found: {args.workflow_id}")

    elif args.command == "resume":
        workflow = manager.resume_workflow(
            workflow_id=args.workflow_id,
            new_session_id=args.session,
            create_snapshot=not args.no_snapshot
        )
        if workflow:
            print(f"Resumed workflow: {workflow.workflow_id}")
            print(f"Status: {workflow.status}")
            print(f"Sessions: {len(workflow.session_ids)}")
            print(f"Current step: {workflow.current_step or 'Not set'}")
            print(f"\n--- Resume Summary ---\n")
            summary = manager.get_resume_summary(args.workflow_id)
            if summary:
                print(summary)
        else:
            print(f"Workflow not found: {args.workflow_id}")

    elif args.command == "pause":
        success = manager.pause_workflow(
            workflow_id=args.workflow_id,
            create_checkpoint=not args.no_checkpoint,
            reason=args.reason
        )
        if success:
            print(f"Paused workflow: {args.workflow_id}")
            if args.reason:
                print(f"Reason: {args.reason}")
        else:
            print(f"Failed to pause workflow: {args.workflow_id}")
            print("(Workflow may not exist or is not active)")

    elif args.command == "resume-summary":
        summary = manager.get_resume_summary(args.workflow_id)
        if summary:
            print(summary)
        else:
            print(f"Workflow not found: {args.workflow_id}")

    else:
        parser.print_help()

# Script entry point. The original was garbled to `if name == "main":`,
# which raises NameError (and would never match) — restore the standard guard.
if __name__ == "__main__":
    main()