Skip to main content

scripts-qa-validation-loop

#!/usr/bin/env python3
"""

title: "Constants" component_type: script version: "1.0.0" audience: contributor status: stable summary: "QA Self-Healing Loop - Autonomous Quality Assurance with Auto-Remediation" keywords: ['analysis', 'loop', 'review', 'validation'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "qa-validation-loop.py" language: python executable: true usage: "python3 scripts/qa-validation-loop.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false​

QA Self-Healing Loop - Autonomous Quality Assurance with Auto-Remediation

Implements an autonomous QA validation loop inspired by Auto-Claude patterns:

  1. Run QA review using council-orchestrator (multi-agent review)
  2. If approved → SUCCESS
  3. If rejected → Check for recurring issues
    • If recurring (3+ similar) → Escalate to human
    • Else → Run fixer agent, continue loop
  4. Track state, detect patterns, escalate intelligently

Part of CODITECT Core Framework Created: 2025-12-22 Version: 1.0.0

Usage: # Basic validation python3 scripts/qa-validation-loop.py path/to/artifact.md

# With custom config
python3 scripts/qa-validation-loop.py path/to/artifact.md --config custom-config.json

# Dry run (no fixes applied)
python3 scripts/qa-validation-loop.py path/to/artifact.md --dry-run

# Verbose output
python3 scripts/qa-validation-loop.py path/to/artifact.md --verbose

"""

# Standard-library imports (one per line, grouped per PEP 8:
# plain imports first, then from-imports).
import argparse
import difflib
import hashlib
import json
import re
import subprocess
import sys

from collections import defaultdict
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Hard cap on QA loop iterations before forced escalation.
MAX_ITERATIONS = 50
# Consecutive QA-review errors tolerated before escalating to a human.
MAX_CONSECUTIVE_ERRORS = 3
# Number of similar occurrences after which an issue counts as "recurring".
RECURRING_THRESHOLD = 3
# difflib ratio at/above which two issue texts are treated as the same issue.
SIMILARITY_THRESHOLD = 0.8

@dataclass
class QAResult:
    """Result from a single QA review iteration."""

    status: str                        # "approved", "rejected", or "error"
    score: float                       # overall quality score, 0.0-1.0
    critical_findings: int
    high_findings: int
    medium_findings: int
    low_findings: int
    issues: List[Dict[str, Any]]       # raw issue dicts from the reviewer
    consensus: Optional[float] = None  # multi-agent consensus, if available
    verdict: Optional[str] = None      # e.g. "APPROVED"
    timestamp: Optional[str] = None    # ISO-8601 UTC timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary (recursive, via dataclasses.asdict)."""
        return asdict(self)

@dataclass
class IterationRecord:
    """Record of a single loop iteration."""

    iteration: int                       # 1-based iteration number
    status: str                          # mirrors QAResult.status, or "error"
    issues_found: int
    duration_seconds: float
    timestamp: str                       # ISO-8601 UTC timestamp
    qa_result: Optional[QAResult] = None
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary, expanding the nested QAResult if present."""
        data = asdict(self)
        # asdict already recurses into dataclasses; re-serialize explicitly so
        # the nested record always goes through QAResult's own to_dict().
        if self.qa_result:
            data['qa_result'] = self.qa_result.to_dict()
        return data

class IssueNormalizer:
    """Normalize and compare issue texts for similarity detection."""

    @staticmethod
    def normalize_key(issue_text: str) -> str:
        """
        Normalize issue text for comparison.

        Args:
            issue_text: Raw issue description

        Returns:
            Normalized lowercase text with punctuation removed
        """
        normalized = issue_text.lower()

        # Strip punctuation so wording/punctuation differences don't
        # defeat the similarity match.
        normalized = re.sub(r'[^\w\s]', '', normalized)

        # Collapse runs of whitespace into single spaces.
        normalized = ' '.join(normalized.split())

        return normalized

    @staticmethod
    def calculate_similarity(text1: str, text2: str) -> float:
        """
        Calculate similarity between two issue texts.

        Args:
            text1: First issue text
            text2: Second issue text

        Returns:
            Similarity score 0.0-1.0 (difflib.SequenceMatcher ratio
            over the normalized texts)
        """
        norm1 = IssueNormalizer.normalize_key(text1)
        norm2 = IssueNormalizer.normalize_key(text2)

        return difflib.SequenceMatcher(None, norm1, norm2).ratio()

    @staticmethod
    def is_similar(text1: str, text2: str,
                   threshold: Optional[float] = None) -> bool:
        """
        Check if two issues are similar.

        Args:
            text1: First issue text
            text2: Second issue text
            threshold: Similarity threshold; defaults to the module-level
                SIMILARITY_THRESHOLD (0.8). Resolved at call time rather
                than in the signature so the class has no definition-order
                dependency on the constant.

        Returns:
            True if similar, False otherwise
        """
        if threshold is None:
            threshold = SIMILARITY_THRESHOLD
        return IssueNormalizer.calculate_similarity(text1, text2) >= threshold

class RecurringIssueDetector:
    """Detect and track recurring issues across iterations."""

    def __init__(self, threshold: Optional[int] = None,
                 similarity_threshold: Optional[float] = None):
        """
        Initialize detector.

        Args:
            threshold: Number of occurrences to consider recurring
                (defaults to RECURRING_THRESHOLD).
            similarity_threshold: Similarity threshold for matching issues
                (defaults to SIMILARITY_THRESHOLD).
        """
        # Defaults are resolved at call time, not in the signature, so the
        # class has no definition-order dependency on the module constants.
        self.threshold = RECURRING_THRESHOLD if threshold is None else threshold
        self.similarity_threshold = (SIMILARITY_THRESHOLD
                                     if similarity_threshold is None
                                     else similarity_threshold)
        self.issue_history: List[str] = []
        self.issue_counts: Dict[str, int] = defaultdict(int)

    def add_issues(self, issues: List[Dict[str, Any]]) -> None:
        """
        Add issues from the current iteration.

        Each issue is normalized and either merged into the count of an
        existing similar issue or tracked as a new one.

        Args:
            issues: List of issue dictionaries (uses 'title' and
                'description' keys; missing keys are treated as empty)
        """
        for issue in issues:
            issue_text = issue.get('title', '') + ' ' + issue.get('description', '')
            normalized = IssueNormalizer.normalize_key(issue_text)

            # Merge with the first sufficiently-similar known issue, if any.
            # Iterate over a snapshot since we may mutate the dict below.
            matched = False
            for existing_key in list(self.issue_counts.keys()):
                if IssueNormalizer.is_similar(normalized, existing_key,
                                              self.similarity_threshold):
                    self.issue_counts[existing_key] += 1
                    matched = True
                    break

            if not matched:
                self.issue_counts[normalized] = 1

            self.issue_history.append(normalized)

    def has_recurring_issues(self) -> bool:
        """
        Check if any issue has recurred at or beyond the threshold.

        Returns:
            True if recurring issues detected
        """
        return any(count >= self.threshold for count in self.issue_counts.values())

    def get_recurring_issues(self) -> List[Tuple[str, int]]:
        """
        Get the list of recurring issues with their counts.

        Returns:
            List of (issue_text, count) tuples for issues at/over threshold
        """
        return [(issue, count) for issue, count in self.issue_counts.items()
                if count >= self.threshold]

    def get_top_issues(self, n: int = 5) -> List[Tuple[str, int]]:
        """
        Get the top N most frequent issues.

        Args:
            n: Number of top issues to return

        Returns:
            List of (issue_text, count) tuples, most frequent first
        """
        sorted_issues = sorted(self.issue_counts.items(),
                               key=lambda x: x[1], reverse=True)
        return sorted_issues[:n]

class QAHistoryManager:
    """Manage QA loop history persistence (a JSON file of completed runs)."""

    def __init__(self, history_file: Path):
        """
        Initialize history manager.

        Args:
            history_file: Path to history JSON file (created on first save)
        """
        self.history_file = history_file
        self.history: Dict[str, Any] = self._load_history()

    def _load_history(self) -> Dict[str, Any]:
        """Load history from file, or return a fresh empty structure."""
        if self.history_file.exists():
            with open(self.history_file, 'r') as f:
                return json.load(f)
        return {
            "version": "1.0.0",
            "runs": []
        }

    def save_history(self) -> None:
        """Save history to file, creating parent directories as needed."""
        self.history_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.history_file, 'w') as f:
            json.dump(self.history, f, indent=2)

    # "IterationRecord" is a forward reference: the class is defined later
    # in this module, so the annotation must not be evaluated eagerly.
    def add_run(self, artifact_path: str, iterations: List["IterationRecord"],
                result: str, escalation_reason: Optional[str] = None) -> None:
        """
        Add a completed run to history and persist immediately.

        Args:
            artifact_path: Path to artifact that was validated
            iterations: List of iteration records
            result: Final result ("success", "escalated", "error")
            escalation_reason: Reason for escalation if applicable
        """
        run_record = {
            "artifact": artifact_path,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "total_iterations": len(iterations),
            "result": result,
            "escalation_reason": escalation_reason,
            "iterations": [iter_rec.to_dict() for iter_rec in iterations]
        }

        self.history["runs"].append(run_record)
        self.save_history()

class QASelfHealingLoop:
    """Main QA self-healing loop implementation.

    Drives review -> (approve | fix | escalate) cycles for a single
    artifact, tracking per-iteration state and recurring issues.
    """

    def __init__(self, artifact_path: Path, config: Dict[str, Any],
                 dry_run: bool = False, verbose: bool = False):
        """
        Initialize QA loop.

        Args:
            artifact_path: Path to artifact to validate
            config: Configuration dictionary (expects 'loop_parameters',
                'state_tracking.history_file', and
                'output.escalation_template' keys)
            dry_run: If True, don't apply fixes
            verbose: If True, print detailed output
        """
        self.artifact_path = artifact_path
        self.config = config
        self.dry_run = dry_run
        self.verbose = verbose

        # Extract loop parameters, falling back to module-level defaults.
        loop_params = config.get('loop_parameters', {})
        self.max_iterations = loop_params.get('max_iterations', MAX_ITERATIONS)
        self.max_consecutive_errors = loop_params.get('max_consecutive_errors',
                                                      MAX_CONSECUTIVE_ERRORS)
        self.recurring_threshold = loop_params.get('recurring_threshold',
                                                   RECURRING_THRESHOLD)
        self.similarity_threshold = loop_params.get('similarity_threshold',
                                                    SIMILARITY_THRESHOLD)

        # Initialize components
        self.issue_detector = RecurringIssueDetector(
            threshold=self.recurring_threshold,
            similarity_threshold=self.similarity_threshold
        )

        history_file = Path(config['state_tracking']['history_file'])
        self.history_manager = QAHistoryManager(history_file)

        # State tracking
        self.iterations: List[IterationRecord] = []
        self.consecutive_errors = 0

    def log(self, message: str, level: str = "INFO") -> None:
        """
        Log a message. INFO is only shown in verbose mode; WARNING and
        ERROR are always shown.

        Args:
            message: Message to log
            level: Log level (INFO, WARNING, ERROR)
        """
        if self.verbose or level in ["WARNING", "ERROR"]:
            timestamp = datetime.now().strftime("%H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")

    def run_qa_review(self) -> QAResult:
        """
        Run QA review using council-orchestrator.

        Returns:
            QAResult object with review outcome; status "error" on failure
        """
        self.log(f"Running QA review for {self.artifact_path}")

        # In a real implementation, this would invoke the council-orchestrator
        # agent; for now we simulate the integration point.
        # TODO: Integrate with actual council-orchestrator agent

        try:
            # Simulated QA review result.
            # In production: Task(subagent_type="council-orchestrator", ...)
            result = QAResult(
                status="approved",  # or "rejected"
                score=0.85,
                critical_findings=0,
                high_findings=1,
                medium_findings=3,
                low_findings=5,
                issues=[],
                consensus=0.75,
                verdict="APPROVED",
                timestamp=datetime.now(timezone.utc).isoformat()
            )

            self.log(f"QA Review complete: {result.verdict} (score: {result.score:.2f})")
            return result

        except Exception as e:
            # Any failure is surfaced as an "error" result so the loop can
            # count consecutive errors and escalate rather than crash.
            self.log(f"QA review error: {e}", "ERROR")
            return QAResult(
                status="error",
                score=0.0,
                critical_findings=0,
                high_findings=0,
                medium_findings=0,
                low_findings=0,
                issues=[],
                timestamp=datetime.now(timezone.utc).isoformat()
            )

    def run_fixer(self, qa_result: QAResult) -> bool:
        """
        Run fixer agent to remediate issues.

        Args:
            qa_result: QA result with issues to fix

        Returns:
            True if fixes applied successfully (always False in dry-run mode)
        """
        if self.dry_run:
            self.log("DRY RUN: Skipping fixer execution", "WARNING")
            return False

        self.log(f"Running fixer agent for {len(qa_result.issues)} issues")

        # In a real implementation, this would invoke the qa-reviewer agent
        # with the specific issues to fix.
        # TODO: Integrate with actual qa-reviewer agent

        try:
            # Simulated fixer execution.
            # In production: Task(subagent_type="qa-reviewer", ...)
            self.log("Fixer execution complete")
            return True

        except Exception as e:
            self.log(f"Fixer error: {e}", "ERROR")
            return False

    def should_escalate(self) -> Tuple[bool, Optional[str]]:
        """
        Check if conditions require human escalation.

        Returns:
            Tuple of (should_escalate, reason); reason is None when no
            escalation is needed
        """
        # Check for recurring issues
        if self.issue_detector.has_recurring_issues():
            recurring = self.issue_detector.get_recurring_issues()
            reason = f"Recurring issues detected: {len(recurring)} issues occurring {self.recurring_threshold}+ times"
            return True, reason

        # Check for consecutive errors
        if self.consecutive_errors >= self.max_consecutive_errors:
            reason = f"Consecutive errors threshold reached: {self.consecutive_errors}/{self.max_consecutive_errors}"
            return True, reason

        # Check for max iterations
        if len(self.iterations) >= self.max_iterations:
            reason = f"Max iterations reached: {len(self.iterations)}/{self.max_iterations}"
            return True, reason

        return False, None

    def generate_escalation_report(self, reason: str) -> Path:
        """
        Generate a human escalation report from the configured template.

        Args:
            reason: Escalation reason

        Returns:
            Path to generated report
        """
        self.log(f"Generating escalation report: {reason}", "WARNING")

        # Load template
        template_path = Path(self.config['output']['escalation_template'])
        with open(template_path, 'r') as f:
            template = f.read()

        # Build iteration history table (markdown rows)
        history_rows = []
        for iter_rec in self.iterations:
            history_rows.append(
                f"| {iter_rec.iteration} | {iter_rec.status} | "
                f"{iter_rec.issues_found} | {iter_rec.duration_seconds:.2f}s | "
                f"{iter_rec.timestamp} |"
            )
        history_table = '\n'.join(history_rows)

        # Build recurring issues section
        recurring = self.issue_detector.get_recurring_issues()
        if recurring:
            recurring_section = "### Most Recurring Issues\n\n"
            for issue, count in recurring:
                recurring_section += f"- **{count} occurrences:** {issue[:100]}...\n"
        else:
            recurring_section = "No recurring issues detected (escalation triggered by other condition)."

        # Build issue frequency analysis
        top_issues = self.issue_detector.get_top_issues(10)
        frequency_lines = []
        for issue, count in top_issues:
            frequency_lines.append(f"- **{count}x:** {issue[:80]}...")
        frequency_analysis = '\n'.join(frequency_lines) if frequency_lines else "No issues tracked."

        # Get last QA result
        last_qa = self.iterations[-1].qa_result if self.iterations else None
        last_qa_json = json.dumps(last_qa.to_dict(), indent=2) if last_qa else "N/A"

        # Compute the report path BEFORE formatting so the template's
        # {report_path} placeholder receives the real path.  (BUGFIX: the
        # previous implementation formatted with a sentinel and then searched
        # the formatted output for 'report_path="[Generated on write]"' — a
        # string that never appears after formatting — so the placeholder was
        # never filled in.)
        report_dir = Path(".coditect/qa-escalations")
        report_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        artifact_name = self.artifact_path.stem
        report_path = report_dir / f"QA_ESCALATION-{artifact_name}-{timestamp}.md"

        # Fill template
        report_content = template.format(
            timestamp=datetime.now(timezone.utc).isoformat(),
            artifact_path=str(self.artifact_path),
            iteration_number=len(self.iterations),
            max_iterations=self.max_iterations,
            escalation_reason=reason,
            escalation_trigger=reason,
            recurring_issues_section=recurring_section,
            iteration_history_table=history_table,
            last_qa_review_json=last_qa_json,
            issue_frequency_analysis=frequency_analysis,
            top_issue_count=len(top_issues),
            config_snapshot=json.dumps(self.config, indent=2),
            report_path=str(report_path)
        )

        # Write report
        with open(report_path, 'w') as f:
            f.write(report_content)

        self.log(f"Escalation report generated: {report_path}", "WARNING")
        return report_path

    def run(self) -> Tuple[str, Optional[Path]]:
        """
        Execute the QA self-healing loop.

        Returns:
            Tuple of (result, escalation_report_path)
                result: "success", "escalated", "error"
                escalation_report_path: Path to report if escalated, else None
        """
        self.log(f"Starting QA self-healing loop for {self.artifact_path}")
        self.log(f"Max iterations: {self.max_iterations}, Dry run: {self.dry_run}")

        iteration = 0

        while iteration < self.max_iterations:
            iteration += 1
            start_time = datetime.now()
            self.log(f"\n=== Iteration {iteration}/{self.max_iterations} ===")

            # Run QA review
            qa_result = self.run_qa_review()

            # Check for errors
            if qa_result.status == "error":
                self.consecutive_errors += 1
                self.log(f"Error in iteration {iteration} (consecutive: {self.consecutive_errors})",
                         "ERROR")

                iter_record = IterationRecord(
                    iteration=iteration,
                    status="error",
                    issues_found=0,
                    duration_seconds=(datetime.now() - start_time).total_seconds(),
                    timestamp=datetime.now(timezone.utc).isoformat(),
                    qa_result=qa_result,
                    error_message="QA review failed"
                )
                self.iterations.append(iter_record)

                # Escalate immediately if the error budget is spent.
                should_esc, esc_reason = self.should_escalate()
                if should_esc:
                    report_path = self.generate_escalation_report(esc_reason)
                    self.history_manager.add_run(
                        str(self.artifact_path),
                        self.iterations,
                        "escalated",
                        esc_reason
                    )
                    return "escalated", report_path

                continue

            # Reset error counter on a successful review.
            self.consecutive_errors = 0

            # Add issues to the recurring-issue detector.
            self.issue_detector.add_issues(qa_result.issues)

            # Record iteration
            iter_record = IterationRecord(
                iteration=iteration,
                status=qa_result.status,
                issues_found=len(qa_result.issues),
                duration_seconds=(datetime.now() - start_time).total_seconds(),
                timestamp=datetime.now(timezone.utc).isoformat(),
                qa_result=qa_result
            )
            self.iterations.append(iter_record)

            # Check if approved
            if qa_result.status == "approved":
                self.log(f"✓ QA approved after {iteration} iterations", "INFO")
                self.history_manager.add_run(
                    str(self.artifact_path),
                    self.iterations,
                    "success"
                )
                return "success", None

            # Check if we should escalate before attempting a fix.
            should_esc, esc_reason = self.should_escalate()
            if should_esc:
                report_path = self.generate_escalation_report(esc_reason)
                self.history_manager.add_run(
                    str(self.artifact_path),
                    self.iterations,
                    "escalated",
                    esc_reason
                )
                return "escalated", report_path

            # Run fixer; a failed fix is logged and the loop continues.
            self.log("QA rejected, running fixer agent...")
            fix_success = self.run_fixer(qa_result)

            if not fix_success:
                self.log("Fixer failed, continuing to next iteration", "WARNING")

        # Loop exhausted without approval or earlier escalation.
        self.log(f"Max iterations reached ({self.max_iterations})", "WARNING")
        report_path = self.generate_escalation_report(
            f"Max iterations reached: {self.max_iterations}"
        )
        self.history_manager.add_run(
            str(self.artifact_path),
            self.iterations,
            "escalated",
            "Max iterations reached"
        )
        return "escalated", report_path

def load_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """
    Load QA loop configuration.

    Args:
        config_path: Optional custom config path; defaults to
            config/qa-loop-config.json next to the scripts directory

    Returns:
        Configuration dictionary

    Note:
        Exits the process with status 1 if the config file is missing.
    """
    if config_path is None:
        config_path = Path(__file__).parent.parent / "config" / "qa-loop-config.json"

    if not config_path.exists():
        print(f"Error: Config file not found: {config_path}")
        sys.exit(1)

    with open(config_path, 'r') as f:
        return json.load(f)

def main() -> int:
    """Main entry point: parse arguments, run the loop, print a summary.

    Returns:
        Process exit code: 0 on success, 1 on failure or missing artifact,
        2 when escalated to a human reviewer.
    """
    parser = argparse.ArgumentParser(
        description="QA Self-Healing Loop - Autonomous quality assurance with auto-remediation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:

  # Basic validation
  python3 scripts/qa-validation-loop.py docs/architecture/decisions/ADR-025.md

  # With custom config
  python3 scripts/qa-validation-loop.py artifact.md --config custom-config.json

  # Dry run (no fixes)
  python3 scripts/qa-validation-loop.py artifact.md --dry-run

  # Verbose output
  python3 scripts/qa-validation-loop.py artifact.md --verbose
"""
    )

    parser.add_argument(
        "artifact",
        type=Path,
        help="Path to artifact to validate"
    )

    parser.add_argument(
        "--config",
        type=Path,
        help="Path to custom configuration file"
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run without applying fixes"
    )

    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose output"
    )

    args = parser.parse_args()

    # Validate artifact exists before doing any work.
    if not args.artifact.exists():
        print(f"Error: Artifact not found: {args.artifact}")
        return 1

    # Load config
    config = load_config(args.config)

    # Create and run loop
    loop = QASelfHealingLoop(
        artifact_path=args.artifact,
        config=config,
        dry_run=args.dry_run,
        verbose=args.verbose
    )

    result, escalation_report = loop.run()

    # Print summary
    print("\n" + "=" * 80)
    print("QA SELF-HEALING LOOP SUMMARY")
    print("=" * 80)
    print(f"Artifact: {args.artifact}")
    print(f"Result: {result.upper()}")
    print(f"Iterations: {len(loop.iterations)}")

    if escalation_report:
        print(f"\nEscalation Report: {escalation_report}")
        print("\nHuman review required. See escalation report for details.")
        return 2
    elif result == "success":
        print("\n✓ QA validation passed!")
        return 0
    else:
        print("\n✗ QA validation failed")
        return 1


# BUGFIX: the scraped source read `if name == "main":`, which is always
# False (and a NameError); the dunder guard below is the correct form.
if __name__ == "__main__":
    sys.exit(main())