#!/usr/bin/env python3
"""
QA Self-Healing Loop - Autonomous Quality Assurance with Auto-Remediation

Implements an autonomous QA validation loop inspired by Auto-Claude patterns:
- Run QA review using council-orchestrator (multi-agent review)
- If approved → SUCCESS
- If rejected → Check for recurring issues
- If recurring (3+ similar) → Escalate to human
- Else → Run fixer agent, continue loop
- Track state, detect patterns, escalate intelligently

Part of CODITECT Core Framework
Created: 2025-12-22
Version: 1.0.0

Usage:
    # Basic validation
    python3 scripts/qa-validation-loop.py path/to/artifact.md

    # With custom config
    python3 scripts/qa-validation-loop.py path/to/artifact.md --config custom-config.json

    # Dry run (no fixes applied)
    python3 scripts/qa-validation-loop.py path/to/artifact.md --dry-run

    # Verbose output
    python3 scripts/qa-validation-loop.py path/to/artifact.md --verbose
"""

import sys
import json
import argparse
import subprocess
import hashlib
import difflib
import re
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, asdict
from collections import defaultdict

# Loop tuning constants (each may be overridden via the "loop_parameters"
# section of the JSON config loaded by load_config()).
MAX_ITERATIONS = 50          # hard cap on review/fix iterations per run
MAX_CONSECUTIVE_ERRORS = 3   # consecutive QA errors before escalating to human
RECURRING_THRESHOLD = 3      # occurrences before an issue counts as recurring
SIMILARITY_THRESHOLD = 0.8   # difflib ratio above which two issues match
@dataclass
class QAResult:
    """Result from a single QA review iteration."""
    status: str                     # "approved", "rejected", or "error"
    score: float                    # overall quality score, 0.0-1.0
    critical_findings: int
    high_findings: int
    medium_findings: int
    low_findings: int
    issues: List[Dict[str, Any]]    # raw issue dicts (expects 'title'/'description' keys downstream)
    consensus: Optional[float] = None   # multi-agent agreement level, if available
    verdict: Optional[str] = None       # human-readable verdict, e.g. "APPROVED"
    timestamp: Optional[str] = None     # ISO-8601 UTC timestamp of the review

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary (for JSON serialization)."""
        return asdict(self)
@dataclass
class IterationRecord:
    """Record of a single loop iteration (for history persistence)."""
    iteration: int                  # 1-based iteration number
    status: str                     # mirrors QAResult.status for this pass
    issues_found: int
    duration_seconds: float
    timestamp: str                  # ISO-8601 UTC timestamp
    qa_result: Optional[QAResult] = None
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary, serializing the nested QAResult."""
        data = asdict(self)
        if self.qa_result:
            data['qa_result'] = self.qa_result.to_dict()
        return data
class IssueNormalizer:
    """Normalize and compare issue texts for similarity detection."""

    @staticmethod
    def normalize_key(issue_text: str) -> str:
        """
        Normalize issue text for comparison.

        Args:
            issue_text: Raw issue description

        Returns:
            Normalized lowercase text with punctuation removed
        """
        # Lowercase, strip punctuation, collapse whitespace — so small
        # wording/formatting differences don't defeat the match.
        normalized = issue_text.lower()
        normalized = re.sub(r'[^\w\s]', '', normalized)
        normalized = ' '.join(normalized.split())
        return normalized

    @staticmethod
    def calculate_similarity(text1: str, text2: str) -> float:
        """
        Calculate similarity between two issue texts.

        Args:
            text1: First issue text
            text2: Second issue text

        Returns:
            Similarity score 0.0-1.0 (difflib.SequenceMatcher ratio)
        """
        norm1 = IssueNormalizer.normalize_key(text1)
        norm2 = IssueNormalizer.normalize_key(text2)
        return difflib.SequenceMatcher(None, norm1, norm2).ratio()

    @staticmethod
    def is_similar(text1: str, text2: str, threshold: float = SIMILARITY_THRESHOLD) -> bool:
        """
        Check if two issues are similar.

        Args:
            text1: First issue text
            text2: Second issue text
            threshold: Similarity threshold (default SIMILARITY_THRESHOLD, 0.8)

        Returns:
            True if similar, False otherwise
        """
        return IssueNormalizer.calculate_similarity(text1, text2) >= threshold
class RecurringIssueDetector:
    """Detect and track recurring issues across loop iterations."""

    def __init__(self, threshold: int = RECURRING_THRESHOLD,
                 similarity_threshold: float = SIMILARITY_THRESHOLD):
        """
        Initialize detector.

        Args:
            threshold: Number of occurrences to consider an issue recurring
            similarity_threshold: Similarity threshold for matching issues
        """
        self.threshold = threshold
        self.similarity_threshold = similarity_threshold
        # Normalized keys in first-seen order
        self.issue_history: List[str] = []
        # Normalized key -> occurrence count (fuzzy-matched)
        self.issue_counts: Dict[str, int] = defaultdict(int)

    def add_issues(self, issues: List[Dict[str, Any]]) -> None:
        """
        Add issues from the current iteration.

        Args:
            issues: List of issue dictionaries (uses 'title' and 'description')
        """
        for issue in issues:
            issue_text = issue.get('title', '') + ' ' + issue.get('description', '')
            normalized = IssueNormalizer.normalize_key(issue_text)
            # Fuzzy-match against previously seen issues so reworded
            # versions of the same problem share one counter.
            matched = False
            for existing_key in list(self.issue_counts.keys()):
                if IssueNormalizer.is_similar(normalized, existing_key,
                                              self.similarity_threshold):
                    self.issue_counts[existing_key] += 1
                    matched = True
                    break
            if not matched:
                self.issue_counts[normalized] = 1
                self.issue_history.append(normalized)

    def has_recurring_issues(self) -> bool:
        """
        Check if any issues have recurred beyond the threshold.

        Returns:
            True if recurring issues detected
        """
        return any(count >= self.threshold for count in self.issue_counts.values())

    def get_recurring_issues(self) -> List[Tuple[str, int]]:
        """
        Get list of recurring issues with counts.

        Returns:
            List of (issue_text, count) tuples for recurring issues
        """
        return [(issue, count) for issue, count in self.issue_counts.items()
                if count >= self.threshold]

    def get_top_issues(self, n: int = 5) -> List[Tuple[str, int]]:
        """
        Get the top N most frequent issues.

        Args:
            n: Number of top issues to return

        Returns:
            List of (issue_text, count) tuples, most frequent first
        """
        sorted_issues = sorted(self.issue_counts.items(),
                               key=lambda x: x[1], reverse=True)
        return sorted_issues[:n]
class QAHistoryManager:
    """Manage QA loop history persistence (JSON file on disk)."""

    def __init__(self, history_file: Path):
        """
        Initialize history manager.

        Args:
            history_file: Path to history JSON file
        """
        self.history_file = history_file
        self.history: Dict[str, Any] = self._load_history()

    def _load_history(self) -> Dict[str, Any]:
        """Load history from file, or return a fresh empty structure."""
        if self.history_file.exists():
            with open(self.history_file, 'r') as f:
                return json.load(f)
        return {
            "version": "1.0.0",
            "runs": []
        }

    def save_history(self) -> None:
        """Save history to file, creating parent directories as needed."""
        self.history_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.history_file, 'w') as f:
            json.dump(self.history, f, indent=2)

    def add_run(self, artifact_path: str, iterations: List[IterationRecord],
                result: str, escalation_reason: Optional[str] = None) -> None:
        """
        Add a completed run to history and persist immediately.

        Args:
            artifact_path: Path to artifact that was validated
            iterations: List of iteration records
            result: Final result ("success", "escalated", "error")
            escalation_reason: Reason for escalation if applicable
        """
        run_record = {
            "artifact": artifact_path,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_iterations": len(iterations),
            "result": result,
            "escalation_reason": escalation_reason,
            "iterations": [iter_rec.to_dict() for iter_rec in iterations]
        }
        self.history["runs"].append(run_record)
        self.save_history()
class QASelfHealingLoop:
    """Main QA self-healing loop implementation."""

    def __init__(self, artifact_path: Path, config: Dict[str, Any],
                 dry_run: bool = False, verbose: bool = False):
        """
        Initialize QA loop.

        Args:
            artifact_path: Path to artifact to validate
            config: Configuration dictionary
            dry_run: If True, don't apply fixes
            verbose: If True, print detailed output
        """
        self.artifact_path = artifact_path
        self.config = config
        self.dry_run = dry_run
        self.verbose = verbose

        # Extract config parameters, falling back to module constants
        loop_params = config.get('loop_parameters', {})
        self.max_iterations = loop_params.get('max_iterations', MAX_ITERATIONS)
        self.max_consecutive_errors = loop_params.get('max_consecutive_errors',
                                                      MAX_CONSECUTIVE_ERRORS)
        self.recurring_threshold = loop_params.get('recurring_threshold',
                                                   RECURRING_THRESHOLD)
        self.similarity_threshold = loop_params.get('similarity_threshold',
                                                    SIMILARITY_THRESHOLD)

        # Initialize components
        self.issue_detector = RecurringIssueDetector(
            threshold=self.recurring_threshold,
            similarity_threshold=self.similarity_threshold
        )
        # NOTE: requires config['state_tracking']['history_file'] — raises
        # KeyError if absent (unlike loop_parameters, which are optional).
        history_file = Path(self.config['state_tracking']['history_file'])
        self.history_manager = QAHistoryManager(history_file)

        # State tracking
        self.iterations: List[IterationRecord] = []
        self.consecutive_errors = 0

    def log(self, message: str, level: str = "INFO") -> None:
        """
        Log message if verbose mode enabled (WARNING/ERROR always print).

        Args:
            message: Message to log
            level: Log level (INFO, WARNING, ERROR)
        """
        if self.verbose or level in ["WARNING", "ERROR"]:
            timestamp = datetime.now().strftime("%H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")

    def run_qa_review(self) -> QAResult:
        """
        Run QA review using council-orchestrator.

        Returns:
            QAResult object with review outcome (status "error" on failure)
        """
        self.log(f"Running QA review for {self.artifact_path}")
        # In a real implementation, this would invoke the council-orchestrator
        # agent. For now, we simulate the integration point.
        # TODO: Integrate with actual council-orchestrator agent
        try:
            # Simulated QA review result.
            # In production: Task(subagent_type="council-orchestrator", ...)
            result = QAResult(
                status="approved",  # or "rejected"
                score=0.85,
                critical_findings=0,
                high_findings=1,
                medium_findings=3,
                low_findings=5,
                issues=[],
                consensus=0.75,
                verdict="APPROVED",
                timestamp=datetime.now(timezone.utc).isoformat()
            )
            self.log(f"QA Review complete: {result.verdict} (score: {result.score:.2f})")
            return result
        except Exception as e:
            self.log(f"QA review error: {e}", "ERROR")
            return QAResult(
                status="error",
                score=0.0,
                critical_findings=0,
                high_findings=0,
                medium_findings=0,
                low_findings=0,
                issues=[],
                timestamp=datetime.now(timezone.utc).isoformat()
            )

    def run_fixer(self, qa_result: QAResult) -> bool:
        """
        Run fixer agent to remediate issues.

        Args:
            qa_result: QA result with issues to fix

        Returns:
            True if fixes applied successfully (always False in dry-run mode)
        """
        if self.dry_run:
            self.log("DRY RUN: Skipping fixer execution", "WARNING")
            return False
        self.log(f"Running fixer agent for {len(qa_result.issues)} issues")
        # In a real implementation, this would invoke the qa-reviewer agent
        # with the specific issues to fix.
        # TODO: Integrate with actual qa-reviewer agent
        try:
            # Simulated fixer execution.
            # In production: Task(subagent_type="qa-reviewer", ...)
            self.log("Fixer execution complete")
            return True
        except Exception as e:
            self.log(f"Fixer error: {e}", "ERROR")
            return False

    def should_escalate(self) -> Tuple[bool, Optional[str]]:
        """
        Check if conditions require human escalation.

        Returns:
            Tuple of (should_escalate, reason)
        """
        # Check for recurring issues
        if self.issue_detector.has_recurring_issues():
            recurring = self.issue_detector.get_recurring_issues()
            reason = f"Recurring issues detected: {len(recurring)} issues occurring {self.recurring_threshold}+ times"
            return True, reason
        # Check for consecutive errors
        if self.consecutive_errors >= self.max_consecutive_errors:
            reason = f"Consecutive errors threshold reached: {self.consecutive_errors}/{self.max_consecutive_errors}"
            return True, reason
        # Check for max iterations
        if len(self.iterations) >= self.max_iterations:
            reason = f"Max iterations reached: {len(self.iterations)}/{self.max_iterations}"
            return True, reason
        return False, None

    def generate_escalation_report(self, reason: str) -> Path:
        """
        Generate human escalation report from the configured template.

        Args:
            reason: Escalation reason

        Returns:
            Path to generated report
        """
        self.log(f"Generating escalation report: {reason}", "WARNING")

        # Load template
        template_path = Path(self.config['output']['escalation_template'])
        with open(template_path, 'r') as f:
            template = f.read()

        # Build iteration history table (markdown rows)
        history_rows = []
        for iter_rec in self.iterations:
            history_rows.append(
                f"| {iter_rec.iteration} | {iter_rec.status} | "
                f"{iter_rec.issues_found} | {iter_rec.duration_seconds:.2f}s | "
                f"{iter_rec.timestamp} |"
            )
        history_table = '\n'.join(history_rows)

        # Build recurring issues section
        recurring = self.issue_detector.get_recurring_issues()
        if recurring:
            recurring_section = "### Most Recurring Issues\n\n"
            for issue, count in recurring:
                recurring_section += f"- **{count} occurrences:** {issue[:100]}...\n"
        else:
            recurring_section = "No recurring issues detected (escalation triggered by other condition)."

        # Build issue frequency analysis
        top_issues = self.issue_detector.get_top_issues(10)
        frequency_lines = []
        for issue, count in top_issues:
            frequency_lines.append(f"- **{count}x:** {issue[:80]}...")
        frequency_analysis = '\n'.join(frequency_lines) if frequency_lines else "No issues tracked."

        # Get last QA result
        last_qa = self.iterations[-1].qa_result if self.iterations else None
        last_qa_json = json.dumps(last_qa.to_dict(), indent=2) if last_qa else "N/A"

        # Compute the report path BEFORE formatting so it can be substituted
        # directly. (BUGFIX: the previous code formatted with a placeholder
        # and then tried str.replace('report_path="[Generated on write]"', ...)
        # — a pattern that can never occur in the formatted output, so the
        # placeholder was never filled in.)
        report_dir = Path(".coditect/qa-escalations")
        report_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        artifact_name = self.artifact_path.stem
        report_path = report_dir / f"QA_ESCALATION-{artifact_name}-{timestamp}.md"

        # Fill template
        report_content = template.format(
            timestamp=datetime.now(timezone.utc).isoformat(),
            artifact_path=str(self.artifact_path),
            iteration_number=len(self.iterations),
            max_iterations=self.max_iterations,
            escalation_reason=reason,
            escalation_trigger=reason,
            recurring_issues_section=recurring_section,
            iteration_history_table=history_table,
            last_qa_review_json=last_qa_json,
            issue_frequency_analysis=frequency_analysis,
            top_issue_count=len(top_issues),
            config_snapshot=json.dumps(self.config, indent=2),
            report_path=str(report_path)
        )

        # Write report
        with open(report_path, 'w') as f:
            f.write(report_content)

        self.log(f"Escalation report generated: {report_path}", "WARNING")
        return report_path

    def run(self) -> Tuple[str, Optional[Path]]:
        """
        Execute the QA self-healing loop.

        Returns:
            Tuple of (result, escalation_report_path)
            result: "success", "escalated", "error"
            escalation_report_path: Path to report if escalated, else None
        """
        self.log(f"Starting QA self-healing loop for {self.artifact_path}")
        self.log(f"Max iterations: {self.max_iterations}, Dry run: {self.dry_run}")

        iteration = 0
        while iteration < self.max_iterations:
            iteration += 1
            start_time = datetime.now()
            self.log(f"\n=== Iteration {iteration}/{self.max_iterations} ===")

            # Run QA review
            qa_result = self.run_qa_review()

            # Check for errors
            if qa_result.status == "error":
                self.consecutive_errors += 1
                self.log(f"Error in iteration {iteration} (consecutive: {self.consecutive_errors})",
                         "ERROR")
                iter_record = IterationRecord(
                    iteration=iteration,
                    status="error",
                    issues_found=0,
                    duration_seconds=(datetime.now() - start_time).total_seconds(),
                    timestamp=datetime.now(timezone.utc).isoformat(),
                    qa_result=qa_result,
                    error_message="QA review failed"
                )
                self.iterations.append(iter_record)

                # Check if should escalate (e.g. too many consecutive errors)
                should_esc, esc_reason = self.should_escalate()
                if should_esc:
                    report_path = self.generate_escalation_report(esc_reason)
                    self.history_manager.add_run(
                        str(self.artifact_path),
                        self.iterations,
                        "escalated",
                        esc_reason
                    )
                    return "escalated", report_path
                continue

            # Reset error counter on success
            self.consecutive_errors = 0

            # Add issues to detector for recurrence tracking
            self.issue_detector.add_issues(qa_result.issues)

            # Record iteration
            iter_record = IterationRecord(
                iteration=iteration,
                status=qa_result.status,
                issues_found=len(qa_result.issues),
                duration_seconds=(datetime.now() - start_time).total_seconds(),
                timestamp=datetime.now(timezone.utc).isoformat(),
                qa_result=qa_result
            )
            self.iterations.append(iter_record)

            # Check if approved
            if qa_result.status == "approved":
                self.log(f"✅ QA approved after {iteration} iterations", "INFO")
                self.history_manager.add_run(
                    str(self.artifact_path),
                    self.iterations,
                    "success"
                )
                return "success", None

            # Check if should escalate before attempting fix
            should_esc, esc_reason = self.should_escalate()
            if should_esc:
                report_path = self.generate_escalation_report(esc_reason)
                self.history_manager.add_run(
                    str(self.artifact_path),
                    self.iterations,
                    "escalated",
                    esc_reason
                )
                return "escalated", report_path

            # Run fixer; a failed fix is not fatal — loop continues
            self.log("QA rejected, running fixer agent...")
            fix_success = self.run_fixer(qa_result)
            if not fix_success:
                self.log("Fixer failed, continuing to next iteration", "WARNING")

        # Max iterations reached
        self.log(f"Max iterations reached ({self.max_iterations})", "WARNING")
        report_path = self.generate_escalation_report(
            f"Max iterations reached: {self.max_iterations}"
        )
        self.history_manager.add_run(
            str(self.artifact_path),
            self.iterations,
            "escalated",
            "Max iterations reached"
        )
        return "escalated", report_path
def load_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """
    Load QA loop configuration.

    Args:
        config_path: Optional custom config path; defaults to
            ../config/qa-loop-config.json relative to this script.

    Returns:
        Configuration dictionary

    Exits the process with status 1 if the config file does not exist.
    """
    if config_path is None:
        config_path = Path(__file__).parent.parent / "config" / "qa-loop-config.json"
    if not config_path.exists():
        print(f"Error: Config file not found: {config_path}")
        sys.exit(1)
    with open(config_path, 'r') as f:
        return json.load(f)
def main() -> int:
    """Main entry point. Returns process exit code: 0 success, 1 failure, 2 escalated."""
    parser = argparse.ArgumentParser(
        description="QA Self-Healing Loop - Autonomous quality assurance with auto-remediation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic validation
  python3 scripts/qa-validation-loop.py docs/architecture/decisions/ADR-025.md

  # With custom config
  python3 scripts/qa-validation-loop.py artifact.md --config custom-config.json

  # Dry run (no fixes)
  python3 scripts/qa-validation-loop.py artifact.md --dry-run

  # Verbose output
  python3 scripts/qa-validation-loop.py artifact.md --verbose
"""
    )
    parser.add_argument(
        "artifact",
        type=Path,
        help="Path to artifact to validate"
    )
    parser.add_argument(
        "--config",
        type=Path,
        help="Path to custom configuration file"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run without applying fixes"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose output"
    )
    args = parser.parse_args()

    # Validate artifact exists
    if not args.artifact.exists():
        print(f"Error: Artifact not found: {args.artifact}")
        return 1

    # Load config
    config = load_config(args.config)

    # Create and run loop
    loop = QASelfHealingLoop(
        artifact_path=args.artifact,
        config=config,
        dry_run=args.dry_run,
        verbose=args.verbose
    )
    result, escalation_report = loop.run()

    # Print summary
    print("\n" + "=" * 80)
    print("QA SELF-HEALING LOOP SUMMARY")
    print("=" * 80)
    print(f"Artifact: {args.artifact}")
    print(f"Result: {result.upper()}")
    print(f"Iterations: {len(loop.iterations)}")

    if escalation_report:
        print(f"\nEscalation Report: {escalation_report}")
        print("\nHuman review required. See escalation report for details.")
        return 2
    elif result == "success":
        print("\n✅ QA validation passed!")
        return 0
    else:
        print("\n❌ QA validation failed")
        return 1
# BUGFIX: original read `if name == "main":`, which raises NameError;
# the standard entry-point guard requires the dunder names.
if __name__ == "__main__":
    sys.exit(main())