#!/usr/bin/env python3
"""
title: "Dependency Resolver"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Dependency Resolver for CODITECT Autonomous Orchestration System"
keywords: ['analysis', 'database', 'dependency', 'resolver']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "dependency-resolver.py"
language: python
executable: true
usage: "python3 scripts/dependency-resolver.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Dependency Resolver for CODITECT Autonomous Orchestration System

Analyzes task dependencies, detects cycles, determines execution order, and
checks task readiness based on dependency completion status.

Usage:
    ./dependency-resolver.py --build              # Build dependency graph from database
    ./dependency-resolver.py --check T001.001     # Check if task is ready
    ./dependency-resolver.py --cycles             # Detect dependency cycles
    ./dependency-resolver.py --order              # Show execution order
    ./dependency-resolver.py --visualize          # Output DOT format
    ./dependency-resolver.py --blocking T001.001  # Show blocking tasks
"""

import argparse
import json
import os
import re
import sqlite3
import sys
from collections import defaultdict, deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

# ADR-114: Use centralized path discovery
_script_dir = Path(__file__).resolve().parent  # FIX: was Path(file) — NameError at import
_coditect_root = _script_dir.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))

try:
    from scripts.core.paths import (
        get_context_storage_dir,
        get_sessions_db_path,
        SESSIONS_DB,
    )
    CONTEXT_STORAGE = get_context_storage_dir()
    _SESSIONS_DB = SESSIONS_DB
except ImportError:
    # Fallback when scripts.core.paths is unavailable: prefer the new
    # shared data location, otherwise the legacy per-user directory.
    _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    CONTEXT_STORAGE = _new_location if _new_location.exists() else Path.home() / ".coditect" / "context-storage"
    _SESSIONS_DB = CONTEXT_STORAGE / "sessions.db"

# NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118

class DependencyGraph:
    """Dependency graph for task orchestration with cycle detection and topological sorting."""

    # Regex patterns for dependency extraction from free-text task fields.
    # Each pattern captures one task ID of the form Tnnn.nnn.
    DEPENDENCY_PATTERNS = [
        r'after\s+(T\d{3}\.\d{3})',
        r'depends\s+on\s+(T\d{3}\.\d{3})',
        r'blocked\s+by[:\s]+(T\d{3}\.\d{3})',
        r'requires\s+(T\d{3}\.\d{3})',
        r'prerequisite[:\s]+(T\d{3}\.\d{3})',
        r'needs\s+(T\d{3}\.\d{3})',
        r'awaiting\s+(T\d{3}\.\d{3})',
    ]

def __init__(self, db_path: Optional[str] = None):
"""
Initialize dependency graph.

Args:
db_path: Path to sessions.db database (ADR-118 Tier 3)
"""
self.graph: Dict[str, List[str]] = defaultdict(list) # task_id -> [dependencies]
self.reverse_graph: Dict[str, List[str]] = defaultdict(list) # task_id -> [dependents]
self.task_status: Dict[str, str] = {} # task_id -> status
self.task_descriptions: Dict[str, str] = {} # task_id -> description
self.db_path = db_path or self._find_database()

def _find_database(self) -> str:
"""Find sessions.db in standard locations (ADR-118 compliant).

Task dependencies are Tier 3 (session-scoped) data in sessions.db.
NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118.
"""
search_paths = [
_SESSIONS_DB, # ADR-118 Tier 3 primary
Path.cwd() / "sessions.db",
Path.cwd() / ".coditect" / "sessions.db",
Path.cwd().parent / "sessions.db",
Path.cwd().parent / ".coditect" / "sessions.db",
Path.home() / ".coditect" / "context-storage" / "sessions.db",
]

for path in search_paths:
if path.exists():
return str(path)

raise FileNotFoundError(
"sessions.db not found. Please specify path with --db-path or run /cx to create it. "
"NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118."
)

def add_dependency(self, task_id: str, depends_on: str) -> None:
"""
Add dependency edge to graph.

Args:
task_id: Task that has the dependency
depends_on: Task that must be completed first
"""
if depends_on not in self.graph[task_id]:
self.graph[task_id].append(depends_on)

if task_id not in self.reverse_graph[depends_on]:
self.reverse_graph[depends_on].append(task_id)

def parse_dependencies(self, task: Dict) -> List[str]:
"""
Extract dependencies from task description using regex patterns.

Args:
task: Task dictionary with 'task_id' and 'description' keys

Returns:
List of task IDs that this task depends on
"""
dependencies = []
description = task.get("description", "")
notes = task.get("notes", "")
full_text = f"{description} {notes}"

for pattern in self.DEPENDENCY_PATTERNS:
matches = re.findall(pattern, full_text, re.IGNORECASE)
dependencies.extend(matches)

# Remove duplicates while preserving order
seen = set()
unique_deps = []
for dep in dependencies:
dep_upper = dep.upper()
if dep_upper not in seen:
seen.add(dep_upper)
unique_deps.append(dep_upper)

return unique_deps

def get_dependencies(self, task_id: str) -> List[str]:
"""
Get list of dependencies for a task.

Args:
task_id: Task identifier

Returns:
List of task IDs this task depends on
"""
return self.graph.get(task_id, [])

def get_dependents(self, task_id: str) -> List[str]:
"""
Get list of tasks that depend on this task.

Args:
task_id: Task identifier

Returns:
List of task IDs that depend on this task
"""
return self.reverse_graph.get(task_id, [])

def is_ready(self, task_id: str) -> Tuple[bool, List[str]]:
"""
Check if all dependencies are completed.

Args:
task_id: Task identifier

Returns:
Tuple of (is_ready, blocking_tasks)
"""
dependencies = self.get_dependencies(task_id)

if not dependencies:
return True, []

blocking = []
for dep_id in dependencies:
status = self.task_status.get(dep_id, "pending")
if status not in ["completed", "verified"]:
blocking.append(dep_id)

return len(blocking) == 0, blocking

def detect_cycles(self) -> List[List[str]]:
"""
Detect cycles in dependency graph using DFS.

Returns:
List of cycles, where each cycle is a list of task IDs
"""
cycles = []
visited = set()
rec_stack = set()
path = []

def dfs(node: str) -> bool:
"""DFS helper for cycle detection."""
visited.add(node)
rec_stack.add(node)
path.append(node)

for neighbor in self.graph.get(node, []):
if neighbor not in visited:
if dfs(neighbor):
return True
elif neighbor in rec_stack:
# Cycle detected - extract cycle from path
cycle_start = path.index(neighbor)
cycle = path[cycle_start:] + [neighbor]
cycles.append(cycle)
return True

path.pop()
rec_stack.remove(node)
return False

# Check all nodes
for node in self.graph.keys():
if node not in visited:
dfs(node)

return cycles

def get_execution_order(self) -> List[str]:
"""
Get topological execution order using Kahn's algorithm.

Returns:
List of task IDs in execution order

Raises:
ValueError: If graph contains cycles
"""
# Check for cycles first
cycles = self.detect_cycles()
if cycles:
cycle_strs = [" -> ".join(cycle) for cycle in cycles]
raise ValueError(f"Cannot create execution order - cycles detected:\n" + "\n".join(cycle_strs))

# Calculate in-degrees
in_degree = defaultdict(int)
all_nodes = set(self.graph.keys())
for deps in self.graph.values():
all_nodes.update(deps)

for node in all_nodes:
in_degree[node] = len(self.graph.get(node, []))

# Find nodes with no dependencies
queue = deque([node for node in all_nodes if in_degree[node] == 0])
execution_order = []

while queue:
node = queue.popleft()
execution_order.append(node)

# Reduce in-degree for dependent nodes
for dependent in self.reverse_graph.get(node, []):
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)

return execution_order

def build_from_database(self) -> None:
"""Load all tasks from database and build dependency graph."""
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

# Load tasks from v2_tasks table
cursor.execute("""
SELECT task_id, description, status, priority, feature_id
FROM v2_tasks
ORDER BY task_id
""")

tasks = []
for row in cursor.fetchall():
task = {
"task_id": row["task_id"],
"description": row["description"],
"status": row["status"],
"priority": row["priority"],
"feature_id": row["feature_id"] or "",
}
tasks.append(task)

# Store status and description
self.task_status[task["task_id"]] = task["status"]
self.task_descriptions[task["task_id"]] = task["description"]

# Parse dependencies and build graph
for task in tasks:
dependencies = self.parse_dependencies(task)
for dep in dependencies:
self.add_dependency(task["task_id"], dep)

# Try to load from task_dependencies table if it exists
try:
cursor.execute("""
SELECT task_id, depends_on
FROM task_dependencies
""")
for row in cursor.fetchall():
self.add_dependency(row["task_id"], row["depends_on"])
except sqlite3.OperationalError:
# table doesn't exist yet - that's ok, we parsed from descriptions
pass

conn.close()

print(f"✓ Loaded {len(tasks)} tasks from database")
print(f"✓ Found {sum(len(deps) for deps in self.graph.values())} dependencies")

except sqlite3.Error as e:
raise RuntimeError(f"Database error: {e}")

def visualize_dot(self) -> str:
"""
Generate GraphViz DOT format for visualization.

Returns:
DOT format string
"""
lines = ["digraph Dependencies {", ' rankdir=LR;', ' node [shape=box];', ""]

# Add nodes with status colors
status_colors = {
"completed": "lightgreen",
"verified": "darkgreen",
"in_progress": "yellow",
"blocked": "orange",
"pending": "lightgray",
}

all_nodes = set(self.graph.keys())
for deps in self.graph.values():
all_nodes.update(deps)

for node in sorted(all_nodes):
status = self.task_status.get(node, "pending")
color = status_colors.get(status, "white")
label = node
if node in self.task_descriptions:
desc = self.task_descriptions[node][:50]
label = f"{node}\\n{desc}"
lines.append(f' "{node}" [fillcolor={color}, style=filled, label="{label}"];')

lines.append("")

# Add edges
for task_id, dependencies in sorted(self.graph.items()):
for dep in dependencies:
lines.append(f' "{dep}" -> "{task_id}";')

lines.append("}")
return "\n".join(lines)

def get_blocking_tasks(self, task_id: str) -> Dict[str, List[str]]:
"""
Get detailed information about what's blocking a task.

Args:
task_id: Task identifier

Returns:
Dictionary with direct_blockers and transitive_blockers
"""
direct = []
dependencies = self.get_dependencies(task_id)

for dep in dependencies:
status = self.task_status.get(dep, "pending")
if status not in ["completed", "verified"]:
direct.append(dep)

# Find transitive blockers (blockers of blockers)
transitive = []
visited = set()

def find_transitive(node: str) -> None:
if node in visited:
return
visited.add(node)

for dep in self.get_dependencies(node):
status = self.task_status.get(dep, "pending")
if status not in ["completed", "verified"]:
if dep not in direct:
transitive.append(dep)
find_transitive(dep)

for blocker in direct:
find_transitive(blocker)

return {"direct_blockers": direct, "transitive_blockers": transitive}

def main():
    """Main CLI entry point: parse arguments and dispatch graph actions.

    Exits 1 on missing action, missing database, cycles, or DB errors;
    130 on Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Dependency Resolver - Task dependency analysis and execution planning",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --build              # Build dependency graph from database
  %(prog)s --check T001.001     # Check if task is ready
  %(prog)s --cycles             # Detect dependency cycles
  %(prog)s --order              # Show execution order
  %(prog)s --visualize          # Output DOT format for graphviz
  %(prog)s --blocking T001.001  # Show what tasks are blocking this one
""",
    )

    parser.add_argument("--db-path", help="Path to sessions.db database (ADR-118 Tier 3)")
    parser.add_argument("--build", action="store_true", help="Build dependency graph from database")
    parser.add_argument("--check", metavar="TASK_ID", help="Check if specific task is ready")
    parser.add_argument("--cycles", action="store_true", help="Detect and report any cycles")
    parser.add_argument("--order", action="store_true", help="Show topological execution order")
    parser.add_argument("--visualize", action="store_true", help="Output DOT format for graphviz")
    parser.add_argument("--blocking", metavar="TASK_ID", help="Show what tasks are blocking this one")
    parser.add_argument(
        "--output", help="Output file (for --visualize or --order), default: stdout"
    )

    args = parser.parse_args()

    # Need at least one action
    if not any([args.build, args.check, args.cycles, args.order, args.visualize, args.blocking]):
        parser.print_help()
        sys.exit(1)

    try:
        # Initialize graph
        graph = DependencyGraph(db_path=args.db_path)

        # Build graph from database for every action that reads it.
        # FIX: --visualize was missing from this condition, so it used to
        # render an empty graph unless combined with another action.
        if args.build or args.check or args.cycles or args.order or args.visualize or args.blocking:
            print("Building dependency graph from database...")
            graph.build_from_database()
            print()

        # Check task readiness
        if args.check:
            task_id = args.check.upper()
            is_ready, blocking = graph.is_ready(task_id)

            print(f"Task: {task_id}")
            print(f"Status: {graph.task_status.get(task_id, 'unknown')}")
            print(f"Ready: {'✓ YES' if is_ready else '✗ NO'}")

            if blocking:
                print(f"\nBlocked by {len(blocking)} task(s):")
                for blocker in blocking:
                    status = graph.task_status.get(blocker, "unknown")
                    desc = graph.task_descriptions.get(blocker, "")[:60]
                    print(f" • {blocker} [{status}] - {desc}")
            else:
                dependencies = graph.get_dependencies(task_id)
                if dependencies:
                    print(f"\n✓ All {len(dependencies)} dependencies completed")
                else:
                    print("\n✓ No dependencies")

        # Detect cycles
        if args.cycles:
            print("Detecting dependency cycles...")
            cycles = graph.detect_cycles()

            if cycles:
                print(f"\n✗ Found {len(cycles)} cycle(s):")
                for i, cycle in enumerate(cycles, 1):
                    print(f"\n Cycle {i}: {' -> '.join(cycle)}")
                sys.exit(1)
            else:
                print("\n✓ No cycles detected - dependency graph is valid")

        # Show execution order
        if args.order:
            print("Calculating topological execution order...")
            try:
                order = graph.get_execution_order()

                output = ["# Task Execution Order", ""]
                for i, task_id in enumerate(order, 1):
                    status = graph.task_status.get(task_id, "pending")
                    desc = graph.task_descriptions.get(task_id, "")[:70]
                    output.append(f"{i:3}. {task_id} [{status:12}] - {desc}")

                output.append("")
                output.append(f"Total tasks: {len(order)}")

                result = "\n".join(output)

                if args.output:
                    Path(args.output).write_text(result)
                    print(f"\n✓ Execution order written to {args.output}")
                else:
                    print()
                    print(result)

            except ValueError as e:
                print(f"\n✗ Error: {e}")
                sys.exit(1)

        # Visualize as DOT
        if args.visualize:
            print("Generating GraphViz DOT format...")
            dot = graph.visualize_dot()

            if args.output:
                Path(args.output).write_text(dot)
                print(f"\n✓ DOT file written to {args.output}")
                print(f"\nTo generate PNG: dot -Tpng {args.output} -o graph.png")
            else:
                print()
                print(dot)

        # Show blocking tasks
        if args.blocking:
            task_id = args.blocking.upper()
            blocking_info = graph.get_blocking_tasks(task_id)

            print(f"Blocking analysis for: {task_id}")
            print(f"Status: {graph.task_status.get(task_id, 'unknown')}")
            print()

            direct = blocking_info["direct_blockers"]
            transitive = blocking_info["transitive_blockers"]

            if not direct and not transitive:
                print("✓ No blocking tasks - ready to execute")
            else:
                if direct:
                    print(f"Direct blockers ({len(direct)}):")
                    for blocker in direct:
                        status = graph.task_status.get(blocker, "unknown")
                        desc = graph.task_descriptions.get(blocker, "")[:60]
                        print(f" • {blocker} [{status}] - {desc}")

                if transitive:
                    print(f"\nTransitive blockers ({len(transitive)}):")
                    for blocker in transitive:
                        status = graph.task_status.get(blocker, "unknown")
                        desc = graph.task_descriptions.get(blocker, "")[:60]
                        print(f" • {blocker} [{status}] - {desc}")

            print()

    except FileNotFoundError as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        sys.exit(1)
    except RuntimeError as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\nInterrupted by user")
        sys.exit(130)

# Standard script entry guard (was mangled to `if name == "main"`,
# which raises NameError and never runs main()).
if __name__ == "__main__":
    main()