# scripts/circular-fix-detector.py

#!/usr/bin/env python3
"""
---
title: "Circular Fix Detector"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Circular Fix Detection System for CODITECT-core"
keywords: ['analysis', 'circular', 'detector', 'fix', 'testing']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "circular-fix-detector.py"
language: python
executable: true
usage: "python3 scripts/circular-fix-detector.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Circular Fix Detection System for CODITECT-core

Detects when an agent is repeatedly attempting similar fixes that have
already failed, using Jaccard similarity to identify circular patterns
and suggest alternative strategies.

Inspired by Auto-Claude patterns but implemented from scratch for CODITECT.
"""

import json
import re
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

class FixOutcome(Enum):
    """Possible outcomes of a fix attempt."""
    FAILED = "failed"
    SUCCESS = "success"
    PARTIAL = "partial"
    SKIPPED = "skipped"

class AlternativeStrategy(Enum):
    """Categories of alternative strategies when circular pattern detected."""
    DIFFERENT_FILES = "different_files"
    DIFFERENT_METHOD = "different_method"
    ESCALATE_HUMAN = "escalate_human"
    SKIP_SUBTASK = "skip_subtask"
    ANALYZE_ROOT_CAUSE = "analyze_root_cause"

@dataclass
class FixAttempt:
    """Record of a single fix attempt.

    Attributes mirror what is persisted to the JSON state file; ``outcome``
    is stored as the string value of a ``FixOutcome`` member so the record
    round-trips through JSON without custom encoders.
    """
    attempt_num: int                           # 1-based sequence number within the subtask
    approach: str                              # free-text description of the fix approach
    files_modified: List[str]                  # paths touched by this attempt
    timestamp: str                             # ISO-8601 UTC timestamp
    outcome: str                               # FixOutcome.value string
    error_message: Optional[str] = None        # populated when the attempt failed
    similarity_to_previous: Optional[float] = None  # Jaccard score vs. prior attempt, if any

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

@dataclass
class CircularDetectionResult:
    """Result of circular detection analysis."""
    is_circular: bool                                   # True when >= min_similar_attempts matched
    similar_attempts: List[int]                         # attempt_num values that exceeded the threshold
    similarity_scores: List[float]                      # Jaccard scores parallel to similar_attempts
    suggested_strategy: Optional[AlternativeStrategy]   # set only when is_circular
    reasoning: str                                      # human-readable explanation of the verdict

class CircularFixDetector: """ Detects circular fix patterns using Jaccard similarity.

Core algorithm:
1. Tokenize and normalize approach descriptions
2. Calculate Jaccard similarity (intersection/union)
3. Detect patterns when similarity exceeds threshold
4. Suggest alternative strategies
"""

def __init__(self, config_path: Optional[Path] = None):
"""Initialize detector with configuration."""
self.config_path = config_path or Path(__file__).parent.parent / "config" / "circular-fix-config.json"
self.state_path = Path(__file__).parent.parent / ".coditect" / "fix-attempts.json"

# Load configuration
self.config = self._load_config()

# Load stopwords
self.stopwords = self._load_stopwords()

# Load attempt history
self.history = self._load_history()

def _load_config(self) -> dict:
"""Load configuration with defaults."""
if self.config_path.exists():
with open(self.config_path, 'r') as f:
return json.load(f)

# Default configuration
return {
"similarity_threshold": 0.30,
"min_similar_attempts": 2,
"lookback_window": 5,
"auto_cleanup_on_success": True,
"enable_visualization": True
}

def _load_stopwords(self) -> Set[str]:
"""Load programming-aware stopwords."""
stopwords_path = Path(__file__).parent.parent / "data" / "stopwords.txt"

if stopwords_path.exists():
with open(stopwords_path, 'r') as f:
return set(line.strip().lower() for line in f if line.strip())

# Default programming-aware stopwords
return {
# Common English stopwords
'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'should',
'can', 'could', 'may', 'might', 'must', 'shall', 'to', 'of', 'in',
'for', 'on', 'with', 'at', 'by', 'from', 'as', 'into', 'through',
'during', 'before', 'after', 'above', 'below', 'between', 'under',
'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where',
'why', 'how', 'all', 'both', 'each', 'few', 'more', 'most', 'other',
'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so',
'than', 'too', 'very', 'just', 'and', 'but', 'or', 'if', 'because',
'while', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she',
'it', 'we', 'they', 'what', 'which', 'who', 'whom', 'whose',
# Programming filler words
'fix', 'fixed', 'fixing', 'update', 'updated', 'updating',
'change', 'changed', 'changing', 'modify', 'modified', 'modifying',
'add', 'added', 'adding', 'remove', 'removed', 'removing',
'try', 'trying', 'attempt', 'attempting'
}

def _load_history(self) -> Dict[str, List[FixAttempt]]:
"""Load attempt history from state file."""
if not self.state_path.exists():
return {}

with open(self.state_path, 'r') as f:
data = json.load(f)

# Convert dictionaries back to FixAttempt objects
history = {}
for subtask_id, attempts in data.items():
history[subtask_id] = [
FixAttempt(**attempt) for attempt in attempts
]

return history

def _save_history(self):
"""Save attempt history to state file."""
# Ensure directory exists
self.state_path.parent.mkdir(parents=True, exist_ok=True)

# Convert FixAttempt objects to dictionaries
data = {
subtask_id: [attempt.to_dict() for attempt in attempts]
for subtask_id, attempts in self.history.items()
}

with open(self.state_path, 'w') as f:
json.dump(data, f, indent=2)

def normalize_tokens(self, text: str) -> List[str]:
"""
Tokenize and normalize text for comparison.

Steps:
1. Lowercase
2. Remove punctuation (except underscores in identifiers)
3. Split on whitespace
4. Remove stopwords
5. Basic stemming for common programming terms
"""
# Lowercase
text = text.lower()

# Remove punctuation except underscores (preserve identifiers)
text = re.sub(r'[^\w\s_]', ' ', text)

# Split into tokens
tokens = text.split()

# Remove stopwords
tokens = [t for t in tokens if t not in self.stopwords]

# Basic stemming for common programming terms
stemmed = []
for token in tokens:
# Remove common suffixes
if token.endswith('ing'):
stemmed.append(token[:-3])
elif token.endswith('ed'):
stemmed.append(token[:-2])
elif token.endswith('s') and len(token) > 3:
stemmed.append(token[:-1])
else:
stemmed.append(token)

return stemmed

def calculate_jaccard_similarity(self, text1: str, text2: str) -> float:
"""
Calculate Jaccard similarity between two texts.

Jaccard similarity = |intersection| / |union|

Returns:
Float between 0.0 (no overlap) and 1.0 (identical)
"""
# Tokenize and normalize both texts
words1 = set(self.normalize_tokens(text1))
words2 = set(self.normalize_tokens(text2))

# Calculate intersection and union
intersection = words1 & words2
union = words1 | words2

# Return similarity (handle empty case)
if not union:
return 0.0

return len(intersection) / len(union)

def record_attempt(
self,
subtask_id: str,
approach: str,
files_modified: List[str],
outcome: FixOutcome,
error_message: Optional[str] = None
) -> FixAttempt:
"""
Record a fix attempt in the history.

Args:
subtask_id: Unique identifier for the subtask
approach: Description of the fix approach
files_modified: List of files that were modified
outcome: Result of the attempt
error_message: Optional error message if failed

Returns:
FixAttempt object
"""
# Get or create attempt list for this subtask
if subtask_id not in self.history:
self.history[subtask_id] = []

attempts = self.history[subtask_id]

# Calculate similarity to previous attempt if exists
similarity_to_previous = None
if attempts:
previous = attempts[-1]
similarity_to_previous = self.calculate_jaccard_similarity(
approach,
previous.approach
)

# Create new attempt
attempt = FixAttempt(
attempt_num=len(attempts) + 1,
approach=approach,
files_modified=files_modified,
timestamp=datetime.now(timezone.utc).isoformat(),
outcome=outcome.value,
error_message=error_message,
similarity_to_previous=similarity_to_previous
)

# Add to history
attempts.append(attempt)

# Save to disk
self._save_history()

# Auto-cleanup on success if enabled
if outcome == FixOutcome.SUCCESS and self.config.get("auto_cleanup_on_success", True):
self.cleanup_subtask(subtask_id)

return attempt

def is_circular_fix(
self,
subtask_id: str,
current_approach: str
) -> CircularDetectionResult:
"""
Detect if we're attempting a fix we've already tried.

Algorithm:
1. Get recent attempts (lookback window)
2. Calculate similarity to each
3. Count similar attempts (above threshold)
4. If count >= min_similar_attempts, circular detected
5. Suggest alternative strategy

Args:
subtask_id: Unique identifier for the subtask
current_approach: Description of the current fix approach

Returns:
CircularDetectionResult with detection status and recommendations
"""
# Get history for this subtask
if subtask_id not in self.history or not self.history[subtask_id]:
return CircularDetectionResult(
is_circular=False,
similar_attempts=[],
similarity_scores=[],
suggested_strategy=None,
reasoning="No previous attempts recorded"
)

attempts = self.history[subtask_id]

# Get recent attempts (lookback window)
lookback = self.config.get("lookback_window", 5)
recent_attempts = attempts[-lookback:]

# Calculate similarities
similar_attempts = []
similarity_scores = []
threshold = self.config.get("similarity_threshold", 0.30)

for attempt in recent_attempts:
similarity = self.calculate_jaccard_similarity(
current_approach,
attempt.approach
)

if similarity > threshold:
similar_attempts.append(attempt.attempt_num)
similarity_scores.append(similarity)

# Check if circular
min_similar = self.config.get("min_similar_attempts", 2)
is_circular = len(similar_attempts) >= min_similar

# Suggest strategy if circular
suggested_strategy = None
reasoning = ""

if is_circular:
suggested_strategy, reasoning = self._suggest_alternative_strategy(
subtask_id,
similar_attempts,
similarity_scores
)
else:
reasoning = f"Found {len(similar_attempts)} similar attempts (threshold: {min_similar})"

return CircularDetectionResult(
is_circular=is_circular,
similar_attempts=similar_attempts,
similarity_scores=similarity_scores,
suggested_strategy=suggested_strategy,
reasoning=reasoning
)

def _suggest_alternative_strategy(
self,
subtask_id: str,
similar_attempts: List[int],
similarity_scores: List[float]
) -> Tuple[AlternativeStrategy, str]:
"""
Suggest an alternative strategy based on pattern analysis.

Decision logic:
1. If many attempts (>5): Escalate to human
2. If high similarity (>0.7): Try different method
3. If same files modified: Try different files
4. Default: Analyze root cause
"""
attempts = self.history[subtask_id]

# Check total attempt count
if len(attempts) > 5:
return (
AlternativeStrategy.ESCALATE_HUMAN,
f"Already tried {len(attempts)} times. Human intervention recommended."
)

# Check average similarity
avg_similarity = sum(similarity_scores) / len(similarity_scores)
if avg_similarity > 0.7:
return (
AlternativeStrategy.DIFFERENT_METHOD,
f"Very high similarity ({avg_similarity:.2%}). Try a completely different approach."
)

# Check if same files are being modified
recent_files = set()
for attempt in attempts[-3:]:
recent_files.update(attempt.files_modified)

if len(recent_files) <= 2:
return (
AlternativeStrategy.DIFFERENT_FILES,
f"Repeatedly modifying same files: {', '.join(recent_files)}. Try different files."
)

# Default: analyze root cause
return (
AlternativeStrategy.ANALYZE_ROOT_CAUSE,
f"Detected circular pattern ({len(similar_attempts)} similar attempts). "
"Recommend deeper root cause analysis before next attempt."
)

def cleanup_subtask(self, subtask_id: str):
"""Remove attempt history for a completed subtask."""
if subtask_id in self.history:
del self.history[subtask_id]
self._save_history()

def get_visualization(self, subtask_id: str) -> str:
"""
Generate a text visualization of attempt history and similarities.

Returns:
Formatted string showing attempt timeline and similarity scores
"""
if subtask_id not in self.history or not self.history[subtask_id]:
return "No attempts recorded for this subtask."

attempts = self.history[subtask_id]

lines = [
f"\n=== Fix Attempt History for {subtask_id} ===\n",
f"Total Attempts: {len(attempts)}\n"
]

for i, attempt in enumerate(attempts):
lines.append(f"\nAttempt #{attempt.attempt_num}:")
lines.append(f" Approach: {attempt.approach[:80]}...")
lines.append(f" Files: {', '.join(attempt.files_modified)}")
lines.append(f" Outcome: {attempt.outcome}")

if attempt.similarity_to_previous is not None:
# Visual similarity bar
bar_length = int(attempt.similarity_to_previous * 20)
bar = '█' * bar_length + '░' * (20 - bar_length)
lines.append(f" Similarity to previous: [{bar}] {attempt.similarity_to_previous:.2%}")

lines.append(f" Timestamp: {attempt.timestamp}")

return '\n'.join(lines)

def get_summary(self) -> dict:
"""Get summary statistics across all subtasks."""
total_subtasks = len(self.history)
total_attempts = sum(len(attempts) for attempts in self.history.values())

# Calculate average attempts per subtask
avg_attempts = total_attempts / total_subtasks if total_subtasks > 0 else 0

# Count outcomes
outcomes = {outcome.value: 0 for outcome in FixOutcome}
for attempts in self.history.values():
for attempt in attempts:
outcomes[attempt.outcome] = outcomes.get(attempt.outcome, 0) + 1

# Find subtasks with most attempts
most_attempts = sorted(
[(sid, len(attempts)) for sid, attempts in self.history.items()],
key=lambda x: x[1],
reverse=True
)[:5]

return {
"total_subtasks": total_subtasks,
"total_attempts": total_attempts,
"average_attempts_per_subtask": round(avg_attempts, 2),
"outcomes": outcomes,
"subtasks_with_most_attempts": most_attempts,
"config": self.config
}

def main():
    """CLI interface for testing and demonstration.

    Sub-commands (mutually exclusive, checked in order): --summary,
    --visualize, --cleanup, --check, and plain attempt recording when
    --subtask/--approach/--files/--outcome are all supplied.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Circular Fix Detection System")
    parser.add_argument('--subtask', help="Subtask ID")
    parser.add_argument('--approach', help="Fix approach description")
    parser.add_argument('--files', nargs='+', help="Files modified")
    parser.add_argument('--outcome', choices=['failed', 'success', 'partial', 'skipped'],
                        help="Outcome of attempt")
    parser.add_argument('--check', action='store_true',
                        help="Check if current approach is circular")
    parser.add_argument('--visualize', action='store_true',
                        help="Show visualization for subtask")
    parser.add_argument('--summary', action='store_true',
                        help="Show summary statistics")
    parser.add_argument('--cleanup', action='store_true',
                        help="Cleanup completed subtask")

    args = parser.parse_args()

    detector = CircularFixDetector()

    if args.summary:
        summary = detector.get_summary()
        print("\n=== Circular Fix Detection Summary ===")
        print(json.dumps(summary, indent=2))
        return

    if args.visualize and args.subtask:
        print(detector.get_visualization(args.subtask))
        return

    if args.cleanup and args.subtask:
        detector.cleanup_subtask(args.subtask)
        print(f"Cleaned up history for {args.subtask}")
        return

    if args.check and args.subtask and args.approach:
        result = detector.is_circular_fix(args.subtask, args.approach)

        print(f"\n=== Circular Detection Result ===")
        print(f"Is Circular: {result.is_circular}")
        print(f"Similar Attempts: {result.similar_attempts}")
        print(f"Similarity Scores: {[f'{s:.2%}' for s in result.similarity_scores]}")
        print(f"Suggested Strategy: {result.suggested_strategy.value if result.suggested_strategy else 'None'}")
        print(f"Reasoning: {result.reasoning}")

        if detector.config.get("enable_visualization", True):
            print(detector.get_visualization(args.subtask))
        return

    if args.subtask and args.approach and args.files and args.outcome:
        outcome = FixOutcome(args.outcome)
        attempt = detector.record_attempt(
            args.subtask,
            args.approach,
            args.files,
            outcome
        )
        print(f"\n=== Recorded Attempt #{attempt.attempt_num} ===")
        print(f"Subtask: {args.subtask}")
        print(f"Approach: {args.approach}")
        print(f"Files: {', '.join(args.files)}")
        print(f"Outcome: {outcome.value}")
        # BUGFIX: was a truthiness check, which silently skipped a
        # legitimate similarity of exactly 0.0
        if attempt.similarity_to_previous is not None:
            print(f"Similarity to previous: {attempt.similarity_to_previous:.2%}")
        return

    # No recognized command combination: show usage
    parser.print_help()

# BUGFIX: extraction stripped the dunder underscores; `if name == 'main'`
# raises NameError when the module is executed.
if __name__ == '__main__':
    main()