#!/usr/bin/env python3
"""Flaky Test Analyzer - K.5.5

Detects and analyzes flaky tests by examining test run history,
identifying intermittent failures, and suggesting fixes.

Usage:
    python3 scripts/flaky-test-analyzer.py [--runs 20] [--threshold 0.1]
    python3 scripts/flaky-test-analyzer.py --test-report junit.xml

Track: K (Workflow Automation)
Agent: flaky-test-analyzer
Command: /flaky-tests
"""
import argparse
import json
import re
import subprocess
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
@dataclass
class TestResult:
    """A single test execution result.

    Captures one run of one test: identity (name/file), outcome,
    timing, and the CI run it came from.
    """

    name: str               # test name (from JUnit "name" attribute)
    file: str               # source file / classname the test belongs to
    passed: bool            # True unless a <failure> or <error> was recorded
    duration: float         # execution time in seconds
    run_id: str             # identifier of the CI run / report this came from
    error_message: str = ""  # truncated failure/error text, if any
    timestamp: str = ""      # run creation time, if known
@dataclass
class FlakynessAnalysis:
    """Analysis of a potentially flaky test.

    Aggregates a test's run history into a failure rate, a classified
    flakiness type, and a suggested remediation.
    """

    test_name: str
    file: str
    total_runs: int          # number of recorded runs for this test
    failures: int            # how many of those runs failed
    failure_rate: float      # failures / total_runs
    flakiness_type: str      # classification from classify_flakiness()
    error_patterns: list[str] = field(default_factory=list)  # sample error messages
    suggested_fix: str = ""
    confidence: float = 0.0  # 0..0.95, grows with sample size
def run_command(cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* without a shell and capture its output.

    Args:
        cmd: Command and arguments as a list (no shell interpolation).

    Returns:
        Tuple of (return code, stdout, stderr). If the command cannot be
        launched or times out (120s), returns (1, "", error text) so
        callers can degrade gracefully instead of crashing.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: executable missing / not runnable;
        # SubprocessError covers TimeoutExpired. Narrowed from a
        # bare `except Exception` so programming errors still surface.
        return 1, "", str(e)
    return result.returncode, result.stdout, result.stderr
def parse_junit_xml(xml_path: Path) -> list[TestResult]:
    """Parse a JUnit XML report into TestResult records.

    Skipped testcases are ignored. A testcase counts as failed when it
    has a <failure> or <error> child; the first 500 characters of the
    message are kept. Returns [] (with a warning on stderr) if the file
    cannot be read or parsed.
    """
    results: list[TestResult] = []
    try:
        root = ET.parse(xml_path).getroot()
        for testcase in root.iter("testcase"):
            name = testcase.get("name", "unknown")
            classname = testcase.get("classname", "")
            # A missing, empty, or malformed `time` attribute must not
            # abort the whole report (float("") raises ValueError).
            try:
                duration = float(testcase.get("time") or 0)
            except ValueError:
                duration = 0.0
            if testcase.find("skipped") is not None:
                continue
            failure = testcase.find("failure")
            error = testcase.find("error")
            passed = failure is None and error is None
            error_msg = ""
            if failure is not None:
                error_msg = failure.text or failure.get("message", "")
            elif error is not None:
                error_msg = error.text or error.get("message", "")
            results.append(TestResult(
                name=name,
                file=classname,
                passed=passed,
                duration=duration,
                run_id=xml_path.stem,
                error_message=error_msg[:500],  # truncate long tracebacks
            ))
    except (ET.ParseError, OSError) as e:
        # Narrowed from a bare `except Exception`: only I/O and XML
        # syntax problems are expected here.
        print(f"Warning: Failed to parse {xml_path}: {e}", file=sys.stderr)
    return results
def get_ci_test_results(runs: int, repo: str | None = None) -> list[TestResult]:
    """Fetch test results from CI runs.

    Synthesizes one TestResult per workflow run from the run's overall
    conclusion (per-test artifacts are not downloaded here). Returns []
    when the `gh` CLI fails or emits unparsable JSON.
    """
    # Query recent workflow runs via the GitHub CLI.
    list_cmd = ["gh", "run", "list", "--json", "databaseId,conclusion,createdAt", "--limit", str(runs)]
    if repo:
        list_cmd += ["--repo", repo]
    exit_code, out, _err = run_command(list_cmd)
    if exit_code != 0:
        return []
    try:
        runs_data = json.loads(out)
    except json.JSONDecodeError:
        return []
    collected: list[TestResult] = []
    for entry in runs_data:
        the_id = str(entry.get("databaseId", ""))
        # NOTE: simplified — a full implementation would download and
        # parse per-run test-report artifacts instead of using the
        # run-level conclusion.
        collected.append(TestResult(
            name=f"run_{the_id}",
            file="ci",
            passed=entry.get("conclusion", "") == "success",
            duration=0,
            run_id=the_id,
            timestamp=entry.get("createdAt", ""),
        ))
    return collected
def classify_flakiness(error_patterns: list[str]) -> tuple[str, str]:
    """Classify the type of flakiness and suggest a fix.

    Matches keywords against the concatenated, lower-cased error
    messages. Rules are ordered: the first rule with any matching
    keyword wins; with no match, ("unknown", ...) is returned.
    """
    combined = " ".join(error_patterns).lower()
    # (keywords, flakiness type, suggested fix) — order matters.
    rules = [
        (("timeout", "timed out", "deadline"),
         "timing-dependent", "Increase timeout or add explicit waits"),
        (("race", "concurrent", "thread", "async"),
         "race-condition", "Add proper synchronization or use thread-safe operations"),
        (("connection", "network", "socket", "http"),
         "network-dependent", "Mock network calls or add retry logic"),
        (("file", "permission", "disk", "io"),
         "resource-dependent", "Use temp directories and ensure cleanup"),
        (("random", "uuid", "timestamp", "date"),
         "non-deterministic", "Mock time/random sources or use fixed seeds"),
        (("order", "depend", "setup", "teardown"),
         "order-dependent", "Ensure test isolation and proper setup/teardown"),
        (("memory", "heap", "oom"),
         "resource-exhaustion", "Optimize memory usage or increase limits"),
    ]
    for keywords, flaky_type, fix in rules:
        if any(k in combined for k in keywords):
            return flaky_type, fix
    return "unknown", "Investigate test logs for root cause"
def analyze_test_history(results: list[TestResult], threshold: float) -> list[FlakynessAnalysis]:
    """Analyze test history to identify flaky tests.

    A test is flagged when it has at least 3 recorded runs, both passes
    and failures, and a failure rate strictly between `threshold` and
    0.9 (near-constant failure is breakage, not flakiness). Results are
    sorted by failure rate, highest first.
    """
    # Bucket every result under a "<file>::<name>" key.
    history_by_test: dict[str, list[TestResult]] = defaultdict(list)
    for record in results:
        history_by_test[f"{record.file}::{record.name}"].append(record)

    findings: list[FlakynessAnalysis] = []
    for test_key, history in history_by_test.items():
        if len(history) < 3:
            continue  # too few runs to judge flakiness
        failed_runs = [r for r in history if not r.passed]
        passed_runs = [r for r in history if r.passed]
        # True flakiness requires a mix of passes and failures.
        if not failed_runs or not passed_runs:
            continue
        rate = len(failed_runs) / len(history)
        if not (threshold < rate < 0.9):
            continue
        messages = [r.error_message for r in failed_runs if r.error_message]
        flaky_type, fix = classify_flakiness(messages)
        # Confidence grows with sample size, capped at 95%.
        confidence = min(0.95, 0.5 + (len(history) / 50))
        file_part, sep, name_part = test_key.partition("::")
        findings.append(FlakynessAnalysis(
            test_name=name_part if sep else test_key,
            file=file_part if sep else "",
            total_runs=len(history),
            failures=len(failed_runs),
            failure_rate=rate,
            flakiness_type=flaky_type,
            error_patterns=list(set(messages))[:5],
            suggested_fix=fix,
            confidence=confidence,
        ))
    findings.sort(key=lambda f: f.failure_rate, reverse=True)
    return findings
def generate_report(flaky_tests: list[FlakynessAnalysis], total_tests: int) -> str:
    """Generate the flaky test analysis report.

    Builds a Markdown document: header, summary table (top 20 tests),
    per-test detail sections (top 10), a breakdown by flakiness type,
    and general recommendations. Returns the report as one string.
    """
    lines = [
        "# Flaky Test Analysis Report",
        "",
        f"Tests Analyzed: {total_tests}",
        f"Flaky Tests Found: {len(flaky_tests)}",
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "---",
        "",
    ]
    if not flaky_tests:
        lines.extend([
            "No flaky tests detected.",
            "",
            "All tests show consistent behavior across runs.",
        ])
    else:
        # Summary table
        lines.extend([
            "## Summary",
            "",
            "| Test | Failure Rate | Type | Runs | Confidence |",
            "|------|-------------|------|------|------------|",
        ])
        for test in flaky_tests[:20]:
            # Truncate long test names so the table stays readable.
            name = test.test_name[:40] + "..." if len(test.test_name) > 40 else test.test_name
            lines.append(
                f"| `{name}` | {test.failure_rate:.0%} | {test.flakiness_type} | "
                f"{test.total_runs} | {test.confidence:.0%} |"
            )
        lines.append("")
        # Detailed analysis
        lines.extend([
            "## Detailed Analysis",
            "",
        ])
        for i, test in enumerate(flaky_tests[:10], 1):
            # Severity buckets: >30% HIGH, >15% MEDIUM, else LOW.
            severity = "HIGH" if test.failure_rate > 0.3 else "MEDIUM" if test.failure_rate > 0.15 else "LOW"
            lines.extend([
                f"### {i}. [{severity}] {test.test_name}",
                "",
                f"**File:** `{test.file}`",
                f"**Failure Rate:** {test.failure_rate:.0%} ({test.failures}/{test.total_runs} runs)",
                f"**Flakiness Type:** {test.flakiness_type}",
                f"**Confidence:** {test.confidence:.0%}",
                "",
                f"**Suggested Fix:** {test.suggested_fix}",
                "",
            ])
            if test.error_patterns:
                lines.append("**Error Patterns:**")
                for pattern in test.error_patterns[:3]:
                    # Keep each sample pattern to at most 150 chars.
                    truncated = pattern[:150] + "..." if len(pattern) > 150 else pattern
                    lines.append(f"- `{truncated}`")
                lines.append("")
        # Group by type
        type_counts: dict[str, int] = defaultdict(int)
        for test in flaky_tests:
            type_counts[test.flakiness_type] += 1
        lines.extend([
            "## Flakiness by Type",
            "",
            "| Type | Count | Common Fix |",
            "|------|-------|------------|",
        ])
        # Short generic fix text per flakiness type for the table.
        fix_suggestions = {
            "timing-dependent": "Add explicit waits, increase timeouts",
            "race-condition": "Add synchronization, use locks",
            "network-dependent": "Mock external calls, add retries",
            "resource-dependent": "Use temp files, ensure cleanup",
            "non-deterministic": "Mock time/random, use fixed seeds",
            "order-dependent": "Ensure test isolation",
            "resource-exhaustion": "Optimize or increase limits",
            "unknown": "Manual investigation needed",
        }
        # Most common flakiness types first.
        for flaky_type, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            lines.append(f"| {flaky_type} | {count} | {fix_suggestions.get(flaky_type, 'N/A')} |")
        lines.append("")
        # Recommendations
        lines.extend([
            "## Recommendations",
            "",
            "1. **Prioritize by failure rate** - Fix tests with >30% failure rate first",
            "2. **Quarantine high-flake tests** - Mark as flaky while fixing to unblock CI",
            "3. **Add retry annotations** - For timing/network issues, add controlled retries",
            "4. **Improve test isolation** - Ensure each test can run independently",
            "5. **Mock external dependencies** - Replace network/file calls with mocks",
            "",
        ])
    lines.extend([
        "---",
        "*Generated by CODITECT Flaky Test Analyzer*",
    ])
    return "\n".join(lines)
def main() -> None:
    """CLI entry point: gather test results, analyze them, emit report.

    Results come from local JUnit XML reports when --test-report is
    given, otherwise from recent CI runs via the `gh` CLI. Output is
    Markdown (default) or JSON, written to --output or stdout.
    """
    parser = argparse.ArgumentParser(
        description="Analyze test history to identify flaky tests"
    )
    parser.add_argument(
        "--runs", "-n", type=int, default=20,
        help="Number of CI runs to analyze (default: 20)"
    )
    parser.add_argument(
        "--threshold", "-t", type=float, default=0.1,
        help="Minimum failure rate to flag as flaky (default: 0.1)"
    )
    parser.add_argument(
        "--test-report", type=str, action="append",
        help="Path to JUnit XML test report (can specify multiple)"
    )
    parser.add_argument(
        "--repo", "-r", type=str, default=None,
        help="GitHub repository (owner/repo format)"
    )
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help="Output file path (default: stdout)"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON instead of Markdown"
    )
    args = parser.parse_args()
    all_results: list[TestResult] = []
    # Parse local test reports
    if args.test_report:
        for report_path in args.test_report:
            path = Path(report_path)
            if path.exists():
                print(f"Parsing {path}...", file=sys.stderr)
                all_results.extend(parse_junit_xml(path))
            else:
                print(f"Warning: Report not found: {path}", file=sys.stderr)
    # Fetch from CI if no local reports
    if not all_results:
        print(f"Fetching results from last {args.runs} CI runs...", file=sys.stderr)
        all_results = get_ci_test_results(args.runs, args.repo)
    if not all_results:
        # Nothing to analyze is not an error condition — exit 0.
        print("No test results found to analyze.", file=sys.stderr)
        sys.exit(0)
    print(f"Analyzing {len(all_results)} test results...", file=sys.stderr)
    flaky_tests = analyze_test_history(all_results, args.threshold)
    if args.json:
        # Machine-readable subset of each analysis.
        output = json.dumps({
            "total_results": len(all_results),
            "flaky_tests": [
                {
                    "name": t.test_name,
                    "file": t.file,
                    "failure_rate": t.failure_rate,
                    "type": t.flakiness_type,
                    "suggested_fix": t.suggested_fix
                }
                for t in flaky_tests
            ]
        }, indent=2)
    else:
        output = generate_report(flaky_tests, len(all_results))
    if args.output:
        Path(args.output).write_text(output)
        print(f"Report written to: {args.output}", file=sys.stderr)
    else:
        print(output)
# Bug fix: the guard compared the undefined bare name `name` to "main",
# which raised NameError and prevented the script from ever running.
# The standard entry-point guard uses the dunder module name.
if __name__ == "__main__":
    main()