Skip to main content

Section to Add to PROJECT PLAN EXECUTOR REFACTORING.md

Section to Add to PROJECT-PLAN-EXECUTOR-REFACTORING.md

Insert this section after "Strategic Context & Rationale" and before "Technical Architecture"


User Feedback: Async Executor Alignment

User Request

"I agree with your analysis. Please explicitly detail making TaskExecutor.execute async to align seamlessly with ProjectOrchestrator.execute_task's async nature."

Strategic Decision: Make TaskExecutor.execute() Async

APPROVED: Based on user feedback, we will make TaskExecutor.execute() an async method as part of this refactoring project.

Rationale

  1. Seamless Integration: ProjectOrchestrator.execute_task() will become async, enabling end-to-end async flow without asyncio.run() wrappers.

  2. Eliminates Async/Sync Boundary: Current architecture uses asyncio.run() in _execute_via_llm(), which:

    • Creates new event loop for each task (~10-20ms overhead)
    • Blocks async orchestration
    • Prevents parallel task execution
    • Makes Phase 1 Message Bus impossible
  3. Enables Parallel Execution: Async executor enables concurrent LLM API calls:

    # Current (Sequential): 6 seconds for 3 tasks
    Task 1: |---LLM API (2s)---|
    Task 2: |---LLM API (2s)---|
    Task 3: |---LLM API (2s)---|

    # Target (Parallel): 2 seconds for 3 tasks
    Task 1: |---LLM API (2s)---|
    Task 2: |---LLM API (2s)---|
    Task 3: |---LLM API (2s)---|

4. **Phase 1 Foundation:** Message Bus autonomous agents **require** async executor for agent-to-agent coordination.

### Method Signature Changes

#### Current (Sync)

```python
def execute(
self,
task: AgentTask,
agent: Optional[str] = None,
mode: Optional[str] = None
) -> ExecutionResult:
"""Execute a single task using specified agent."""
# Synchronous execution with asyncio.run() wrapper
...

Target (Async)

async def execute(
self,
task: AgentTask,
agent: Optional[str] = None,
mode: Optional[str] = None
) -> ExecutionResult:
"""
Execute a single task using specified agent.

Note:
This method is async to enable concurrent task execution
and seamless integration with async LLM providers.
"""
# Direct async execution, no wrappers needed
...

Cascading Changes

  • TaskExecutor._execute_via_llm() → async
  • TaskExecutor._execute_api() → async
  • ProjectOrchestrator.execute_task() → async
  • All callers → use await executor.execute()
  • All tests → migrate to pytest-asyncio

Performance Impact

Baseline (Current)

  • Single task overhead: 180-350ms (subprocess) → 66-126ms (direct LLM)
  • Improvement: 30-50%

With Async (Target)

  • Single task overhead: 66-126ms (same)
  • Parallel tasks (3): 6s (sequential) → 2s (concurrent)
  • Improvement: 30-50% (single) + 3x (parallel)

Budget Impact

Original Phase 1B: 16 hours, $2,000 Updated Phase 1B (Async): 22 hours, $2,750 Delta: +6 hours, +$750

Total Project Budget

  • Original: $10,000 (80 hours)
  • Updated: $10,750 (86 hours)
  • Delta: +$750 (7.5% increase)

Justification: The $750 investment enables:

  • End-to-end async flow (eliminates event loop overhead)
  • Parallel task execution (3x speedup)
  • Phase 1 Message Bus foundation (enables $100K+ autonomous agent implementation)
  • ROI: 10x+

Timeline Impact

Original Timeline: 4 weeks (80 hours) Updated Timeline: 4.5 weeks (86 hours) Delta: +3 days (concentrated in Phase 1B)

Week 1 (Updated)

  • Days 1-2: Phase 1A (Foundation) - 18 hours
  • Days 3-5.5: Phase 1B (Async Executor) - 22 hours

Weeks 2-4: No changes (providers already async-compatible)

Testing Strategy

Requirement: All tests must use pytest-asyncio for async test support.

Add to requirements-dev.txt


pytest-asyncio==0.23.0

Test Migration Example

Before (Sync)

def test_execute_task():
result = executor.execute(task)
assert result.status == ExecutionStatus.SUCCESS

After (Async)

@pytest.mark.asyncio
async def test_execute_task():
result = await executor.execute(task)
assert result.status == ExecutionStatus.SUCCESS

Parallel Execution Test

@pytest.mark.asyncio
async def test_parallel_execution():
tasks = [AgentTask(...) for i in range(3)]

start = time.time()
results = await asyncio.gather(*[executor.execute(task) for task in tasks])
end = time.time()

assert all(r.status == ExecutionStatus.SUCCESS for r in results)
assert end - start < 3.0 # Verify parallel execution

Updated Success Criteria

Original

  • ✅ Performance benchmarks show 30%+ improvement
  • ✅ Async/sync compatibility layer functional

Updated

  • ✅ Performance benchmarks show 30%+ improvement (single task)
  • Performance benchmarks show 3x improvement (parallel tasks) ← NEW
  • Fully async execution (no asyncio.run() wrappers) ← UPDATED
  • Parallel task execution validated (asyncio.gather) ← NEW
  • pytest-asyncio test suite operational ← NEW

Updated Phase 1B Task Breakdown

Original Phase 1B (16 hours)

  1. Add _execute_via_llm() method - 8 hours
  2. Add async/sync compatibility layer - 4 hours
  3. Write unit tests - 4 hours

Updated Phase 1B (22 hours)

  1. Make _execute_via_llm() fully async - 2 hours (remove asyncio.run())
  2. Make TaskExecutor.execute() async - 4 hours (signature change + caller updates)
  3. Make TaskExecutor._execute_api() async - 2 hours (await calls)
  4. Make ProjectOrchestrator.execute_task() async - 2 hours (signature change)
  5. Update internal method calls with await - 4 hours (orchestrator methods)
  6. Write async unit tests with pytest-asyncio - 6 hours (test migration)
  7. Integration tests for async flow - 2 hours (end-to-end async validation)

Risk Mitigation

New Risks

Risk IDDescriptionMitigation
R11Async/await bugs in caller codeComprehensive async test suite; mypy type checking
R12pytest-asyncio compatibilityUse latest pytest-asyncio (0.23+); test early in Phase 1B
R13Event loop conflicts in testsUse asyncio_mode=auto in pytest.ini
R14Parallel execution race conditionsAdd asyncio.Lock() for shared state; thorough testing

Rollback Plan

If async refactoring causes critical issues:

  1. Revert TaskExecutor.execute() to sync
  2. Re-add asyncio.run() wrapper in _execute_via_llm()
  3. Keep script execution as fallback
  4. Defer async refactoring to Phase 2

For complete details, see: docs/ASYNC-EXECUTOR-STRATEGIC-PLAN.md (62KB comprehensive analysis)

Decision Status: ✅ APPROVED (based on user feedback) Implementation Phase: Phase 1B (Week 1, Days 3-5.5) Budget Approved: $10,750 (up from $10,000) Timeline Approved: 4.5 weeks (up from 4 weeks)