Section to Add to PROJECT PLAN EXECUTOR REFACTORING.md
Section to Add to PROJECT-PLAN-EXECUTOR-REFACTORING.md
Insert this section after "Strategic Context & Rationale" and before "Technical Architecture"
User Feedback: Async Executor Alignment
User Request
"I agree with your analysis. Please explicitly detail making
TaskExecutor.executeasync to align seamlessly withProjectOrchestrator.execute_task's async nature."
Strategic Decision: Make TaskExecutor.execute() Async
APPROVED: Based on user feedback, we will make TaskExecutor.execute() an async method as part of this refactoring project.
Rationale
-
Seamless Integration:
ProjectOrchestrator.execute_task()will become async, enabling end-to-end async flow withoutasyncio.run()wrappers. -
Eliminates Async/Sync Boundary: Current architecture uses
asyncio.run()in_execute_via_llm(), which:- Creates new event loop for each task (~10-20ms overhead)
- Blocks async orchestration
- Prevents parallel task execution
- Makes Phase 1 Message Bus impossible
-
Enables Parallel Execution: Async executor enables concurrent LLM API calls:
# Current (Sequential): 6 seconds for 3 tasks
Task 1: |---LLM API (2s)---|
Task 2: |---LLM API (2s)---|
Task 3: |---LLM API (2s)---|
# Target (Parallel): 2 seconds for 3 tasks
Task 1: |---LLM API (2s)---|
Task 2: |---LLM API (2s)---|
Task 3: |---LLM API (2s)---|
4. **Phase 1 Foundation:** Message Bus autonomous agents **require** async executor for agent-to-agent coordination.
### Method Signature Changes
#### Current (Sync)
```python
def execute(
self,
task: AgentTask,
agent: Optional[str] = None,
mode: Optional[str] = None
) -> ExecutionResult:
"""Execute a single task using specified agent."""
# Synchronous execution with asyncio.run() wrapper
...
Target (Async)
async def execute(
self,
task: AgentTask,
agent: Optional[str] = None,
mode: Optional[str] = None
) -> ExecutionResult:
"""
Execute a single task using specified agent.
Note:
This method is async to enable concurrent task execution
and seamless integration with async LLM providers.
"""
# Direct async execution, no wrappers needed
...
Cascading Changes
TaskExecutor._execute_via_llm()→ asyncTaskExecutor._execute_api()→ asyncProjectOrchestrator.execute_task()→ async- All callers → use
await executor.execute() - All tests → migrate to
pytest-asyncio
Performance Impact
Baseline (Current)
- Single task overhead: 180-350ms (subprocess) → 66-126ms (direct LLM)
- Improvement: 30-50%
With Async (Target)
- Single task overhead: 66-126ms (same)
- Parallel tasks (3): 6s (sequential) → 2s (concurrent)
- Improvement: 30-50% (single) + 3x (parallel)
Budget Impact
Original Phase 1B: 16 hours, $2,000 Updated Phase 1B (Async): 22 hours, $2,750 Delta: +6 hours, +$750
Total Project Budget
- Original: $10,000 (80 hours)
- Updated: $10,750 (86 hours)
- Delta: +$750 (7.5% increase)
Justification: The $750 investment enables:
- End-to-end async flow (eliminates event loop overhead)
- Parallel task execution (3x speedup)
- Phase 1 Message Bus foundation (enables $100K+ autonomous agent implementation)
- ROI: 10x+
Timeline Impact
Original Timeline: 4 weeks (80 hours) Updated Timeline: 4.5 weeks (86 hours) Delta: +3 days (concentrated in Phase 1B)
Week 1 (Updated)
- Days 1-2: Phase 1A (Foundation) - 18 hours
- Days 3-5.5: Phase 1B (Async Executor) - 22 hours
Weeks 2-4: No changes (providers already async-compatible)
Testing Strategy
Requirement: All tests must use pytest-asyncio for async test support.
Add to requirements-dev.txt
pytest-asyncio==0.23.0
Test Migration Example
Before (Sync)
def test_execute_task():
result = executor.execute(task)
assert result.status == ExecutionStatus.SUCCESS
After (Async)
@pytest.mark.asyncio
async def test_execute_task():
result = await executor.execute(task)
assert result.status == ExecutionStatus.SUCCESS
Parallel Execution Test
@pytest.mark.asyncio
async def test_parallel_execution():
tasks = [AgentTask(...) for i in range(3)]
start = time.time()
results = await asyncio.gather(*[executor.execute(task) for task in tasks])
end = time.time()
assert all(r.status == ExecutionStatus.SUCCESS for r in results)
assert end - start < 3.0 # Verify parallel execution
Updated Success Criteria
Original
- ✅ Performance benchmarks show 30%+ improvement
- ✅ Async/sync compatibility layer functional
Updated
- ✅ Performance benchmarks show 30%+ improvement (single task)
- ✅ Performance benchmarks show 3x improvement (parallel tasks) ← NEW
- ✅ Fully async execution (no asyncio.run() wrappers) ← UPDATED
- ✅ Parallel task execution validated (asyncio.gather) ← NEW
- ✅ pytest-asyncio test suite operational ← NEW
Updated Phase 1B Task Breakdown
Original Phase 1B (16 hours)
- Add
_execute_via_llm()method - 8 hours - Add async/sync compatibility layer - 4 hours
- Write unit tests - 4 hours
Updated Phase 1B (22 hours)
- Make
_execute_via_llm()fully async - 2 hours (remove asyncio.run()) - Make
TaskExecutor.execute()async - 4 hours (signature change + caller updates) - Make
TaskExecutor._execute_api()async - 2 hours (await calls) - Make
ProjectOrchestrator.execute_task()async - 2 hours (signature change) - Update internal method calls with await - 4 hours (orchestrator methods)
- Write async unit tests with pytest-asyncio - 6 hours (test migration)
- Integration tests for async flow - 2 hours (end-to-end async validation)
Risk Mitigation
New Risks
| Risk ID | Description | Mitigation |
|---|---|---|
| R11 | Async/await bugs in caller code | Comprehensive async test suite; mypy type checking |
| R12 | pytest-asyncio compatibility | Use latest pytest-asyncio (0.23+); test early in Phase 1B |
| R13 | Event loop conflicts in tests | Use asyncio_mode=auto in pytest.ini |
| R14 | Parallel execution race conditions | Add asyncio.Lock() for shared state; thorough testing |
Rollback Plan
If async refactoring causes critical issues:
- Revert
TaskExecutor.execute()to sync - Re-add
asyncio.run()wrapper in_execute_via_llm() - Keep script execution as fallback
- Defer async refactoring to Phase 2
For complete details, see: docs/ASYNC-EXECUTOR-STRATEGIC-PLAN.md (62KB comprehensive analysis)
Decision Status: ✅ APPROVED (based on user feedback) Implementation Phase: Phase 1B (Week 1, Days 3-5.5) Budget Approved: $10,750 (up from $10,000) Timeline Approved: 4.5 weeks (up from 4 weeks)