#!/usr/bin/env python3
"""
AI Curriculum Agent Dispatcher

Intelligent agent selection and invocation system for curriculum development.
Analyzes workflows and automatically determines optimal agents, skills, and
commands.

UPDATED: 2025-11-29 - Integrated with ComponentActivator for dynamic agent
discovery
"""
import json
import yaml
import logging
import sys
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

# Import ComponentActivator
try:
    from component_activator import ComponentActivator
except ImportError:
    # Fallback if not in same directory.
    # FIX: was Path(file) — NameError; the dunder __file__ is required.
    sys.path.insert(0, str(Path(__file__).parent))
    from component_activator import ComponentActivator

# Import AgentInvocation for context-injected agent calls (J.4.9.5)
try:
    from agent_invocation import invoke_agent, AgentInvocation
    CONTEXT_INJECTION_AVAILABLE = True
except ImportError:
    CONTEXT_INJECTION_AVAILABLE = False

# Configure logging (file + stdout).
# FIX: was Path(file) — NameError; __file__ is the module's path.
log_file = Path(__file__).parent / "agent_dispatcher.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout)
    ]
)
# FIX: was getLogger(name) — NameError; __name__ gives the module logger.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Custom exception classes
# ---------------------------------------------------------------------------
class DispatcherError(Exception):
    """Base exception for agent dispatcher errors."""


class WorkflowAnalysisError(DispatcherError):
    """Raised when workflow analysis fails."""


class AgentRecommendationError(DispatcherError):
    """Raised when agent recommendation fails."""


class ScriptGenerationError(DispatcherError):
    """Raised when script generation fails."""


class ValidationError(DispatcherError):
    """Raised when input validation fails."""
class TaskType(Enum):
    """Categories of curriculum-development work the dispatcher can route."""

    CONTENT_GENERATION = "content_generation"
    ASSESSMENT_CREATION = "assessment_creation"
    CURRICULUM_PLANNING = "curriculum_planning"
    QUALITY_ASSURANCE = "quality_assurance"
    PROJECT_MANAGEMENT = "project_management"
    RESEARCH_ANALYSIS = "research_analysis"
    NOTEBOOKLM_OPTIMIZATION = "notebooklm_optimization"
class ComplexityLevel(Enum):
    """Ordinal task-complexity tiers; .value doubles as a numeric scale."""

    SIMPLE = 1
    MODERATE = 2
    COMPLEX = 3
    ENTERPRISE = 4
@dataclass
class TaskRequirement:
    """Structured description of a curriculum task derived from a workflow."""

    task_type: TaskType          # primary category of work
    complexity: ComplexityLevel  # 1 (simple) .. 4 (enterprise)
    skill_levels: List[str]      # audience levels to cover
    modules: List[str]           # curriculum modules in scope
    deliverables: List[str]      # expected outputs
    timeline: str                # human-readable schedule estimate
    dependencies: List[str]      # prerequisite modules/tasks
@dataclass
class AgentRecommendation:
    """Dispatcher output: which agents/skills/commands to use, and cost."""

    primary_agent: str              # agent that owns the task
    supporting_agents: List[str]    # agents assisting the primary
    required_skills: List[str]      # skills to activate
    recommended_commands: List[str] # commands worth running
    execution_order: List[str]      # phased plan of steps
    estimated_tokens: int           # projected token budget
    estimated_duration: str         # human-readable time estimate
class CurriculumAgentDispatcher:
    """
    Intelligent agent dispatcher for AI curriculum development.

    UPDATED: Now uses ComponentActivator for dynamic agent discovery.
    Supports all 60 agents instead of just 4 hardcoded ones.
    """

    def __init__(self, framework_root: Optional[Path] = None):
        """
        Initialize dispatcher with ComponentActivator.

        Args:
            framework_root: Path to framework root (auto-detected if None)
        """
        if framework_root is None:
            # Auto-detect framework root (2 levels up from scripts/core/)
            framework_root = Path(__file__).parent.parent.parent
        self.activator = ComponentActivator(framework_root)
        self.activator.load_all_registries()
        logger.info(f"Loaded {len(self.activator.agents)} agents from registries")
        logger.info(f"Loaded {len(self.activator.skills)} skills from registries")
        logger.info(f"Loaded {len(self.activator.commands)} commands from registries")

        # Build agent_capabilities dynamically from loaded agents
        self.agent_capabilities = self._build_agent_capabilities()

        # Legacy hardcoded capabilities (kept for backward compatibility with
        # specific task types). These are merged with dynamically loaded agents.
        self._legacy_capabilities = {
            "ai-curriculum-specialist": {
                "task_types": [TaskType.CURRICULUM_PLANNING, TaskType.CONTENT_GENERATION],
                "complexity_range": [1, 4],
                "specialties": ["multi_level_content", "pedagogical_frameworks", "learning_analytics"],
                "token_efficiency": "high",
                "coordination_ability": "excellent"
            },
            "educational-content-generator": {
                "task_types": [TaskType.CONTENT_GENERATION],
                "complexity_range": [1, 3],
                "specialties": ["bloom_taxonomy", "progressive_difficulty", "story_driven_content"],
                "token_efficiency": "medium",
                "coordination_ability": "good"
            },
            "assessment-creation-agent": {
                "task_types": [TaskType.ASSESSMENT_CREATION, TaskType.QUALITY_ASSURANCE],
                "complexity_range": [1, 4],
                "specialties": ["adaptive_assessment", "bias_detection", "rubric_design"],
                "token_efficiency": "high",
                "coordination_ability": "good"
            },
            "orchestrator": {
                "task_types": [TaskType.PROJECT_MANAGEMENT],
                "complexity_range": [2, 4],
                "specialties": ["multi_agent_coordination", "workflow_management", "progress_tracking"],
                "token_efficiency": "excellent",
                "coordination_ability": "excellent"
            }
        }

        # Merge legacy capabilities (for backward compatibility).
        # FIX: legacy agents that were missing from the registries used to be
        # silently dropped, leaving recommend_agents() with no usable
        # task_types; they are now added as fallback entries instead.
        for agent_name, capabilities in self._legacy_capabilities.items():
            if agent_name in self.agent_capabilities:
                self.agent_capabilities[agent_name].update(capabilities)
            else:
                self.agent_capabilities[agent_name] = dict(capabilities)

        # Skills the dispatcher may recommend, keyed by skill name
        self.skill_capabilities = {
            "ai-curriculum-development": {
                "use_cases": ["multi_level_content", "bloom_taxonomy", "assessment_integration"],
                "automation_level": "high",
                "educational_focus": True
            },
            "notebooklm-content-optimization": {
                "use_cases": ["ai_generation_ready", "metadata_enhancement", "cross_references"],
                "automation_level": "high",
                "educational_focus": True
            },
            "multi-agent-workflow": {
                "use_cases": ["complex_coordination", "token_management", "checkpoint_system"],
                "automation_level": "excellent",
                "educational_focus": False
            }
        }

        # Commands the dispatcher may recommend, keyed by command name
        self.command_capabilities = {
            "generate-curriculum-content": {
                "use_cases": ["structured_content_creation", "multi_level_generation"],
                "complexity_level": [1, 3],
                "educational_focus": True
            },
            "create-plan": {
                "use_cases": ["project_planning", "task_decomposition"],
                "complexity_level": [2, 4],
                "educational_focus": False
            },
            "research": {
                "use_cases": ["requirement_analysis", "feasibility_study"],
                "complexity_level": [1, 3],
                "educational_focus": False
            }
        }
def _build_agent_capabilities(self) -> Dict:
"""Build agent capabilities from ComponentActivator"""
capabilities = {}
for agent_name, agent_config in self.activator.agents.items():
capabilities[agent_name] = {
"description": agent_config.description,
"category": agent_config.category or "general",
"tags": agent_config.tags,
"tools": agent_config.tools,
"use_cases": agent_config.use_cases or [],
"status": agent_config.status,
"version": agent_config.version,
# Default values for curriculum-specific fields
"task_types": [],
"complexity_range": [1, 4],
"specialties": agent_config.tags,
"token_efficiency": "medium",
"coordination_ability": "good"
}
return capabilities
def analyze_workflow(self, description: str, requirements: Dict) -> TaskRequirement:
"""Analyze workflow description to determine requirements"""
logger.info("Starting workflow analysis")
try:
# Input validation
if not description or not isinstance(description, str):
raise ValidationError("Description must be a non-empty string")
if not isinstance(requirements, dict):
raise ValidationError("Requirements must be a dictionary")
logger.info(f"Analyzing workflow: {description[:100]}...")
# Simple keyword-based analysis (could be enhanced with NLP)
task_type_keywords = {
TaskType.CONTENT_GENERATION: ["content", "material", "learning", "module", "week"],
TaskType.ASSESSMENT_CREATION: ["quiz", "test", "assessment", "evaluation", "grade"],
TaskType.CURRICULUM_PLANNING: ["curriculum", "syllabus", "plan", "structure", "framework"],
TaskType.QUALITY_ASSURANCE: ["quality", "review", "validate", "check", "audit"],
TaskType.PROJECT_MANAGEMENT: ["project", "manage", "coordinate", "track", "organize"],
TaskType.RESEARCH_ANALYSIS: ["research", "analyze", "investigate", "study", "explore"],
TaskType.NOTEBOOKLM_OPTIMIZATION: ["notebooklm", "optimize", "format", "metadata", "ai-ready"]
}
# Determine primary task type
description_lower = description.lower()
task_scores = {}
for task_type, keywords in task_type_keywords.items():
score = sum(1 for keyword in keywords if keyword in description_lower)
if score > 0:
task_scores[task_type] = score
primary_task = max(task_scores.keys(), key=lambda x: task_scores[x]) if task_scores else TaskType.CONTENT_GENERATION
# Determine complexity based on scope
complexity_indicators = {
ComplexityLevel.SIMPLE: ["single", "simple", "basic", "one"],
ComplexityLevel.MODERATE: ["multiple", "several", "moderate", "standard"],
ComplexityLevel.COMPLEX: ["complex", "advanced", "comprehensive", "full"],
ComplexityLevel.ENTERPRISE: ["enterprise", "large-scale", "complete", "all modules"]
}
complexity_scores = {}
for level, indicators in complexity_indicators.items():
score = sum(1 for indicator in indicators if indicator in description_lower)
if score > 0:
complexity_scores[level] = score
complexity = max(complexity_scores.keys(), key=lambda x: complexity_scores[x]) if complexity_scores else ComplexityLevel.MODERATE
# Extract other requirements
skill_levels = requirements.get("skill_levels", ["beginner", "intermediate", "advanced", "expert"])
modules = requirements.get("modules", ["foundations"])
deliverables = requirements.get("deliverables", ["content", "assessments"])
timeline = requirements.get("timeline", "1-2 weeks")
dependencies = requirements.get("dependencies", [])
task_requirement = TaskRequirement(
task_type=primary_task,
complexity=complexity,
skill_levels=skill_levels,
modules=modules,
deliverables=deliverables,
timeline=timeline,
dependencies=dependencies
)
logger.info(f"Workflow analysis completed successfully: {primary_task.value}, complexity {complexity.value}")
return task_requirement
except ValidationError as e:
logger.error(f"Validation error in workflow analysis: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error in workflow analysis: {e}")
raise WorkflowAnalysisError(f"Failed to analyze workflow: {e}") from e
def recommend_agents(self, task_req: TaskRequirement) -> AgentRecommendation:
"""Recommend optimal agents, skills, and commands for task"""
logger.info("Starting agent recommendation")
try:
# Input validation
if not isinstance(task_req, TaskRequirement):
raise ValidationError("task_req must be a TaskRequirement instance")
logger.info(f"Recommending agents for task type: {task_req.task_type.value}")
# Find primary agent
primary_candidates = []
for agent, capabilities in self.agent_capabilities.items():
if (task_req.task_type in capabilities["task_types"] and
capabilities["complexity_range"][0] <= task_req.complexity.value <= capabilities["complexity_range"][1]):
primary_candidates.append((agent, capabilities))
# Select best primary agent (prioritize coordination ability for complex tasks)
if task_req.complexity.value >= 3:
primary_agent = max(primary_candidates, key=lambda x: x[1]["coordination_ability"] == "excellent")[0]
else:
primary_agent = primary_candidates[0][0] if primary_candidates else "ai-curriculum-specialist"
# Determine supporting agents
supporting_agents = []
if task_req.complexity.value >= 2:
if task_req.task_type == TaskType.CURRICULUM_PLANNING:
supporting_agents.extend(["educational-content-generator", "assessment-creation-agent"])
elif task_req.task_type == TaskType.CONTENT_GENERATION:
supporting_agents.append("assessment-creation-agent")
elif task_req.task_type == TaskType.PROJECT_MANAGEMENT:
supporting_agents.extend(["ai-curriculum-specialist", "educational-content-generator"])
# Recommend skills
required_skills = []
for skill, capabilities in self.skill_capabilities.items():
if (any(use_case in str(task_req).lower() for use_case in capabilities["use_cases"]) and
capabilities["educational_focus"]):
required_skills.append(skill)
if not required_skills:
required_skills = ["ai-curriculum-development"] # Default educational skill
# Recommend commands
recommended_commands = []
for command, capabilities in self.command_capabilities.items():
if (any(use_case in str(task_req).lower() for use_case in capabilities["use_cases"]) and
capabilities["complexity_level"][0] <= task_req.complexity.value <= capabilities["complexity_level"][1]):
recommended_commands.append(command)
# Determine execution order
execution_order = self._plan_execution_order(task_req, primary_agent, supporting_agents, recommended_commands)
# Estimate resources
estimated_tokens = self._estimate_tokens(task_req, len(supporting_agents) + 1)
estimated_duration = self._estimate_duration(task_req, estimated_tokens)
recommendation = AgentRecommendation(
primary_agent=primary_agent,
supporting_agents=supporting_agents,
required_skills=required_skills,
recommended_commands=recommended_commands,
execution_order=execution_order,
estimated_tokens=estimated_tokens,
estimated_duration=estimated_duration
)
logger.info(f"Agent recommendation completed: primary={primary_agent}, supporting={len(supporting_agents)}")
return recommendation
except ValidationError as e:
logger.error(f"Validation error in agent recommendation: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error in agent recommendation: {e}")
raise AgentRecommendationError(f"Failed to recommend agents: {e}") from e
def invoke_with_context(
self,
agent_type: str,
task: str,
session_id: Optional[str] = None,
workflow_id: Optional[str] = None,
context_budget: int = 4000,
) -> Optional["AgentInvocation"]:
"""
Invoke an agent with automatic context injection (J.4.9.5).
This method integrates with AgentContextInjector to automatically:
1. Classify task intent
2. Inject relevant context (decisions, patterns, errors)
3. Build a prompt with persona and context
4. Return a Task() call ready for Claude Code
Args:
agent_type: Type of agent (e.g., "security-specialist")
task: Task description
session_id: Session ID (auto-generated if None)
workflow_id: Workflow ID for multi-turn context
context_budget: Token budget for context injection
Returns:
AgentInvocation object with task_call, or None if injection unavailable
Example:
>>> dispatcher = CurriculumAgentDispatcher()
>>> invocation = dispatcher.invoke_with_context(
... "security-specialist",
... "Review the auth code for vulnerabilities"
... )
>>> print(invocation.task_call)
"""
if not CONTEXT_INJECTION_AVAILABLE:
logger.warning("Context injection not available - use invoke_agent directly")
return None
try:
invocation = invoke_agent(
agent_type=agent_type,
task=task,
session_id=session_id,
workflow_id=workflow_id,
context_budget=context_budget,
)
logger.info(f"Context-injected invocation created: agent={agent_type}, tokens={invocation.context_tokens}")
return invocation
except Exception as e:
logger.error(f"Context-injected invocation failed: {e}")
return None
def recommend_and_invoke(
self,
description: str,
requirements: Dict,
session_id: Optional[str] = None,
) -> Tuple[AgentRecommendation, Optional["AgentInvocation"]]:
"""
Analyze task, recommend agent, and create context-injected invocation (J.4.9.5).
Combines workflow analysis, agent recommendation, and context injection
into a single convenient method.
Args:
description: Task/workflow description
requirements: Requirements dictionary
session_id: Session ID
Returns:
Tuple of (AgentRecommendation, AgentInvocation or None)
"""
# Analyze and recommend
task_req = self.analyze_workflow(description, requirements)
recommendation = self.recommend_agents(task_req)
# Invoke with context
invocation = self.invoke_with_context(
agent_type=recommendation.primary_agent,
task=description,
session_id=session_id,
)
return recommendation, invocation
def _plan_execution_order(self, task_req: TaskRequirement, primary_agent: str,
supporting_agents: List[str], commands: List[str]) -> List[str]:
"""Plan optimal execution order for agents and commands"""
execution_plan = []
# Phase 1: Research and Planning
if task_req.complexity.value >= 2:
execution_plan.append("research")
if "create-plan" in commands:
execution_plan.append("create-plan")
# Phase 2: Primary execution
if primary_agent == "orchestrator":
execution_plan.append(f"orchestrator -> coordinate_agents({', '.join(supporting_agents)})")
else:
execution_plan.append(primary_agent)
# Phase 3: Supporting tasks
for agent in supporting_agents:
execution_plan.append(agent)
# Phase 4: Integration and optimization
if "notebooklm-content-optimization" in task_req.deliverables:
execution_plan.append("notebooklm_optimization")
return execution_plan
def _estimate_tokens(self, task_req: TaskRequirement, num_agents: int) -> int:
"""Estimate token usage for task"""
base_tokens = {
ComplexityLevel.SIMPLE: 10000,
ComplexityLevel.MODERATE: 25000,
ComplexityLevel.COMPLEX: 50000,
ComplexityLevel.ENTERPRISE: 100000
}
# Adjust for number of skill levels and modules
multiplier = len(task_req.skill_levels) * len(task_req.modules) * 0.25
# Adjust for number of agents
agent_multiplier = 1 + (num_agents - 1) * 0.3
return int(base_tokens[task_req.complexity] * multiplier * agent_multiplier)
def _estimate_duration(self, task_req: TaskRequirement, tokens: int) -> str:
"""Estimate duration based on complexity and tokens"""
# Rough estimation: 1000 tokens ā 1 minute
minutes = tokens / 1000
if minutes < 60:
return f"{int(minutes)} minutes"
elif minutes < 1440: # 24 hours
hours = minutes / 60
return f"{hours:.1f} hours"
else:
days = minutes / 1440
return f"{days:.1f} days"
def generate_task_script(self, task_req: TaskRequirement, recommendation: AgentRecommendation) -> str:
"""Generate executable task script for agent invocation"""
logger.info("Starting script generation")
try:
# Input validation
if not isinstance(task_req, TaskRequirement):
raise ValidationError("task_req must be a TaskRequirement instance")
if not isinstance(recommendation, AgentRecommendation):
raise ValidationError("recommendation must be an AgentRecommendation instance")
logger.info(f"Generating script for task: {task_req.task_type.value}")
script_template = f'''#!/usr/bin/env python3
""" Generated Task Script for {task_req.task_type.value} Complexity: {task_req.complexity.value}/4 Estimated Duration: {recommendation.estimated_duration} Estimated Tokens: {recommendation.estimated_tokens:,} """
from typing import List, Dict import subprocess import json
class TaskExecution: def init(self): self.primary_agent = "{recommendation.primary_agent}" self.supporting_agents = {recommendation.supporting_agents} self.required_skills = {recommendation.required_skills} self.commands = {recommendation.recommended_commands} self.execution_order = {recommendation.execution_order} self.progress = {{}}
def execute_phase(self, phase: str, agent: str, prompt: str) -> Dict:
"""Execute single phase with specified agent"""
# Using Claude Code Task protocol from CLAUDE.md
task_call = f\"\"\"
Task( subagent_type="general-purpose", description="{{phase}}", prompt=\"\"\"Use {{{{agent}}}} subagent to {{{{prompt}}}}
Context:
- Task Type: {task_req.task_type.value}
- Skill Levels: {task_req.skill_levels}
- Modules: {task_req.modules}
- Deliverables: {task_req.deliverables}
Requirements:
- Follow curriculum development best practices
- Create content with proper metadata for NotebookLM
- Include assessment integration
- Track progress with checkboxes
Report back:
- What was completed
- What remains to be done
- Current status and any blockers
- Recommendations for next steps
\\\"\\\"\\\"
)"""
print(f"Executing Phase: {{phase}}")
print(f"Agent: {{agent}}")
print(f"Task Call:\\n{{task_call}}")
# In real implementation, this would invoke Claude Code
# For now, return mock result
result = {{
"phase": phase,
"agent": agent,
"status": "completed",
"output": "Mock execution result",
"next_steps": []
}}
self.progress[phase] = result
return result
def run_complete_workflow(self):
"""Execute complete workflow according to execution order"""
results = []
for step in self.execution_order:
if "->" in step:
# Complex orchestration step
agent, action = step.split(" -> ")
result = self.execute_orchestrated_phase(agent, action)
else:
# Simple agent invocation
result = self.execute_simple_phase(step)
results.append(result)
return results
def execute_simple_phase(self, agent: str) -> Dict:
"""Execute simple single-agent phase"""
phase_prompts = {{
"research": "Research and analyze requirements for curriculum development task",
"ai-curriculum-specialist": "Generate comprehensive curriculum content with multi-level progression",
"educational-content-generator": "Create engaging educational content with proper pedagogical frameworks",
"assessment-creation-agent": "Design adaptive assessments with bias detection and accessibility features",
"create-plan": "Create detailed project plan with checkboxes and progress tracking"
}}
prompt = phase_prompts.get(agent, f"Execute {{agent}} workflow for curriculum development")
return self.execute_phase(agent, agent, prompt)
def execute_orchestrated_phase(self, orchestrator: str, action: str) -> Dict:
"""Execute complex orchestrated phase"""
orchestrator_prompt = f\"\"\"Act as project manager and {{{{action}}}}.
Create comprehensive project plan with:
- Detailed task breakdown with checkboxes
- Agent assignment and coordination
- Progress tracking and milestone management
- Quality gates and validation steps
- Token budget and timeline management
Coordinate the following workflow:
- Task Type: {task_req.task_type.value}
- Complexity: {task_req.complexity.value}/4
- Deliverables: {task_req.deliverables}
- Timeline: {task_req.timeline}
\"\"\"
return self.execute_phase("orchestration", orchestrator, orchestrator_prompt)
def generate_progress_report(self) -> str:
"""Generate comprehensive progress report"""
total_phases = len(self.execution_order)
completed_phases = len([p for p in self.progress.values() if p["status"] == "completed"])
report = f"""
Curriculum Development Progress Report
Task Overviewā
- Task Type: {task_req.task_type.value}
- Complexity: {task_req.complexity.value}/4
- Timeline: {task_req.timeline}
- Progress: {{completed_phases}}/{{total_phases}} phases completed ({{completed_phases/total_phases*100:.1f}}%)
Execution Summaryā
"""
for phase, result in self.progress.items():
status_icon = "ā
" if result["status"] == "completed" else "š" if result["status"] == "in_progress" else "ā"
report += f"{{status_icon}} **{{phase}}**: {{result['status']}}\\n"
report += f"""
Resource Usageā
- Estimated Tokens: {recommendation.estimated_tokens:,}
- Estimated Duration: {recommendation.estimated_duration}
Next Stepsā
-
Continue with remaining phases in execution order
-
Monitor token usage and adjust if needed
-
Validate deliverables meet quality standards """
return report
if name == "main": # Execute the generated task workflow executor = TaskExecution() results = executor.run_complete_workflow()
print("\\n" + "="*60)
print("CURRICULUM DEVELOPMENT TASK COMPLETED")
print("="*60)
# Generate final report
report = executor.generate_progress_report()
print(report)
# Save results
with open("task_execution_results.json", "w") as f:
json.dump({{
"task_requirements": {{
"task_type": "{task_req.task_type.value}",
"complexity": "{task_req.complexity.value}",
"skill_levels": {task_req.skill_levels},
"modules": {task_req.modules},
"deliverables": {task_req.deliverables}
}},
"recommendations": {{
"primary_agent": "{recommendation.primary_agent}",
"supporting_agents": {recommendation.supporting_agents},
"required_skills": {recommendation.required_skills},
"execution_order": {recommendation.execution_order}
}},
"execution_results": results,
"progress": executor.progress
}}, f, indent=2)
print("\\nš Detailed results saved to task_execution_results.json")
'''
logger.info("Script generation completed successfully")
return script_template
except ValidationError as e:
logger.error(f"Validation error in script generation: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error in script generation: {e}")
raise ScriptGenerationError(f"Failed to generate task script: {e}") from e
def list_all_agents(self) -> List[str]:
"""
List all available agents (NEW - uses ComponentActivator)
Returns:
List of agent names
"""
return list(self.agent_capabilities.keys())
def get_agent_info(self, agent_name: str) -> Optional[Dict]:
"""
Get detailed information about an agent (NEW - uses ComponentActivator)
Args:
agent_name: Name of the agent
Returns:
Agent configuration dictionary or None if not found
"""
return self.agent_capabilities.get(agent_name)
def search_agents(self, query: str, category: Optional[str] = None) -> List[Dict]:
"""
Search agents by keyword (NEW - uses ComponentActivator)
Args:
query: Search query
category: Optional category filter
Returns:
List of matching agents with their info
"""
results = self.activator.search_agents(query)
if category:
results = [a for a in results if a.category == category]
return [
{
"name": agent.name,
"category": agent.category,
"description": agent.description,
"tags": agent.tags
}
for agent in results
]
def discover_agents_by_capability(self, capability: str) -> List[str]:
"""
Discover agents by capability/tag (NEW - uses ComponentActivator)
Args:
capability: Capability to search for
Returns:
List of agent names with that capability
"""
return [
agent.name
for agent in self.activator.list_agents()
if capability.lower() in [tag.lower() for tag in agent.tags]
]
def main(): """Example usage of the agent dispatcher""" logger.info("Agent Dispatcher starting")
try:
dispatcher = CurriculumAgentDispatcher()
# NEW: Test ComponentActivator integration
print("=" * 60)
print("COMPONENTACTIVATOR INTEGRATION TEST")
print("=" * 60)
print(f"\nā
Dispatcher now knows about {len(dispatcher.list_all_agents())} agents")
print(f" (Previously only knew about 4 hardcoded agents)")
print(f"\nTotal components loaded:")
print(f" Agents: {len(dispatcher.activator.agents)}")
print(f" Skills: {len(dispatcher.activator.skills)}")
print(f" Commands: {len(dispatcher.activator.commands)}")
print(f" Scripts: {len(dispatcher.activator.scripts)}")
print(f" Hooks: {len(dispatcher.activator.hooks)}")
# Test agent search
git_agents = dispatcher.search_agents("git")
print(f"\nš Search test: Found {len(git_agents)} agents related to 'git':")
for agent in git_agents[:3]:
print(f" - {agent['name']} ({agent['category']})")
print("\n" + "=" * 60)
print()
# Example workflow analysis
workflow_description = """
Create comprehensive AI curriculum content for Module 3 Deep Learning
across all skill levels (beginner through expert) with integrated
assessments, NotebookLM optimization, and progress tracking.
"""
requirements = {
"skill_levels": ["beginner", "intermediate", "advanced", "expert"],
"modules": ["module3_deep_learning"],
"deliverables": ["content", "assessments", "notebooklm_optimization"],
"timeline": "2-3 weeks",
"dependencies": ["module2_machine_learning"]
}
# Analyze and recommend
logger.info("Executing workflow analysis and agent recommendation")
task_req = dispatcher.analyze_workflow(workflow_description, requirements)
recommendation = dispatcher.recommend_agents(task_req)
# Generate executable script
logger.info("Generating executable task script")
script = dispatcher.generate_task_script(task_req, recommendation)
# Output results
print("="*60)
print("AI CURRICULUM AGENT DISPATCHER - ANALYSIS RESULTS")
print("="*60)
print(f"\\nšÆ **Task Analysis:**")
print(f" Task Type: {task_req.task_type.value}")
print(f" Complexity: {task_req.complexity.value}/4")
print(f" Skill Levels: {', '.join(task_req.skill_levels)}")
print(f" Modules: {', '.join(task_req.modules)}")
print(f"\\nš¤ **Agent Recommendations:**")
print(f" Primary Agent: {recommendation.primary_agent}")
print(f" Supporting Agents: {', '.join(recommendation.supporting_agents)}")
print(f" Required Skills: {', '.join(recommendation.required_skills)}")
print(f" Commands: {', '.join(recommendation.recommended_commands)}")
print(f"\\nā” **Execution Plan:**")
for i, step in enumerate(recommendation.execution_order, 1):
print(f" {i}. {step}")
print(f"\\nš **Resource Estimates:**")
print(f" Estimated Tokens: {recommendation.estimated_tokens:,}")
print(f" Estimated Duration: {recommendation.estimated_duration}")
print(f"\\nš **Generated Script:**")
print(" Executable task script generated with:")
print(" - Agent invocation using Task protocol")
print(" - Progress tracking with checkboxes")
print(" - Autonomous execution and reporting")
print(" - Multi-session state management")
# Save the script
script_filename = f"generated_task_{task_req.task_type.value}.py"
with open(script_filename, "w") as f:
f.write(script)
print(f"\\nā
**Script saved as:** {script_filename}")
print(f" Execute with: python {script_filename}")
print(f"\\nš **Log file:** {log_file}")
logger.info("Agent Dispatcher completed successfully")
return 0
except ValidationError as e:
error_msg = f"Validation error: {e}"
print(f"\\nā ERROR: {error_msg}", file=sys.stderr)
logger.error(error_msg)
return 1
except WorkflowAnalysisError as e:
error_msg = f"Workflow analysis failed: {e}"
print(f"\\nā ERROR: {error_msg}", file=sys.stderr)
logger.error(error_msg)
return 1
except AgentRecommendationError as e:
error_msg = f"Agent recommendation failed: {e}"
print(f"\\nā ERROR: {error_msg}", file=sys.stderr)
logger.error(error_msg)
return 1
except ScriptGenerationError as e:
error_msg = f"Script generation failed: {e}"
print(f"\\nā ERROR: {error_msg}", file=sys.stderr)
logger.error(error_msg)
return 1
except DispatcherError as e:
error_msg = f"Dispatcher error: {e}"
print(f"\\nā ERROR: {error_msg}", file=sys.stderr)
logger.error(error_msg)
return 1
except Exception as e:
error_msg = f"Unexpected error: {e}"
print(f"\\nā CRITICAL ERROR: {error_msg}", file=sys.stderr)
logger.exception("Unexpected error in main()")
return 1
def create_recursive_chain(model_client, usage_tracker=None, config_path: Optional[Path] = None):
    """
    Create a RecursiveAgentChain with configuration.

    Integration point for ADR-077 Recursive Agent Chain Architecture.

    Args:
        model_client: Model client for LLM calls.
        usage_tracker: Optional UsageTracker for token tracking.
        config_path: Optional path to agent-chain-config.json.

    Returns:
        Configured RecursiveAgentChain.
    """
    from scripts.core.recursive_agent_chain import RecursiveAgentChain

    chain = RecursiveAgentChain(model_client, usage_tracker=usage_tracker)

    # Default config location: <framework_root>/config/agent-chain-config.json
    if config_path is None:
        config_path = Path(__file__).parent.parent.parent / "config" / "agent-chain-config.json"

    # Apply recursion limits from the config file when present
    if config_path.exists():
        with open(config_path) as f:
            config = json.load(f)
        if "recursion" in config:
            recursion = config["recursion"]
            chain.MAX_DEPTH = recursion.get("max_depth", 5)
            chain.MAX_BATCH_SIZE = recursion.get("max_batch_size", 10)

    logger.info(f"RecursiveAgentChain configured: max_depth={chain.MAX_DEPTH}, max_batch={chain.MAX_BATCH_SIZE}")
    return chain
def create_context_isolation_manager(project_root: Optional[Path] = None):
    """
    Create a ContextIsolationManager for subagent dispatch.

    Integration point for ADR-078 Subagent Context Isolation.

    Args:
        project_root: Path to project root (auto-detected if None).

    Returns:
        Configured ContextIsolationManager.

    Example:
        manager = create_context_isolation_manager()
        ctx = manager.create_isolated_context(
            task_id="A.9.1.3",
            task_description="Implement UserService",
            required_files=["src/models/user.py"],
            constraints=["Extend BaseService"]
        )
        # Generate Task call
        dispatch = manager.dispatch_subagent(ctx, "backend-specialist")
    """
    from scripts.core.context_isolation import ContextIsolationManager

    # Auto-detect project root (2 levels up from scripts/core/)
    root = project_root if project_root is not None else Path(__file__).parent.parent.parent
    manager = ContextIsolationManager(root)
    logger.info(f"ContextIsolationManager created for: {root}")
    return manager
def isolate_and_dispatch_task(
    task_id: str,
    task_description: str,
    agent_type: str = "general",
    required_files: Optional[List[str]] = None,
    constraints: Optional[List[str]] = None,
    project_root: Optional[Path] = None,
) -> str:
    """
    Create an isolated context for a task and generate its dispatch call.

    Args:
        task_id: PILOT plan task ID (e.g., A.9.1.3).
        task_description: Full task text.
        agent_type: Type of subagent to dispatch.
        required_files: Files needed by the subagent.
        constraints: Task constraints.
        project_root: Path to project root (auto-detected if None).

    Returns:
        Task() invocation string ready for execution.

    Example:
        dispatch = isolate_and_dispatch_task(
            task_id="A.1.1",
            task_description="Implement email validation",
            agent_type="backend-specialist",
            required_files=["src/models/user.py"]
        )
        # Returns: Task(subagent_type="backend-specialist", prompt="...",
        #               description="A.1.1: Execute isolated task")
    """
    from scripts.core.context_isolation import isolate_and_dispatch

    # Auto-detect project root (2 levels up from scripts/core/)
    root = project_root if project_root is not None else Path(__file__).parent.parent.parent
    return isolate_and_dispatch(
        task_id=task_id,
        task_description=task_description,
        project_root=root,
        agent_type=agent_type,
        required_files=required_files,
        constraints=constraints,
    )
# FIX: was `if name == "main"` — a NameError at import time; the standard
# entry-point guard requires the __name__/"__main__" dunders.
if __name__ == "__main__":
    sys.exit(main())