#!/usr/bin/env python3
"""
---
title: "Complexity Assessor"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Complexity Scaling System for CODITECT Core"
keywords: ['analysis', 'api', 'assessor', 'backend', 'complexity']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "complexity-assessor.py"
language: python
executable: true
usage: "python3 scripts/complexity-assessor.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Complexity Scaling System for CODITECT Core

Automatically assesses task complexity and recommends appropriate workflow
phases, token budgets, and agent sequences. Inspired by Auto-Claude patterns
but built from scratch for CODITECT architecture.

Usage:
    python3 scripts/complexity-assessor.py "Add user authentication system"
    python3 scripts/complexity-assessor.py --task-file tasks.json --output assessment.json
    python3 scripts/complexity-assessor.py --interactive
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class ComplexityLevel(Enum):
    """Task complexity classifications."""

    SIMPLE = "simple"      # 1-2 files, single service, low risk
    STANDARD = "standard"  # 3-10 files, multiple services, moderate risk
    COMPLEX = "complex"    # 10+ files, cross-cutting, high risk
class RiskLevel(Enum):
    """Risk assessment levels."""

    LOW = "low"        # Non-breaking, isolated changes
    MEDIUM = "medium"  # Minor breaking changes, limited scope
    HIGH = "high"      # Major breaking changes, security implications
@dataclass
class ComplexityAssessment:
    """Complete complexity assessment result."""

    task: str                        # original task description, verbatim
    complexity: ComplexityLevel      # simple / standard / complex
    confidence: float                # classification confidence, capped at 0.95
    phases: List[str]                # recommended workflow phases, in order
    estimated_files: int             # heuristic file-change estimate (capped at 50)
    services_affected: List[str]     # service types matched in the description
    integration_complexity: str      # "low" | "medium" | "high"
    risk_level: RiskLevel            # keyword-derived risk classification
    dependency_depth: int            # estimated number of affected components
    recommended_workflow: str        # workflow-library identifier
    token_budget: int                # suggested token budget for the task
    estimated_duration: str          # human-readable duration range
    agent_sequence: List[str]        # recommended agents, in execution order
    rationale: str                   # human-readable explanation of the verdict
    timestamp: str                   # ISO-8601 UTC time of the assessment
class ComplexityAssessor:
    """Assess task complexity and recommend appropriate workflows.

    Purely rule-based: keyword heuristics driven by a JSON configuration
    classify a task description as simple/standard/complex and derive
    phases, agents, token budgets, and duration estimates. No network
    access and no file modification (reads config only).
    """

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the assessor with a configuration.

        Args:
            config_path: Optional path to a JSON configuration file. When
                omitted, the bundled default location is tried; if that file
                does not exist, built-in defaults are used.
        """
        self.config_path = config_path or self._default_config_path()
        self.config = self._load_config()
        # Every assessment produced by this instance, in call order.
        self.assessment_history: List[ComplexityAssessment] = []

    def _default_config_path(self) -> str:
        """Get the default config path relative to the script location."""
        script_dir = Path(__file__).parent
        return str(script_dir.parent / "config" / "complexity-config.json")

    def _load_config(self) -> Dict:
        """Load the complexity configuration, falling back to built-in defaults."""
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r') as f:
                return json.load(f)
        else:
            # No config file on disk -- use the built-in defaults below.
            return self._default_config()

    def _default_config(self) -> Dict:
        """Default complexity configuration used when no config file exists."""
        return {
            # File counts at or below each threshold map to that tier.
            "file_count_thresholds": {
                "simple": 2,
                "standard": 10
            },
            # Substrings that mark a task as touching a given service type.
            "service_patterns": {
                "backend": ["api", "backend", "server", "database", "auth"],
                "frontend": ["ui", "frontend", "component", "view", "page"],
                "database": ["schema", "migration", "model", "table"],
                "worker": ["job", "task", "queue", "background"],
                "cache": ["redis", "cache", "memcache"],
                "messaging": ["message", "event", "queue", "pub", "sub"]
            },
            # Each matched category counts once toward integration complexity.
            "integration_keywords": {
                "external_api": ["api integration", "third-party", "external service"],
                "payment": ["payment", "stripe", "paypal", "billing"],
                "auth": ["oauth", "sso", "authentication", "authorization"],
                "analytics": ["tracking", "analytics", "metrics"]
            },
            # Checked high -> medium; first match wins, default is low.
            "risk_keywords": {
                "high": ["security", "auth", "payment", "data migration", "breaking"],
                "medium": ["api change", "schema change", "refactor"],
                "low": ["ui update", "documentation", "logging"]
            },
            # Workflow phases per complexity tier, in execution order.
            "phase_configs": {
                "simple": [
                    "discovery",
                    "quick_implementation",
                    "validation"
                ],
                "standard": [
                    "discovery",
                    "requirements",
                    "context_gathering",
                    "implementation",
                    "testing",
                    "validation"
                ],
                "complex": [
                    "discovery",
                    "requirements",
                    "research",
                    "context_gathering",
                    "architecture_review",
                    "implementation",
                    "testing",
                    "self_critique",
                    "validation"
                ]
            },
            "token_budgets": {
                "simple": 15000,
                "standard": 50000,
                "complex": 150000
            },
            "durations": {
                "simple": "15-30m",
                "standard": "30m-2h",
                "complex": "2h+"
            },
            # First keyword found in the description selects the workflow.
            "workflow_mappings": {
                "feature": "feature-development",
                "bugfix": "bug-fix",
                "refactor": "code-refactoring",
                "security": "security-audit",
                "performance": "performance-optimization",
                "migration": "data-migration",
                "integration": "api-integration"
            }
        }

    def assess(self, task_description: str) -> ComplexityAssessment:
        """Assess task complexity using rule-based analysis.

        Args:
            task_description: Natural language task description.

        Returns:
            ComplexityAssessment with recommendations.
        """
        # Individual rule-based signals.
        file_count = self._estimate_file_count(task_description)
        services = self._identify_services(task_description)
        integration = self._assess_integration_complexity(task_description)
        risk = self._assess_risk_level(task_description)
        dependencies = self._estimate_dependency_depth(task_description)

        # Combine signals into a single complexity verdict + confidence.
        complexity, confidence = self._determine_complexity(
            file_count, services, integration, risk, dependencies
        )

        # Look up tier-dependent recommendations from config.
        phases = self.config["phase_configs"][complexity.value]
        workflow = self._recommend_workflow(task_description)
        token_budget = self.config["token_budgets"][complexity.value]
        duration = self.config["durations"][complexity.value]
        agent_sequence = self._recommend_agents(complexity, workflow, services)
        rationale = self._generate_rationale(
            complexity, file_count, services, integration, risk, dependencies
        )

        assessment = ComplexityAssessment(
            task=task_description,
            complexity=complexity,
            confidence=confidence,
            phases=phases,
            estimated_files=file_count,
            services_affected=services,
            integration_complexity=integration,
            risk_level=risk,
            dependency_depth=dependencies,
            recommended_workflow=workflow,
            token_budget=token_budget,
            estimated_duration=duration,
            agent_sequence=agent_sequence,
            rationale=rationale,
            timestamp=datetime.now(timezone.utc).isoformat()
        )
        self.assessment_history.append(assessment)
        return assessment

    def _estimate_file_count(self, description: str) -> int:
        """Estimate the number of files to modify (1..50)."""
        desc_lower = description.lower()

        # Explicit counts ("modify 3 files") override all heuristics.
        count_patterns = [
            r'(\d+)\s*files?',
            r'modify\s*(\d+)',
            r'update\s*(\d+)'
        ]
        for pattern in count_patterns:
            match = re.search(pattern, desc_lower)
            if match:
                return int(match.group(1))

        # Heuristic estimates per feature keyword; the largest match wins.
        file_indicators = {
            'ui': 3,
            'frontend': 3,
            'component': 2,
            'page': 2,
            'view': 2,
            'api': 4,
            'endpoint': 2,
            'route': 2,
            'backend': 5,
            'database': 3,
            'schema': 2,
            'migration': 1,
            'model': 2,
            'authentication': 8,
            'auth': 8,
            'payment': 10,
            'integration': 6,
            'dashboard': 5,
            'admin': 6,
            'crud': 5,
            'search': 4,
            'filter': 3,
            'report': 4,
            'export': 3
        }
        estimated_files = 1  # Minimum
        for keyword, file_count in file_indicators.items():
            if keyword in desc_lower:
                estimated_files = max(estimated_files, file_count)

        # Scope words widen the estimate multiplicatively.
        if 'complete' in desc_lower or 'comprehensive' in desc_lower:
            estimated_files = int(estimated_files * 1.5)
        elif 'full' in desc_lower or 'entire' in desc_lower:
            estimated_files = int(estimated_files * 1.3)

        return min(estimated_files, 50)  # Cap at 50

    def _identify_services(self, description: str) -> List[str]:
        """Identify affected services; defaults to ["backend"] if none match."""
        # NOTE(review): the padding was intended for word-boundary matching,
        # but the checks below are plain substring tests, so the padding has
        # no effect (e.g. "auth" also matches inside "authentication").
        desc_lower = " " + description.lower() + " "
        services = []
        for service_type, keywords in self.config["service_patterns"].items():
            # Skip metadata keys (convention: underscore-prefixed).
            if service_type.startswith("_"):
                continue
            if any(keyword in desc_lower for keyword in keywords):
                services.append(service_type)
        return services if services else ["backend"]

    def _assess_integration_complexity(self, description: str) -> str:
        """Assess external integration complexity: "low"/"medium"/"high"."""
        desc_lower = description.lower()
        # Each matched integration category counts once.
        integration_count = 0
        for integration_type, keywords in self.config["integration_keywords"].items():
            if any(keyword in desc_lower for keyword in keywords):
                integration_count += 1
        if integration_count >= 2:
            return "high"
        elif integration_count == 1:
            return "medium"
        else:
            return "low"

    def _assess_risk_level(self, description: str) -> RiskLevel:
        """Assess risk level of changes; first high/medium keyword match wins."""
        desc_lower = description.lower()
        for risk_keyword in self.config["risk_keywords"]["high"]:
            if risk_keyword in desc_lower:
                return RiskLevel.HIGH
        for risk_keyword in self.config["risk_keywords"]["medium"]:
            if risk_keyword in desc_lower:
                return RiskLevel.MEDIUM
        return RiskLevel.LOW

    def _estimate_dependency_depth(self, description: str) -> int:
        """Estimate dependency depth (how many components are affected)."""
        desc_lower = description.lower()
        depth_indicators = {
            'refactor': 3,
            'migrate': 4,
            'redesign': 4,
            'architecture': 5,
            'system-wide': 5,
            'platform': 4,
            'framework': 4,
            'infrastructure': 4
        }
        # Deepest matching indicator wins; minimum depth is 1.
        max_depth = 1
        for keyword, depth in depth_indicators.items():
            if keyword in desc_lower:
                max_depth = max(max_depth, depth)
        return max_depth

    def _determine_complexity(
        self,
        file_count: int,
        services: List[str],
        integration: str,
        risk: RiskLevel,
        dependencies: int
    ) -> Tuple[ComplexityLevel, float]:
        """Determine overall complexity level with a confidence score.

        Each signal is mapped onto a 0-10 scale and combined with fixed
        weights; thresholds at 3.5 and 6.5 split the tiers.

        Returns:
            Tuple of (ComplexityLevel, confidence_score), confidence <= 0.95.
        """
        weights = {
            'file_count': 0.3,
            'services': 0.2,
            'integration': 0.2,
            'risk': 0.2,
            'dependencies': 0.1
        }

        # File count scoring (0-10)
        if file_count <= self.config["file_count_thresholds"]["simple"]:
            file_score = 2
        elif file_count <= self.config["file_count_thresholds"]["standard"]:
            file_score = 5
        else:
            file_score = 9

        # Services scoring (0-10)
        service_score = min(len(services) * 2.5, 10)

        # Integration scoring (0-10)
        integration_scores = {"low": 2, "medium": 5, "high": 9}
        integration_score = integration_scores[integration]

        # Risk scoring (0-10)
        risk_scores = {RiskLevel.LOW: 2, RiskLevel.MEDIUM: 5, RiskLevel.HIGH: 9}
        risk_score = risk_scores[risk]

        # Dependency scoring (0-10)
        dependency_score = min(dependencies * 2, 10)

        # Weighted sum of all signals.
        complexity_score = (
            file_score * weights['file_count'] +
            service_score * weights['services'] +
            integration_score * weights['integration'] +
            risk_score * weights['risk'] +
            dependency_score * weights['dependencies']
        )

        # Map score to tier; confidence grows with distance from the boundary.
        if complexity_score <= 3.5:
            complexity = ComplexityLevel.SIMPLE
            confidence = 0.75 + (3.5 - complexity_score) / 10
        elif complexity_score <= 6.5:
            complexity = ComplexityLevel.STANDARD
            confidence = 0.80
        else:
            complexity = ComplexityLevel.COMPLEX
            confidence = 0.70 + (complexity_score - 6.5) / 10

        return complexity, min(confidence, 0.95)

    def _recommend_workflow(self, description: str) -> str:
        """Recommend a workflow from the library; first keyword match wins."""
        desc_lower = description.lower()
        for keyword, workflow in self.config["workflow_mappings"].items():
            if keyword in desc_lower:
                return workflow
        # Default to feature development
        return "feature-development"

    def _recommend_agents(
        self,
        complexity: ComplexityLevel,
        workflow: str,
        services: List[str]
    ) -> List[str]:
        """Recommend an agent sequence based on complexity and services."""
        agents = []

        # Orchestrator leads for anything above simple.
        if complexity != ComplexityLevel.SIMPLE:
            agents.append("orchestrator")

        # Service-specific agents (deduplicated, insertion order preserved).
        service_agents = {
            "backend": "backend-development",
            "frontend": "frontend-development",
            "database": "database-specialist",
            "worker": "background-job-specialist",
            "cache": "performance-optimization",
            "messaging": "event-driven-architect"
        }
        for service in services:
            if service in service_agents:
                agent = service_agents[service]
                if agent not in agents:
                    agents.append(agent)

        # Workflow-specific agents.
        workflow_agents = {
            "security-audit": ["security-auditor", "penetration-testing-agent"],
            "performance-optimization": ["performance-optimization", "codebase-analyzer"],
            "api-integration": ["api-integration-specialist", "testing-specialist"],
            "data-migration": ["database-specialist", "data-engineer"]
        }
        if workflow in workflow_agents:
            for agent in workflow_agents[workflow]:
                if agent not in agents:
                    agents.append(agent)

        # Testing for standard/complex; QA additionally for complex.
        if complexity != ComplexityLevel.SIMPLE:
            if "testing-specialist" not in agents:
                agents.append("testing-specialist")
        if complexity == ComplexityLevel.COMPLEX:
            if "qa-specialist" not in agents:
                agents.append("qa-specialist")

        return agents if agents else ["backend-development"]

    def _generate_rationale(
        self,
        complexity: ComplexityLevel,
        file_count: int,
        services: List[str],
        integration: str,
        risk: RiskLevel,
        dependencies: int
    ) -> str:
        """Generate a human-readable rationale for the complexity assessment."""
        reasons = []

        if file_count <= 2:
            reasons.append(f"minimal file changes ({file_count} files)")
        elif file_count <= 10:
            reasons.append(f"moderate file changes ({file_count} files)")
        else:
            reasons.append(f"extensive file changes ({file_count} files)")

        if len(services) == 1:
            reasons.append(f"single service affected ({services[0]})")
        elif len(services) <= 3:
            reasons.append(f"multiple services affected ({', '.join(services)})")
        else:
            reasons.append(f"cross-cutting changes across {len(services)} services")

        if integration != "low":
            reasons.append(f"{integration} integration complexity")

        if risk == RiskLevel.HIGH:
            reasons.append("high-risk changes requiring extra scrutiny")
        elif risk == RiskLevel.MEDIUM:
            reasons.append("moderate risk requiring careful testing")

        if dependencies >= 3:
            reasons.append(f"deep dependency chain (depth {dependencies})")

        rationale = (
            f"Classified as {complexity.value.upper()} due to: " +
            "; ".join(reasons) + "."
        )
        return rationale

    def save_assessment(self, assessment: ComplexityAssessment, output_path: str):
        """Save an assessment to a JSON file (enums serialized as their values)."""
        output_data = asdict(assessment)
        # Convert enum members to their string values for clean JSON.
        output_data['complexity'] = assessment.complexity.value
        output_data['risk_level'] = assessment.risk_level.value
        with open(output_path, 'w') as f:
            json.dump(output_data, f, indent=2)
        print(f"Assessment saved to: {output_path}")

    def print_assessment(self, assessment: ComplexityAssessment):
        """Print a formatted assessment report to the console."""
        print("\n" + "=" * 70)
        print("COMPLEXITY ASSESSMENT REPORT")
        print("=" * 70)
        print(f"\nTask: {assessment.task}")
        print(f"\nComplexity: {assessment.complexity.value.upper()}")
        print(f"Confidence: {assessment.confidence:.1%}")
        print(f"Risk Level: {assessment.risk_level.value.upper()}")
        # The original section headers contained mis-encoded characters;
        # replaced with plain-text labels.
        print("\nMETRICS:")
        print(f"  Estimated Files: {assessment.estimated_files}")
        print(f"  Services Affected: {', '.join(assessment.services_affected)}")
        print(f"  Integration Complexity: {assessment.integration_complexity}")
        print(f"  Dependency Depth: {assessment.dependency_depth}")
        print("\nWORKFLOW PLAN:")
        print(f"  Recommended Workflow: {assessment.recommended_workflow}")
        print(f"  Estimated Duration: {assessment.estimated_duration}")
        print(f"  Token Budget: {assessment.token_budget:,}")
        print(f"\nPHASES ({len(assessment.phases)}):")
        for i, phase in enumerate(assessment.phases, 1):
            print(f"  {i}. {phase.replace('_', ' ').title()}")
        print(f"\nAGENT SEQUENCE ({len(assessment.agent_sequence)}):")
        for i, agent in enumerate(assessment.agent_sequence, 1):
            print(f"  {i}. {agent}")
        print("\nRATIONALE:")
        print(f"  {assessment.rationale}")
        print("\n" + "=" * 70)
def main():
    """Command-line interface for the complexity assessor."""
    parser = argparse.ArgumentParser(
        description="Assess task complexity and recommend workflows",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Assess single task
  python3 complexity-assessor.py "Add user authentication system"

  # Assess with custom config
  python3 complexity-assessor.py "Migrate to new database" --config custom-config.json

  # Assess and save to file
  python3 complexity-assessor.py "Build analytics dashboard" --output assessment.json

  # Interactive mode
  python3 complexity-assessor.py --interactive
"""
    )
    parser.add_argument(
        'task',
        nargs='?',
        help='Task description to assess'
    )
    parser.add_argument(
        '--config',
        help='Path to custom complexity configuration'
    )
    parser.add_argument(
        '--output', '-o',
        help='Save assessment to JSON file'
    )
    parser.add_argument(
        '--interactive', '-i',
        action='store_true',
        help='Interactive mode - prompt for task description'
    )
    parser.add_argument(
        '--batch',
        help='Assess multiple tasks from JSON file'
    )
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        help='Suppress detailed output, only show summary'
    )
    args = parser.parse_args()

    # Initialize assessor (args.config may be None -> default path).
    assessor = ComplexityAssessor(config_path=args.config)

    # Interactive mode: prompt for a single task description.
    if args.interactive:
        print("CODITECT Complexity Assessor - Interactive Mode")
        print("=" * 70)
        task = input("\nEnter task description: ").strip()
        if not task:
            print("Error: Task description required")
            sys.exit(1)
        assessment = assessor.assess(task)
        assessor.print_assessment(assessment)
        if args.output:
            assessor.save_assessment(assessment, args.output)
        return

    # Batch mode: assess every task from a JSON list file.
    if args.batch:
        with open(args.batch, 'r') as f:
            tasks = json.load(f)
        assessments = []
        for task in tasks:
            assessment = assessor.assess(task)
            assessments.append(assessment)
            if not args.quiet:
                assessor.print_assessment(assessment)
        if args.output:
            # Serialize enums as their string values for consistency with
            # save_assessment (previously `default=str` produced values like
            # "ComplexityLevel.SIMPLE").
            serialized = []
            for a in assessments:
                d = asdict(a)
                d['complexity'] = a.complexity.value
                d['risk_level'] = a.risk_level.value
                serialized.append(d)
            with open(args.output, 'w') as f:
                json.dump(serialized, f, indent=2, default=str)
            print(f"\nBatch assessments saved to: {args.output}")
        return

    # Single task mode: positional task argument is required here.
    if not args.task:
        parser.print_help()
        sys.exit(1)

    assessment = assessor.assess(args.task)
    if not args.quiet:
        assessor.print_assessment(assessment)
    if args.output:
        assessor.save_assessment(assessment, args.output)
    elif args.quiet:
        # Print compact JSON for pipeline integration.
        output = asdict(assessment)
        output['complexity'] = assessment.complexity.value
        output['risk_level'] = assessment.risk_level.value
        print(json.dumps(output, default=str))
# Script entry point. The original `if name == "main":` guard was a bug:
# `name` is undefined (NameError) and would never equal "main"; the correct
# dunder comparison is below.
if __name__ == "__main__":
    main()