#!/usr/bin/env python3
"""Batch Processing Pipeline - CODITECT Adapter

Implements the staged batch processing pipeline pattern from
Agent-Skills-for-Context-Engineering for LLM-powered batch processing.

Usage:
    python3 scripts/context-engineering/batch_pipeline.py --init PROJECT_DIR
    python3 scripts/context-engineering/batch_pipeline.py --run PROJECT_DIR --stage acquire
    python3 scripts/context-engineering/batch_pipeline.py --status PROJECT_DIR

Source: external/Agent-Skills-for-Context-Engineering/skills/project-development/scripts/
"""

import argparse
import json
import os
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable, Dict, List, Optional

# Add the external Agent-Skills checkout to the import path so the optional
# pipeline_template helpers below can be found.
# Bug fix: `Path(file)` was a corruption of `Path(__file__)`.
EXTERNAL_PATH = Path(__file__).parent.parent.parent / "external" / "Agent-Skills-for-Context-Engineering"
sys.path.insert(0, str(EXTERNAL_PATH / "skills" / "project-development" / "scripts"))

# Optional dependency: the upstream pipeline helpers. The adapter degrades
# gracefully (EXTERNAL_AVAILABLE = False) when the external checkout is absent.
try:
    from pipeline_template import (
        PipelineStage,
        PipelineRunner,
        estimate_costs,
    )
    EXTERNAL_AVAILABLE = True
except ImportError:
    EXTERNAL_AVAILABLE = False

class Stage(Enum):
    """Pipeline stages, declared in canonical execution order."""

    ACQUIRE = "acquire"
    PREPARE = "prepare"
    PROCESS = "process"
    PARSE = "parse"
    RENDER = "render"

@dataclass
class StageConfig:
    """Configuration for a pipeline stage."""

    name: str             # stage identifier (matches the Stage enum value)
    deterministic: bool   # True when re-running yields identical output
    cost: str             # coarse cost label ("low" / "high")
    output_file: str      # artifact filename written into the item directory
    description: str      # human-readable summary of the stage

# The Canonical Pipeline: acquire → prepare → process → parse → render

# Per-stage configuration table. PROCESS is the only non-deterministic stage
# and the only one that incurs token (LLM) cost.
STAGE_CONFIGS = {
    Stage.ACQUIRE: StageConfig(
        name="acquire", deterministic=True, cost="low",
        output_file="raw.json", description="Fetch raw data"
    ),
    Stage.PREPARE: StageConfig(
        name="prepare", deterministic=True, cost="low",
        output_file="prompt.md", description="Transform to prompts"
    ),
    Stage.PROCESS: StageConfig(
        name="process",
        deterministic=False,  # Key insight: only non-deterministic stage
        cost="high",          # Token costs here
        output_file="response.md", description="Execute LLM calls"
    ),
    Stage.PARSE: StageConfig(
        name="parse", deterministic=True, cost="low",
        output_file="parsed.json", description="Extract structured data"
    ),
    Stage.RENDER: StageConfig(
        name="render", deterministic=True, cost="low",
        output_file="output.json", description="Generate final outputs"
    ),
}

class CoditechBatchPipeline:
    """CODITECT-integrated batch processing pipeline.

    Drives the acquire → prepare → process → parse → render stages over a
    project directory, persisting state in ``pipeline.json``.
    """

    def __init__(self, project_dir: str):
        """Bind the pipeline to *project_dir* (created later by initialize())."""
        self.project_dir = Path(project_dir)
        self.data_dir = self.project_dir / "data"
        self.config_file = self.project_dir / "pipeline.json"

        # Stage handlers (can be overridden via register_handler)
        self.handlers: Dict[Stage, Callable] = {}

def initialize(self, items: List[str], config: Dict = None) -> Dict:
"""
Initialize a new pipeline project.

Args:
items: List of item IDs to process
config: Optional configuration overrides

Returns:
Initialization status
"""
# Create directories
self.project_dir.mkdir(parents=True, exist_ok=True)
self.data_dir.mkdir(exist_ok=True)

# Create item directories
for item_id in items:
(self.data_dir / item_id).mkdir(exist_ok=True)

# Create config
pipeline_config = {
"created": datetime.now(timezone.utc).isoformat(),
"items": items,
"stages": {s.value: asdict(STAGE_CONFIGS[s]) for s in Stage},
"status": {item_id: {"current_stage": None, "completed": []} for item_id in items},
"config": config or {}
}

with open(self.config_file, 'w') as f:
json.dump(pipeline_config, f, indent=2)

return {
"initialized": True,
"project_dir": str(self.project_dir),
"items": len(items),
"stages": [s.value for s in Stage]
}

def run(self, stage: Stage = None, items: List[str] = None,
dry_run: bool = False) -> Dict:
"""
Run pipeline for specified stage and items.

Args:
stage: Stage to run (None for all pending)
items: Specific items to process (None for all)
dry_run: If True, just report what would be done

Returns:
Execution results
"""
if not self.config_file.exists():
return {"error": "Pipeline not initialized. Run --init first."}

with open(self.config_file, 'r') as f:
config = json.load(f)

target_items = items or config["items"]
target_stages = [stage] if stage else list(Stage)

results = {
"processed": [],
"skipped": [],
"failed": [],
"dry_run": dry_run
}

for item_id in target_items:
for stg in target_stages:
# Check if needs processing
if not self._needs_processing(item_id, stg):
results["skipped"].append({
"item": item_id,
"stage": stg.value,
"reason": "already_complete"
})
continue

# Check prerequisites
if not self._prerequisites_met(item_id, stg, config):
results["skipped"].append({
"item": item_id,
"stage": stg.value,
"reason": "prerequisites_not_met"
})
continue

if dry_run:
results["processed"].append({
"item": item_id,
"stage": stg.value,
"status": "would_process"
})
else:
# Execute stage
try:
stage_result = self._execute_stage(item_id, stg, config)
results["processed"].append({
"item": item_id,
"stage": stg.value,
"status": "success",
"result": stage_result
})

# Update status
self._update_status(item_id, stg, config)

except Exception as e:
results["failed"].append({
"item": item_id,
"stage": stg.value,
"error": str(e)
})

return results

def status(self) -> Dict:
"""Get pipeline status."""
if not self.config_file.exists():
return {"error": "Pipeline not initialized."}

with open(self.config_file, 'r') as f:
config = json.load(f)

items = config["items"]
status_summary = {
"total_items": len(items),
"by_stage": {s.value: {"complete": 0, "pending": 0} for s in Stage},
"overall_progress": 0,
"items": {}
}

for item_id in items:
item_status = config["status"].get(item_id, {})
completed = item_status.get("completed", [])

status_summary["items"][item_id] = {
"completed_stages": completed,
"next_stage": self._get_next_stage(completed),
"progress": len(completed) / len(Stage)
}

for stg in Stage:
if stg.value in completed:
status_summary["by_stage"][stg.value]["complete"] += 1
else:
status_summary["by_stage"][stg.value]["pending"] += 1

# Calculate overall progress
total_stages = len(items) * len(Stage)
completed_stages = sum(
len(s["completed_stages"])
for s in status_summary["items"].values()
)
status_summary["overall_progress"] = completed_stages / total_stages if total_stages > 0 else 0

return status_summary

def estimate_cost(self, items: List[str] = None, tokens_per_item: int = 2000,
price_per_1k_tokens: float = 0.01) -> Dict:
"""
Estimate processing costs.

Args:
items: Items to estimate for (None for all)
tokens_per_item: Average tokens per item (input + output)
price_per_1k_tokens: Price per 1000 tokens

Returns:
Cost estimation
"""
if not self.config_file.exists():
return {"error": "Pipeline not initialized."}

with open(self.config_file, 'r') as f:
config = json.load(f)

target_items = items or config["items"]

# Count items needing LLM processing
items_needing_process = sum(
1 for item_id in target_items
if Stage.PROCESS.value not in config["status"].get(item_id, {}).get("completed", [])
)

# Calculate costs
total_tokens = items_needing_process * tokens_per_item
base_cost = (total_tokens / 1000) * price_per_1k_tokens

# Add 25% buffer for retries
buffer_cost = base_cost * 0.25

return {
"items_to_process": items_needing_process,
"estimated_tokens": total_tokens,
"base_cost": base_cost,
"buffer_cost": buffer_cost,
"total_estimated": base_cost + buffer_cost,
"cost_per_item": (base_cost + buffer_cost) / items_needing_process if items_needing_process > 0 else 0,
"formula": "total_cost = (items × tokens_per_item × price_per_token) + 25% buffer"
}

def register_handler(self, stage: Stage, handler: Callable):
"""Register a custom handler for a stage."""
self.handlers[stage] = handler

def _needs_processing(self, item_id: str, stage: Stage) -> bool:
"""Check if item needs processing for stage."""
output_file = self.data_dir / item_id / STAGE_CONFIGS[stage].output_file
return not output_file.exists()

def _prerequisites_met(self, item_id: str, stage: Stage, config: Dict) -> bool:
"""Check if prerequisites are met for stage."""
# First stage has no prerequisites
if stage == Stage.ACQUIRE:
return True

# Get previous stage
stages_list = list(Stage)
stage_idx = stages_list.index(stage)
prev_stage = stages_list[stage_idx - 1]

# Check if previous stage is complete
completed = config["status"].get(item_id, {}).get("completed", [])
return prev_stage.value in completed

def _execute_stage(self, item_id: str, stage: Stage, config: Dict) -> Dict:
"""Execute a pipeline stage."""
item_dir = self.data_dir / item_id

# Use custom handler if registered
if stage in self.handlers:
return self.handlers[stage](item_id, item_dir, config)

# Default implementations
if stage == Stage.ACQUIRE:
return self._default_acquire(item_id, item_dir, config)
elif stage == Stage.PREPARE:
return self._default_prepare(item_id, item_dir, config)
elif stage == Stage.PROCESS:
return self._default_process(item_id, item_dir, config)
elif stage == Stage.PARSE:
return self._default_parse(item_id, item_dir, config)
elif stage == Stage.RENDER:
return self._default_render(item_id, item_dir, config)

def _default_acquire(self, item_id: str, item_dir: Path, config: Dict) -> Dict:
"""Default acquire stage - creates placeholder raw data."""
raw_data = {
"item_id": item_id,
"acquired_at": datetime.now(timezone.utc).isoformat(),
"data": f"Raw data for {item_id}",
"source": "placeholder"
}

with open(item_dir / "raw.json", 'w') as f:
json.dump(raw_data, f, indent=2)

return {"status": "acquired", "output": "raw.json"}

def _default_prepare(self, item_id: str, item_dir: Path, config: Dict) -> Dict:
"""Default prepare stage - creates prompt from raw data."""
with open(item_dir / "raw.json", 'r') as f:
raw_data = json.load(f)

prompt = f"""# Processing Request for {item_id}

Input Data

{json.dumps(raw_data.get('data', ''), indent=2)}

Instructions

Process this data and provide structured output in the following format:

  • Summary: Brief summary
  • Key Points: List of key points
  • Metadata: Any relevant metadata

Respond in valid JSON format. """

    with open(item_dir / "prompt.md", 'w') as f:
f.write(prompt)

return {"status": "prepared", "output": "prompt.md"}

def _default_process(self, item_id: str, item_dir: Path, config: Dict) -> Dict:
"""Default process stage - placeholder for LLM processing."""
# This is where LLM calls would happen
# In production, this would call the actual LLM API

response = f"""{{

"summary": "Processed data for {item_id}", "key_points": [ "Point 1", "Point 2", "Point 3" ], "metadata": {{ "processed_at": "{datetime.now(timezone.utc).isoformat()}", "model": "placeholder" }} }}"""

    with open(item_dir / "response.md", 'w') as f:
f.write(response)

return {"status": "processed", "output": "response.md"}

def _default_parse(self, item_id: str, item_dir: Path, config: Dict) -> Dict:
"""Default parse stage - extracts structured data from response."""
with open(item_dir / "response.md", 'r') as f:
response = f.read()

try:
parsed = json.loads(response)
except json.JSONDecodeError:
# Fallback parsing
parsed = {
"raw_response": response,
"parse_error": True
}

with open(item_dir / "parsed.json", 'w') as f:
json.dump(parsed, f, indent=2)

return {"status": "parsed", "output": "parsed.json", "parse_error": parsed.get("parse_error", False)}

def _default_render(self, item_id: str, item_dir: Path, config: Dict) -> Dict:
"""Default render stage - generates final output."""
with open(item_dir / "parsed.json", 'r') as f:
parsed = json.load(f)

output = {
"item_id": item_id,
"result": parsed,
"rendered_at": datetime.now(timezone.utc).isoformat(),
"pipeline_version": "1.0.0"
}

with open(item_dir / "output.json", 'w') as f:
json.dump(output, f, indent=2)

return {"status": "rendered", "output": "output.json"}

def _update_status(self, item_id: str, stage: Stage, config: Dict):
"""Update pipeline status after stage completion."""
if item_id not in config["status"]:
config["status"][item_id] = {"completed": []}

if stage.value not in config["status"][item_id]["completed"]:
config["status"][item_id]["completed"].append(stage.value)
config["status"][item_id]["current_stage"] = stage.value

with open(self.config_file, 'w') as f:
json.dump(config, f, indent=2)

def _get_next_stage(self, completed: List[str]) -> Optional[str]:
"""Get next stage to process."""
for stage in Stage:
if stage.value not in completed:
return stage.value
return None

def main():
    """Command-line entry point: dispatch --init / --run / --status / --estimate."""
    parser = argparse.ArgumentParser(
        description="CODITECT Batch Processing Pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 batch_pipeline.py --init ./my_project --items item1,item2,item3
  python3 batch_pipeline.py --run ./my_project --stage acquire
  python3 batch_pipeline.py --status ./my_project
  python3 batch_pipeline.py --estimate ./my_project --tokens 2000 --price 0.01
"""
    )

    parser.add_argument("project_dir", nargs="?", help="Project directory")
    parser.add_argument("--init", action="store_true", help="Initialize new pipeline")
    parser.add_argument("--items", help="Comma-separated list of item IDs")
    parser.add_argument("--run", action="store_true", help="Run pipeline")
    parser.add_argument("--stage", choices=[s.value for s in Stage], help="Specific stage to run")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be processed")
    parser.add_argument("--status", action="store_true", help="Show pipeline status")
    parser.add_argument("--estimate", action="store_true", help="Estimate costs")
    parser.add_argument("--tokens", type=int, default=2000, help="Tokens per item for estimate")
    parser.add_argument("--price", type=float, default=0.01, help="Price per 1K tokens")
    parser.add_argument("--json", "-j", action="store_true", help="Output JSON format")

    args = parser.parse_args()

    if not args.project_dir:
        parser.print_help()
        sys.exit(1)

    pipeline = CoditechBatchPipeline(args.project_dir)

    if args.init:
        items = args.items.split(",") if args.items else ["item1", "item2", "item3"]
        result = pipeline.initialize(items)

    elif args.run:
        stage = Stage(args.stage) if args.stage else None
        items = args.items.split(",") if args.items else None
        result = pipeline.run(stage=stage, items=items, dry_run=args.dry_run)

    elif args.status:
        result = pipeline.status()

    elif args.estimate:
        result = pipeline.estimate_cost(tokens_per_item=args.tokens, price_per_1k_tokens=args.price)

    else:
        # No action flag given: default to showing status
        result = pipeline.status()

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(f"\n{'='*60}")
        print("CODITECT Batch Pipeline")
        print(f"{'='*60}")

        if "error" in result:
            print(f"\nError: {result['error']}")
        elif "initialized" in result:
            print(f"\nPipeline Initialized:")
            print(f"  Project: {result['project_dir']}")
            print(f"  Items: {result['items']}")
            print(f"  Stages: {', '.join(result['stages'])}")
        elif "overall_progress" in result:
            print(f"\nOverall Progress: {result['overall_progress']*100:.1f}%")
            print(f"\nBy Stage:")
            for stage, counts in result["by_stage"].items():
                print(f"  {stage}: {counts['complete']} complete, {counts['pending']} pending")
        elif "processed" in result:
            print(f"\nProcessed: {len(result['processed'])}")
            print(f"Skipped: {len(result['skipped'])}")
            print(f"Failed: {len(result['failed'])}")
        elif "total_estimated" in result:
            print(f"\nCost Estimate:")
            print(f"  Items to process: {result['items_to_process']}")
            print(f"  Estimated tokens: {result['estimated_tokens']}")
            print(f"  Total cost: ${result['total_estimated']:.2f}")

        print(f"\n{'='*60}\n")

# Bug fix: the guard was corrupted to `if name == "main"`; the canonical
# Python entry-point guard compares __name__ against "__main__".
if __name__ == "__main__":
    main()