# scripts/ci-failure-analyzer.py
#!/usr/bin/env python3
"""CI Failure Analyzer - K.5.3

Groups CI/CD pipeline failures by likely root cause, identifies flaky
tests, and suggests minimal targeted fixes.

Usage:
    python3 scripts/ci-failure-analyzer.py [--window 24h] [--repo owner/repo]
    python3 scripts/ci-failure-analyzer.py --flaky-threshold 0.15

Track: K (Workflow Automation)
Agent: ci-failure-analyzer
Command: /ci-analyze
"""
import argparse
import json
import re
import subprocess
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any
def run_command(cmd: list[str], capture: bool = True) -> tuple[int, str, str]:
    """Execute *cmd* and return ``(returncode, stdout, stderr)``.

    Never raises: timeouts and launch failures (missing binary, ...) are
    reported as return code 1 with the error text in the stderr slot,
    since callers only inspect the tuple.
    """
    try:
        proc = subprocess.run(cmd, capture_output=capture, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        return 1, "", "Command timed out"
    except Exception as exc:  # deliberate best-effort boundary
        return 1, "", str(exc)
    return proc.returncode, proc.stdout, proc.stderr
def parse_window(window: str) -> timedelta:
    """Parse a time-window string like '24h', '7d', '1w' into a timedelta.

    Unrecognized input falls back to 24 hours.  The 'm' unit is treated
    as a 30-day month.
    """
    match = re.match(r"(\d+)([hdwm])", window.lower())
    if match is None:
        return timedelta(hours=24)  # default window on unparseable input
    value = int(match.group(1))
    unit_span = {
        "h": timedelta(hours=1),
        "d": timedelta(days=1),
        "w": timedelta(weeks=1),
        "m": timedelta(days=30),
    }
    return value * unit_span[match.group(2)]
def get_failed_runs(window: str, repo: str | None = None) -> list[dict[str, Any]]:
    """Fetch failed GitHub Actions runs created within *window*.

    Uses the ``gh`` CLI; returns an empty list (with a stderr warning)
    when the command fails or its JSON output cannot be parsed.
    """
    cmd = [
        "gh", "run", "list",
        "--status", "failure",
        "--json",
        "databaseId,name,conclusion,createdAt,headBranch,url,workflowName",
        "--limit", "100",
    ]
    if repo:
        cmd.extend(["--repo", repo])
    code, stdout, stderr = run_command(cmd)
    if code != 0:
        print(f"Warning: Failed to fetch runs: {stderr}", file=sys.stderr)
        return []
    try:
        runs = json.loads(stdout) if stdout.strip() else []
    except json.JSONDecodeError:
        return []
    # Filter by time window using timezone-aware datetimes: createdAt is
    # UTC (trailing "Z"), so the cutoff must be UTC too.  The previous
    # code stripped tzinfo and compared against naive *local* now(),
    # skewing the window by the local UTC offset.
    cutoff = datetime.now(timezone.utc) - parse_window(window)
    filtered = []
    for run in runs:
        try:
            created = datetime.fromisoformat(
                run.get("createdAt", "").replace("Z", "+00:00")
            )
            if created >= cutoff:
                filtered.append(run)
        except (ValueError, TypeError):
            continue  # skip runs with a missing/malformed timestamp
    return filtered
def get_run_logs(run_id: int, repo: str | None = None) -> str:
    """Return the failed-job log text for *run_id* ('' on any error)."""
    cmd = ["gh", "run", "view", str(run_id), "--log-failed"]
    if repo:
        cmd += ["--repo", repo]
    code, stdout, _ = run_command(cmd)
    if code != 0:
        return ""
    return stdout
def classify_failure(logs: str, workflow_name: str) -> dict[str, Any]:
    """Classify a CI failure by likely root cause.

    Returns ``{"category", "cause", "confidence"}``.  Pattern groups are
    checked in priority order (tests > build > infrastructure >
    environment); the first matching pattern wins.  *workflow_name* is
    accepted for interface stability but not currently used.

    Bug fix: matching is now case-insensitive against the raw logs.
    Previously the logs were lowercased while patterns such as
    ``FAILED``, ``AssertionError``, ``npm ERR!``, ``ModuleNotFoundError``
    and ``OOM`` kept uppercase letters, so they could never match.
    """
    groups = [
        # (category, confidence, [(pattern, cause), ...])
        ("test_failure", 0.85, [
            (r"FAILED.*test", "test_failure"),
            (r"AssertionError", "assertion_failure"),
            (r"pytest.*failed", "pytest_failure"),
            (r"jest.*failed", "jest_failure"),
            (r"error:.*test", "test_error"),
        ]),
        ("build_failure", 0.80, [
            (r"compilation failed", "compilation_error"),
            (r"cannot find module", "missing_dependency"),
            (r"npm ERR!", "npm_error"),
            (r"pip.*error", "pip_error"),
            (r"cargo.*error", "cargo_error"),
            (r"ModuleNotFoundError", "import_error"),
        ]),
        ("infrastructure", 0.75, [
            (r"timeout", "timeout"),
            (r"out of memory|OOM", "oom"),
            (r"connection refused", "network_error"),
            (r"disk.*full|no space", "disk_full"),
            (r"runner.*unavailable", "runner_unavailable"),
        ]),
        ("environment", 0.70, [
            (r"version.*mismatch", "version_mismatch"),
            (r"environment variable.*not set", "missing_env_var"),
            (r"cache.*invalid|cache.*miss", "cache_issue"),
        ]),
    ]
    for category, confidence, patterns in groups:
        for pattern, cause in patterns:
            if re.search(pattern, logs, re.IGNORECASE):
                return {"category": category, "cause": cause, "confidence": confidence}
    return {"category": "unknown", "cause": "unclassified", "confidence": 0.50}
def extract_failed_tests(logs: str) -> list[str]:
    """Extract up to 20 failed-test identifiers from CI log text.

    Recognizes pytest ("FAILED path.py::name"), jest
    ("FAIL path.test.ext" / "FAIL path.spec.ext") and generic
    ``test_*`` / ``TestClass::method`` identifiers.

    Results are deduplicated and sorted so output is deterministic — the
    original returned raw set-iteration order, which varies between
    interpreter runs and made the truncation to 20 arbitrary.
    """
    tests: set[str] = set()
    # pytest: "FAILED tests/test_foo.py::test_bar"
    tests.update(re.findall(r"FAILED\s+([\w/]+\.py::\w+)", logs))
    # jest: "FAIL src/foo.test.js"
    tests.update(re.findall(r"FAIL\s+([\w/]+\.(?:test|spec)\.\w+)", logs))
    # generic: bare test_* functions or TestClass::method references
    tests.update(re.findall(r"(?:test_\w+|Test\w+::\w+)", logs))
    return sorted(tests)[:20]  # cap noise at 20 entries
def detect_flaky_tests(runs: list[dict[str, Any]], repo: str | None, threshold: float) -> list[dict[str, Any]]: """Detect tests that fail intermittently.""" test_results = defaultdict(lambda: {"pass": 0, "fail": 0})
# This would require more runs data including passed runs
# Simplified implementation based on failure frequency
all_failed_tests = []
for run in runs[:20]: # Limit to recent runs
logs = get_run_logs(run.get("databaseId", 0), repo)
failed = extract_failed_tests(logs)
for test in failed:
test_results[test]["fail"] += 1
flaky = []
for test, results in test_results.items():
fail_rate = results["fail"] / (results["fail"] + results["pass"] + 1)
if 0.1 < fail_rate < 0.9: # Intermittent failures
flaky.append({
"test": test,
"failure_rate": round(fail_rate, 2),
"failures": results["fail"],
"type": "intermittent"
})
return sorted(flaky, key=lambda x: x["failure_rate"], reverse=True)
def suggest_fix(classification: dict[str, Any], logs: str) -> str:
    """Return a one-line remediation hint for a classified failure.

    Looks up the classification's ``cause`` in a fixed hint table;
    unknown causes get a generic prompt.  *logs* is accepted for
    interface stability but currently unused.
    """
    hints = {
        "test_failure": "Review failing test assertions and update expected values",
        "assertion_failure": "Check test data and assertion conditions",
        "pytest_failure": "Run pytest locally with -v flag to debug",
        "jest_failure": "Run jest with --runInBand to isolate test",
        "compilation_error": "Check syntax and type errors in recent changes",
        "missing_dependency": "Run package manager install and update lockfile",
        "npm_error": "Clear npm cache and reinstall: npm ci",
        "pip_error": "Update pip and reinstall: pip install -r requirements.txt",
        "import_error": "Check import paths and __init__.py files",
        "timeout": "Increase timeout or optimize slow operations",
        "oom": "Reduce memory usage or increase runner memory",
        "network_error": "Check external service availability and add retries",
        "disk_full": "Clean up artifacts or increase disk space",
        "version_mismatch": "Pin dependency versions in lockfile",
        "missing_env_var": "Add required environment variables to workflow",
        "cache_issue": "Clear cache and rebuild",
    }
    failure_cause = classification.get("cause", "unknown")
    return hints.get(failure_cause, "Investigate logs for root cause")
def generate_report( runs: list[dict[str, Any]], flaky_tests: list[dict[str, Any]], repo: str | None ) -> str: """Generate the failure analysis report."""
# Group failures by root cause
groups = defaultdict(list)
for run in runs:
logs = get_run_logs(run.get("databaseId", 0), repo)
classification = classify_failure(logs, run.get("workflowName", ""))
classification["run"] = run
classification["failed_tests"] = extract_failed_tests(logs)
classification["fix_suggestion"] = suggest_fix(classification, logs)
groups[classification["category"]].append(classification)
total_runs = len(runs)
lines = [
"# CI Failure Analysis Report",
"",
f"**Period:** Last analysis window",
f"**Total Failed Runs:** {total_runs}",
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
"---",
"",
"## Failure Groups (by Root Cause)",
"",
]
# Sort groups by count
for category, failures in sorted(groups.items(), key=lambda x: -len(x[1])):
lines.append(f"### {category.replace('_', ' ').title()} ({len(failures)} failures)")
lines.append("")
for failure in failures[:5]:
run = failure["run"]
lines.append(f"- **{run.get('workflowName', 'Unknown')}** on `{run.get('headBranch', 'unknown')}`")
lines.append(f" - Cause: {failure['cause']} (confidence: {failure['confidence']:.0%})")
if failure["failed_tests"]:
lines.append(f" - Failed tests: {', '.join(failure['failed_tests'][:3])}")
lines.append(f" - Fix: {failure['fix_suggestion']}")
lines.append("")
# Flaky tests section
if flaky_tests:
lines.extend([
"## Flaky Tests",
"",
"| Test | Failure Rate | Type | Suggested Fix |",
"|------|-------------|------|---------------|",
])
for test in flaky_tests[:10]:
lines.append(
f"| `{test['test'][:50]}` | {test['failure_rate']:.0%} | "
f"{test['type']} | Add retry or fix race condition |"
)
lines.append("")
# Recommendations
lines.extend([
"## Recommendations (Priority Order)",
"",
])
priority = 1
for category, failures in sorted(groups.items(), key=lambda x: -len(x[1])):
if failures:
pct = len(failures) / total_runs * 100 if total_runs > 0 else 0
level = "HIGH" if pct > 30 else "MEDIUM" if pct > 10 else "LOW"
lines.append(f"{priority}. **[{level}]** Fix {category.replace('_', ' ')} issues - {pct:.0f}% of failures")
priority += 1
lines.extend([
"",
"---",
"*Generated by CODITECT CI Failure Analyzer*",
])
return "\n".join(lines)
def main():
    """CLI entry point: parse arguments, analyze failures, emit report."""
    parser = argparse.ArgumentParser(
        description="Analyze CI/CD failures and identify root causes"
    )
    parser.add_argument(
        "--window", "-w", type=str, default="24h",
        help="Time window for analysis (e.g., 24h, 7d, 1w)",
    )
    parser.add_argument(
        "--repo", "-r", type=str, default=None,
        help="GitHub repository (owner/repo format)",
    )
    parser.add_argument(
        "--flaky-threshold", type=float, default=0.10,
        help="Threshold for flaky test detection (default: 0.10)",
    )
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help="Output file path (default: stdout)",
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON instead of Markdown",
    )
    args = parser.parse_args()

    print(f"Fetching failed runs from last {args.window}...", file=sys.stderr)
    runs = get_failed_runs(args.window, args.repo)
    if not runs:
        print("No failed runs found in the specified window.", file=sys.stderr)
        sys.exit(0)

    print(f"Analyzing {len(runs)} failed runs...", file=sys.stderr)
    flaky_tests = detect_flaky_tests(runs, args.repo, args.flaky_threshold)

    if args.json:
        output = json.dumps(
            {
                "total_failures": len(runs),
                "runs": runs,
                "flaky_tests": flaky_tests,
            },
            indent=2,
            default=str,  # deliberate: stringify non-JSON-native values
        )
    else:
        output = generate_report(runs, flaky_tests, args.repo)

    if args.output:
        from pathlib import Path

        Path(args.output).write_text(output)
        print(f"Report written to: {args.output}", file=sys.stderr)
    else:
        print(output)
# Bug fix: was `if name == "main": main()`, which raises NameError at
# import time (and would never match the dunder value anyway).
if __name__ == "__main__":
    main()