# scripts/research-pipeline-orchestrator

#!/usr/bin/env python3
"""
CODITECT Research Pipeline Orchestrator (ADR-206).

Generates execution plans for autonomous research pipeline runs.
Used by /research-pipeline command and /execute-workflow.

Author: Hal Casteel, CEO/CTO AZ1.AI Inc.
Copyright 2026 AZ1.AI Inc.
"""

import argparse
import json
import re
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

def slugify(text: str) -> str:
    """Convert text to a URL-safe slug.

    Args:
        text: Input text to slugify.

    Returns:
        Slugified string: lowercase, with every run of non-alphanumeric
        characters collapsed to a single hyphen, and no leading or
        trailing hyphen.
    """
    return re.sub(r'[^a-z0-9]+', '-', text.lower()).strip('-')

def collect_inputs() -> Dict[str, Any]:
    """Interactively collect user inputs for a pipeline run.

    Prompts on stdin for the research topic (required) and several
    optional settings; yes/no questions default to "no".

    Returns:
        Dictionary with keys: topic, urls, repos, docs_path, output_dir,
        extended, genesis.
    """
    print("=== CODITECT Research Pipeline - Interactive Intake ===\n")

    # Required: topic (re-prompt until non-empty)
    topic = input("Research Topic (required): ").strip()
    while not topic:
        print("ERROR: Topic is required.")
        topic = input("Research Topic (required): ").strip()

    # Optional: URLs to crawl
    urls_input = input("URLs to crawl (comma-separated, optional): ").strip()
    urls = [u.strip() for u in urls_input.split(',') if u.strip()] if urls_input else []

    # Optional: GitHub repositories
    repos_input = input("GitHub repository URLs (comma-separated, optional): ").strip()
    repos = [r.strip() for r in repos_input.split(',') if r.strip()] if repos_input else []

    # Optional: directory of original documents
    docs_path_input = input("Path to original documents directory (optional): ").strip()
    docs_path = Path(docs_path_input) if docs_path_input else None

    # Output directory (defaults to a slug derived from the topic)
    default_output = f"analyze-new-artifacts/{slugify(topic)}"
    output_input = input(f"Output directory (default: {default_output}): ").strip()
    output_dir = Path(output_input) if output_input else Path(default_output)

    # Extended mode: adds the two extra dashboards (6 total)
    extended_input = input("Extended mode (6 dashboards) [y/N]: ").strip().lower()
    extended = extended_input in ['y', 'yes']

    # Genesis mode: create a project once the pipeline completes
    genesis_input = input("Create project after completion (Genesis mode) [y/N]: ").strip().lower()
    genesis = genesis_input in ['y', 'yes']

    return {
        'topic': topic,
        'urls': urls,
        'repos': repos,
        'docs_path': docs_path,
        'output_dir': output_dir,
        'extended': extended,
        'genesis': genesis
    }

def setup_output_directory(output_dir: Path) -> None:
    """Create the output directory structure for a pipeline run.

    Args:
        output_dir: Base output directory path.
    """
    directories = [
        output_dir,
        output_dir / 'artifacts' / 'adrs',
        output_dir / 'dashboards',
        output_dir / 'ideation',
        output_dir / 'quality-reports',
    ]

    # exist_ok=True makes setup idempotent across repeated runs
    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)

    print(f"\nCreated output directory structure at: {output_dir.resolve()}")

def load_template(template_name: str, config_dir: Path) -> Optional[str]:
    """Load a template file from config/templates/research-pipeline/.

    Args:
        template_name: Name of template file (e.g., 'sdd.md').
        config_dir: Path to config directory.

    Returns:
        Template content, or None if the template file does not exist.
    """
    template_path = config_dir / 'templates' / 'research-pipeline' / template_name
    if template_path.exists():
        return template_path.read_text(encoding='utf-8')
    return None

def substitute_template(template: str, variables: Dict[str, str]) -> str:
    """Substitute template variables.

    Args:
        template: Template string with {{VAR}} placeholders.
        variables: Dictionary of variable name -> value.

    Returns:
        Template with all known {{VAR}} placeholders replaced; unknown
        placeholders are left untouched.
    """
    result = template
    for key, value in variables.items():
        # f"{{{{{key}}}}}" renders as the literal placeholder {{key}}
        result = result.replace(f"{{{{{key}}}}}", value)
    return result

def get_phase_dependencies() -> Dict[str, Dict[str, Any]]:
    """Define pipeline phases and their dependencies.

    Returns:
        Dictionary mapping phase name to configuration:
        - dependencies: List of phase names that must complete first.
        - agents: List of agent task dicts in this phase (name,
          description, output, and optionally template/condition).
        - condition (optional): Input flag that must be truthy for the
          phase to run (e.g. 'extended').
    """
    return {
        'phase1_intake': {
            'dependencies': [],
            'agents': [
                {
                    'name': 'web-crawler',
                    'description': 'Crawl URLs and extract research context',
                    'output': 'research-context.json'
                },
                {
                    'name': 'repository-analyzer',
                    'description': 'Analyze GitHub repositories',
                    'output': 'repository-analysis.json',
                    'condition': 'repos'
                },
                {
                    'name': 'document-extractor',
                    'description': 'Extract content from original documents',
                    'output': 'documents-extracted.json',
                    'condition': 'docs_path'
                }
            ]
        },
        'phase2_core_artifacts': {
            'dependencies': ['phase1_intake'],
            'agents': [
                {
                    'name': 'quick-start-writer',
                    'description': 'Generate 1-2-3 Detailed Quick Start',
                    'output': 'artifacts/1-2-3-detailed-quick-start.md',
                    'template': '1-2-3-detailed-quick-start.md'
                },
                {
                    'name': 'impact-analyzer',
                    'description': 'Generate CODITECT Impact Analysis',
                    'output': 'artifacts/coditect-impact.md',
                    'template': 'coditect-impact.md'
                },
                {
                    'name': 'executive-summary-writer',
                    'description': 'Generate Executive Summary',
                    'output': 'artifacts/executive-summary.md',
                    'template': 'executive-summary.md'
                },
                {
                    'name': 'sdd-writer',
                    'description': 'Generate System Design Document',
                    'output': 'artifacts/sdd.md',
                    'template': 'sdd.md'
                },
                {
                    'name': 'tdd-writer',
                    'description': 'Generate Technical Deep Dive',
                    'output': 'artifacts/tdd.md',
                    'template': 'tdd.md'
                }
            ]
        },
        'phase3_architecture': {
            'dependencies': ['phase2_core_artifacts'],
            'agents': [
                {
                    'name': 'c4-architect',
                    'description': 'Generate C4 Architecture Diagrams',
                    'output': 'artifacts/c4-architecture.md',
                    'template': 'c4-architecture.md'
                },
                {
                    'name': 'glossary-builder',
                    'description': 'Build Glossary',
                    'output': 'artifacts/glossary.md',
                    'template': 'glossary.md'
                },
                {
                    'name': 'diagram-generator',
                    'description': 'Generate Mermaid Diagrams',
                    'output': 'artifacts/mermaid-diagrams.md',
                    'template': 'mermaid-diagrams.md'
                },
                {
                    'name': 'adr-writer',
                    'description': 'Generate Architecture Decision Records',
                    'output': 'artifacts/adrs/',
                    'template': 'adr-template.md'
                }
            ]
        },
        'phase4_dashboards': {
            'dependencies': ['phase3_architecture'],
            'agents': [
                {
                    'name': 'tech-dashboard-builder',
                    'description': 'Build Tech Architecture Analyzer Dashboard',
                    'output': 'dashboards/tech-architecture-analyzer.jsx',
                    'template': 'tech-architecture-analyzer.jsx'
                },
                {
                    'name': 'strategic-dashboard-builder',
                    'description': 'Build Strategic Fit Dashboard',
                    'output': 'dashboards/strategic-fit-dashboard.jsx',
                    'template': 'strategic-fit-dashboard.jsx'
                },
                {
                    'name': 'integration-dashboard-builder',
                    'description': 'Build CODITECT Integration Playbook',
                    'output': 'dashboards/coditect-integration-playbook.jsx',
                    'template': 'coditect-integration-playbook.jsx'
                },
                {
                    'name': 'decision-dashboard-builder',
                    'description': 'Build Executive Decision Brief',
                    'output': 'dashboards/executive-decision-brief.jsx',
                    'template': 'executive-decision-brief.jsx'
                }
            ]
        },
        'phase4_extended_dashboards': {
            'dependencies': ['phase4_dashboards'],
            'condition': 'extended',
            'agents': [
                {
                    'name': 'competitive-dashboard-builder',
                    'description': 'Build Competitive Comparison Dashboard',
                    'output': 'dashboards/competitive-comparison.jsx',
                    'template': 'competitive-comparison.jsx'
                },
                {
                    'name': 'implementation-dashboard-builder',
                    'description': 'Build Implementation Planner Dashboard',
                    'output': 'dashboards/implementation-planner.jsx',
                    'template': 'implementation-planner.jsx'
                }
            ]
        },
        'phase5_ideation': {
            'dependencies': ['phase4_dashboards'],
            'agents': [
                {
                    'name': 'deep-dive-prompter',
                    'description': 'Generate Deep Dive Prompts',
                    'output': 'ideation/deep-dive-prompts.md',
                    'template': 'deep-dive-prompts.md'
                }
            ]
        },
        'phase6_quality': {
            'dependencies': ['phase5_ideation'],
            'agents': [
                {
                    'name': 'quality-gate-1',
                    'description': 'QG1: Phase 1 Intake Quality',
                    'output': 'quality-reports/qg1-phase1-report.yaml'
                },
                {
                    'name': 'quality-gate-2',
                    'description': 'QG2: Phase 2 Artifacts Quality',
                    'output': 'quality-reports/qg2-phase2-report.yaml'
                },
                {
                    'name': 'quality-gate-3',
                    'description': 'QG3: Phase 3 Architecture Quality',
                    'output': 'quality-reports/qg3-phase3-report.yaml'
                }
            ]
        }
    }

def build_execution_plan(
    inputs: Dict[str, Any],
    config_dir: Path,
    skip_phases: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Build the execution plan for a pipeline run.

    Phases are filtered by skip_phases and by their 'condition' flags;
    agents with a 'condition' key are dropped when the corresponding
    input (repos / docs_path) is empty.

    Args:
        inputs: User inputs from collect_inputs().
        config_dir: Path to config directory (templates are loaded from
            config_dir/templates/research-pipeline/).
        skip_phases: Optional list of phase names to skip.

    Returns:
        JSON-serializable execution plan dictionary.
    """
    # Local import keeps this fix self-contained within the function.
    from datetime import timezone

    skip_phases = skip_phases or []
    phases = get_phase_dependencies()

    # Single timezone-aware timestamp (datetime.utcnow() is deprecated
    # since Python 3.12) so DATE and start_time are always consistent.
    now = datetime.now(timezone.utc)

    # Variables substituted into each agent's template
    variables = {
        'TOPIC': inputs['topic'],
        'TECHNOLOGY': inputs['topic'],  # Alias
        'DATE': now.strftime('%Y-%m-%d'),
        'RESEARCH_CONTEXT': ''  # Populated after phase1
    }

    execution_plan = {
        'run_id': str(uuid.uuid4()),
        'topic': inputs['topic'],
        'urls': inputs['urls'],
        'repos': inputs['repos'],
        'docs_path': str(inputs['docs_path']) if inputs['docs_path'] else None,
        'output_dir': str(inputs['output_dir']),
        'extended': inputs['extended'],
        'genesis': inputs['genesis'],
        'start_time': now.isoformat(),
        'phases': []
    }

    for phase_name, phase_config in phases.items():
        # Skip if in skip list
        if phase_name in skip_phases:
            continue

        # Phase-level condition (currently only 'extended')
        if 'condition' in phase_config:
            condition = phase_config['condition']
            if condition == 'extended' and not inputs['extended']:
                continue

        # Filter agents based on agent-level conditions
        agents = []
        for agent in phase_config['agents']:
            if 'condition' in agent:
                condition = agent['condition']
                if condition == 'repos' and not inputs['repos']:
                    continue
                if condition == 'docs_path' and not inputs['docs_path']:
                    continue

            # Load template content if the agent specifies one
            template_content = None
            if 'template' in agent:
                template_content = load_template(agent['template'], config_dir)

            agents.append({
                'name': agent['name'],
                'description': agent['description'],
                'output': agent['output'],
                'template': template_content,
                'variables': variables.copy(),
                'status': 'pending'
            })

        # Only include phases that still have agents after filtering
        if agents:
            execution_plan['phases'].append({
                'name': phase_name,
                'dependencies': phase_config['dependencies'],
                'agents': agents
            })

    return execution_plan

def save_manifest(execution_plan: Dict[str, Any], output_dir: Path) -> None:
    """Save the pipeline manifest to the output directory.

    Args:
        execution_plan: Execution plan dictionary (JSON-serializable).
        output_dir: Output directory path (must already exist).
    """
    manifest_path = output_dir / 'pipeline-manifest.json'
    with manifest_path.open('w', encoding='utf-8') as f:
        json.dump(execution_plan, f, indent=2)

    print(f"Pipeline manifest saved: {manifest_path.resolve()}")

def print_execution_plan(execution_plan: Dict[str, Any]) -> None:
    """Pretty-print an execution plan summary to stdout.

    Args:
        execution_plan: Execution plan dictionary.
    """
    print("\n=== Execution Plan Summary ===")
    print(f"Run ID: {execution_plan['run_id']}")
    print(f"Topic: {execution_plan['topic']}")
    print(f"Output: {execution_plan['output_dir']}")
    print(f"Extended: {execution_plan['extended']}")
    print(f"Genesis: {execution_plan['genesis']}")
    print(f"\nPhases: {len(execution_plan['phases'])}")

    for phase in execution_plan['phases']:
        print(f" - {phase['name']}: {len(phase['agents'])} agents")

    # Total across all phases, computed directly from the plan
    total_agents = sum(len(phase['agents']) for phase in execution_plan['phases'])
    print(f"\nTotal Agents: {total_agents}")

def main() -> int:
    """Main entry point for the CLI.

    Returns:
        Exit code (0 for success, 1 for error).
    """
    parser = argparse.ArgumentParser(
        description='CODITECT Research Pipeline Orchestrator (ADR-206)',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    parser.add_argument(
        '--topic',
        type=str,
        help='Research topic (overrides interactive input)'
    )
    parser.add_argument(
        '--urls',
        type=str,
        help='Comma-separated URLs to crawl'
    )
    parser.add_argument(
        '--repos',
        type=str,
        help='Comma-separated GitHub repository URLs'
    )
    parser.add_argument(
        '--docs',
        type=str,
        help='Path to original documents directory'
    )
    parser.add_argument(
        '--output',
        type=str,
        help='Output directory (default: analyze-new-artifacts/{topic})'
    )
    parser.add_argument(
        '--extended',
        action='store_true',
        help='Enable extended mode (6 dashboards)'
    )
    parser.add_argument(
        '--genesis',
        action='store_true',
        help='Create project after completion (Genesis mode)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview execution plan without running'
    )
    parser.add_argument(
        '--skip-phase',
        action='append',
        dest='skip_phases',
        help='Skip specific phase (can be repeated)'
    )
    parser.add_argument(
        '--config-dir',
        type=str,
        default='config',
        help='Path to config directory (default: config)'
    )

    args = parser.parse_args()

    # Collect inputs: non-interactive when --topic is given, otherwise
    # fall back to the interactive intake flow.
    if args.topic:
        inputs = {
            'topic': args.topic,
            'urls': [u.strip() for u in args.urls.split(',') if u.strip()] if args.urls else [],
            'repos': [r.strip() for r in args.repos.split(',') if r.strip()] if args.repos else [],
            'docs_path': Path(args.docs) if args.docs else None,
            'output_dir': Path(args.output) if args.output else Path(f"analyze-new-artifacts/{slugify(args.topic)}"),
            'extended': args.extended,
            'genesis': args.genesis
        }
    else:
        inputs = collect_inputs()

    # Setup output directory
    setup_output_directory(inputs['output_dir'])

    # Build execution plan
    config_dir = Path(args.config_dir)
    execution_plan = build_execution_plan(
        inputs,
        config_dir,
        skip_phases=args.skip_phases
    )

    # Save manifest
    save_manifest(execution_plan, inputs['output_dir'])

    # Print summary
    print_execution_plan(execution_plan)

    if args.dry_run:
        print("\n[DRY RUN] Execution plan generated. No agents dispatched.")
        return 0

    print("\n[READY] Execute with: /execute-workflow research-pipeline --manifest {}/pipeline-manifest.json".format(
        inputs['output_dir']
    ))

    return 0

# Dunder names restored: the mangled `if name == 'main'` raised NameError
# on import and never ran the CLI.
if __name__ == '__main__':
    sys.exit(main())