#!/usr/bin/env python3
"""Team PR Digest Generator - K.5.2

Generates PR activity digests by contributor with risk highlights,
review statistics, and team velocity metrics.

Usage:
    python3 scripts/team-pr-digest.py [--days 7] [--team user1,user2]
    python3 scripts/team-pr-digest.py --org my-org

Track: K (Workflow Automation)
Agent: team-pr-digest
Command: /team-pr-digest
"""
import argparse
import json
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
@dataclass
class ContributorStats:
    """Aggregated PR activity statistics for a single contributor.

    Counters are accumulated incrementally by build_contributor_stats();
    the two averages are derived values filled in during/after that pass.
    """

    login: str                      # GitHub login of the contributor
    prs_opened: int = 0             # PRs authored in the period
    prs_merged: int = 0             # subset of prs_opened that were merged
    reviews_given: int = 0          # reviews left on other authors' PRs
    comments_made: int = 0          # reserved; not populated by this script
    additions: int = 0              # total lines added across authored PRs
    deletions: int = 0              # total lines deleted across authored PRs
    files_changed: int = 0          # total files touched across authored PRs
    avg_pr_size: float = 0.0        # (additions + deletions) / prs_opened
    avg_time_to_merge: float = 0.0  # running mean, in hours, over merged PRs
    # Risky PRs: dicts with keys "number", "title", "score", "reasons".
    risk_prs: list[dict[str, Any]] = field(default_factory=list)
def run_command(cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* (an argv list, no shell) and capture its output.

    Args:
        cmd: Command and arguments, e.g. ["gh", "pr", "list", ...].

    Returns:
        (returncode, stdout, stderr). A timeout (60s) or spawn failure
        (e.g. missing binary) is reported as (1, "", error message)
        rather than raising, so callers can treat it as a failed command.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (subprocess.SubprocessError, OSError) as e:
        # Narrowed from a blanket `except Exception`: covers TimeoutExpired
        # and OS-level spawn errors (FileNotFoundError, PermissionError)
        # without hiding unrelated programming errors.
        return 1, "", str(e)
    return result.returncode, result.stdout, result.stderr
def get_prs(days: int, repo: str | None = None, state: str = "all") -> list[dict[str, Any]]:
    """Fetch PRs created in the last *days* days via the `gh` CLI.

    Args:
        days: lookback window for the created-date search filter.
        repo: optional "owner/repo"; defaults to the current repository.
        state: PR state filter passed to `gh pr list` (default "all").

    Returns:
        A list of PR dicts (possibly empty). Fetch or parse failures are
        reported to stderr and yield an empty list rather than raising.
    """
    cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    json_fields = (
        "number,title,author,createdAt,mergedAt,closedAt,additions,deletions,"
        "changedFiles,labels,reviews,url,headRefName"
    )
    cmd = [
        "gh", "pr", "list",
        "--state", state,
        "--json", json_fields,
        "--search", f"created:>={cutoff}",
        "--limit", "200",
    ]
    if repo:
        cmd += ["--repo", repo]

    code, stdout, stderr = run_command(cmd)
    if code != 0:
        print(f"Warning: Failed to fetch PRs: {stderr}", file=sys.stderr)
        return []
    if not stdout.strip():
        return []
    try:
        return json.loads(stdout)
    except json.JSONDecodeError:
        return []
def calculate_risk_score(pr: dict[str, Any]) -> tuple[int, list[str]]:
    """Calculate a heuristic risk score for a PR.

    Scoring: +3 for >1000 changed lines (+2 for >500), +2 for >20 files,
    +3 for any risk label, +1 for no reviews or no approval.

    Args:
        pr: a PR dict as returned by get_prs(); missing keys default to
            zero/empty, so a partial dict is safe.

    Returns:
        (score, reasons) where reasons are human-readable explanations.
    """
    score = 0
    reasons: list[str] = []

    additions = pr.get("additions", 0)
    deletions = pr.get("deletions", 0)
    files = pr.get("changedFiles", 0)
    labels = [label.get("name", "").lower() for label in pr.get("labels", [])]

    # Size-based risk
    total_changes = additions + deletions
    if total_changes > 1000:
        score += 3
        reasons.append(f"Large PR ({total_changes} lines)")
    elif total_changes > 500:
        score += 2
        reasons.append(f"Medium-large PR ({total_changes} lines)")
    if files > 20:
        score += 2
        reasons.append(f"Many files changed ({files})")

    # Label-based risk
    risk_labels = {"breaking", "security", "critical", "urgent", "hotfix"}
    found_risk_labels = risk_labels.intersection(labels)
    if found_risk_labels:
        score += 3
        # sorted(): set iteration order is arbitrary, which made the report
        # text nondeterministic across runs.
        reasons.append(f"Risk labels: {', '.join(sorted(found_risk_labels))}")

    # Review status
    reviews = pr.get("reviews", [])
    if not reviews:
        score += 1
        reasons.append("No reviews")
    elif all(r.get("state") != "APPROVED" for r in reviews):
        score += 1
        reasons.append("Not approved")

    return score, reasons
def build_contributor_stats(
    prs: list[dict[str, Any]], team: list[str] | None
) -> dict[str, ContributorStats]:
    """Build per-contributor statistics from a list of PR dicts.

    Args:
        prs: PR dicts as returned by get_prs().
        team: optional allow-list of author logins; when set, PRs by other
            authors are skipped (reviewers on team PRs are still counted).

    Returns:
        Mapping of login -> ContributorStats.
    """
    # FIX: originally a defaultdict(ContributorStats). That factory would
    # raise TypeError if it ever fired (ContributorStats requires `login`);
    # it never did because every access is guarded by a membership check,
    # so a plain dict states the actual behavior.
    stats: dict[str, ContributorStats] = {}

    for pr in prs:
        author = pr.get("author", {}).get("login", "unknown")

        # Filter by team if specified
        if team and author not in team:
            continue

        if author not in stats:
            stats[author] = ContributorStats(login=author)
        contributor = stats[author]
        contributor.prs_opened += 1

        if pr.get("mergedAt"):
            contributor.prs_merged += 1
            # Incremental running mean of time-to-merge over merged PRs.
            try:
                created = datetime.fromisoformat(pr["createdAt"].replace("Z", "+00:00"))
                merged = datetime.fromisoformat(pr["mergedAt"].replace("Z", "+00:00"))
                hours = (merged - created).total_seconds() / 3600
                contributor.avg_time_to_merge = (
                    (contributor.avg_time_to_merge * (contributor.prs_merged - 1) + hours)
                    / contributor.prs_merged
                )
            except (ValueError, KeyError):
                # Malformed or missing timestamps: skip this PR's merge time.
                pass

        contributor.additions += pr.get("additions", 0)
        contributor.deletions += pr.get("deletions", 0)
        contributor.files_changed += pr.get("changedFiles", 0)

        # Track risk PRs (score >= 3 is worth surfacing in the digest)
        risk_score, risk_reasons = calculate_risk_score(pr)
        if risk_score >= 3:
            contributor.risk_prs.append({
                "number": pr.get("number"),
                "title": pr.get("title", "")[:50],
                "score": risk_score,
                "reasons": risk_reasons,
            })

        # Count reviews given; self-reviews are excluded.
        for review in pr.get("reviews", []):
            reviewer = review.get("author", {}).get("login", "")
            if reviewer and reviewer != author:
                if reviewer not in stats:
                    stats[reviewer] = ContributorStats(login=reviewer)
                stats[reviewer].reviews_given += 1

    # Derive average PR size once all counters are accumulated.
    for contributor in stats.values():
        if contributor.prs_opened > 0:
            contributor.avg_pr_size = (
                (contributor.additions + contributor.deletions) / contributor.prs_opened
            )

    return stats
def generate_report(
    stats: dict[str, ContributorStats], prs: list[dict[str, Any]], days: int
) -> str:
    """Generate the team PR digest report.

    Args:
        stats: per-contributor statistics from build_contributor_stats().
        prs: raw PR dicts from get_prs(); used for team-wide totals, which
            may include authors filtered out of *stats* by a --team list.
        days: length of the reporting period (header and velocity math).

    Returns:
        The digest as a Markdown string.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)

    # Team-wide totals come from the raw PR list, not the per-contributor stats.
    total_prs = len(prs)
    total_merged = sum(1 for pr in prs if pr.get("mergedAt"))
    total_additions = sum(pr.get("additions", 0) for pr in prs)
    total_deletions = sum(pr.get("deletions", 0) for pr in prs)

    lines = [
        "# Team PR Digest",
        "",
        f"**Period:** {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}",
        f"**Generated:** {end_date.strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "---",
        "",
        "## Team Summary",
        "",
        f"| Metric | Value |",
        f"|--------|-------|",
        f"| Total PRs | {total_prs} |",
        # Conditional expression guards the merge-rate division when no PRs exist.
        f"| Merged | {total_merged} ({total_merged/total_prs*100:.0f}% merge rate) |" if total_prs > 0 else "| Merged | 0 |",
        f"| Contributors | {len(stats)} |",
        f"| Lines Changed | +{total_additions:,} / -{total_deletions:,} |",
        "",
    ]

    # Contributor breakdown, most active (PRs opened) first.
    lines.extend([
        "## Activity by Contributor",
        "",
        "| Contributor | PRs | Merged | Reviews | Avg Size | Avg Merge Time |",
        "|-------------|-----|--------|---------|----------|----------------|",
    ])
    for contributor in sorted(stats.values(), key=lambda x: x.prs_opened, reverse=True):
        # "-" for contributors with no merged PRs (avg stays at 0.0).
        merge_time = f"{contributor.avg_time_to_merge:.1f}h" if contributor.avg_time_to_merge > 0 else "-"
        lines.append(
            f"| @{contributor.login} | {contributor.prs_opened} | {contributor.prs_merged} | "
            f"{contributor.reviews_given} | {contributor.avg_pr_size:.0f} | {merge_time} |"
        )
    lines.append("")

    # Risk highlights: flatten each contributor's risky PRs, tagging the author.
    # NOTE: this mutates the risk_pr dicts stored inside *stats* in place.
    all_risk_prs = []
    for contributor in stats.values():
        for risk_pr in contributor.risk_prs:
            risk_pr["author"] = contributor.login
            all_risk_prs.append(risk_pr)

    if all_risk_prs:
        lines.extend([
            "## Risk Highlights",
            "",
            "PRs that need extra attention:",
            "",
        ])
        # Top 10 by descending risk score; >= 5 is flagged HIGH.
        for risk_pr in sorted(all_risk_prs, key=lambda x: -x["score"])[:10]:
            level = "HIGH" if risk_pr["score"] >= 5 else "MEDIUM"
            lines.append(f"- **[{level}]** #{risk_pr['number']} {risk_pr['title']}")
            lines.append(f" - Author: @{risk_pr['author']}")
            lines.append(f" - Reasons: {', '.join(risk_pr['reasons'])}")
        lines.append("")

    # Review distribution: only contributors who actually reviewed something.
    reviewers = [(c.login, c.reviews_given) for c in stats.values() if c.reviews_given > 0]
    if reviewers:
        lines.extend([
            "## Review Activity",
            "",
            "Top reviewers this period:",
            "",
        ])
        for reviewer, count in sorted(reviewers, key=lambda x: -x[1])[:5]:
            lines.append(f"- @{reviewer}: {count} reviews")
        lines.append("")

    # Velocity metrics
    if total_prs > 0:
        # Weighted mean of per-contributor averages; max(..., 1) guards
        # division by zero when nothing merged in the period.
        avg_merge_time = sum(c.avg_time_to_merge * c.prs_merged for c in stats.values()) / max(total_merged, 1)
        prs_per_day = total_prs / days
        merges_per_day = total_merged / days
        lines.extend([
            "## Velocity Metrics",
            "",
            f"- **PRs per day:** {prs_per_day:.1f}",
            f"- **Merges per day:** {merges_per_day:.1f}",
            f"- **Avg time to merge:** {avg_merge_time:.1f} hours",
            "",
        ])

    # Recommendations (header is emitted even when no item below triggers).
    lines.extend([
        "## Recommendations",
        "",
    ])

    # Review bottleneck: more than 20% of PRs have no reviews at all.
    no_review_count = sum(1 for pr in prs if not pr.get("reviews"))
    if no_review_count > total_prs * 0.2:
        lines.append(f"- **Review coverage:** {no_review_count} PRs ({no_review_count/total_prs*100:.0f}%) lack reviews")

    # Oversized PRs (>500 changed lines).
    large_prs = sum(1 for pr in prs if pr.get("additions", 0) + pr.get("deletions", 0) > 500)
    if large_prs > 0:
        lines.append(f"- **PR size:** {large_prs} large PRs (>500 lines) - consider breaking down")

    # Review load concentration: one reviewer doing over half the reviews.
    if reviewers:
        top_reviewer_pct = max(c for _, c in reviewers) / sum(c for _, c in reviewers) * 100
        if top_reviewer_pct > 50:
            lines.append(f"- **Review balance:** Top reviewer handles {top_reviewer_pct:.0f}% of reviews - distribute load")

    lines.extend([
        "",
        "---",
        "*Generated by CODITECT Team PR Digest*",
    ])

    return "\n".join(lines)
def main() -> None:
    """CLI entry point: parse args, fetch PRs, and emit the digest."""
    parser = argparse.ArgumentParser(
        description="Generate team PR activity digest"
    )
    parser.add_argument(
        "--days", "-d", type=int, default=7,
        help="Number of days to include (default: 7)"
    )
    parser.add_argument(
        "--team", "-t", type=str, default=None,
        help="Comma-separated list of team members to include"
    )
    parser.add_argument(
        "--repo", "-r", type=str, default=None,
        help="GitHub repository (owner/repo format)"
    )
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help="Output file path (default: stdout)"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON instead of Markdown"
    )
    args = parser.parse_args()

    # FIX: strip whitespace and drop empty entries so `--team "a, b"` or a
    # trailing comma still matches author logins exactly.
    team = (
        [member.strip() for member in args.team.split(",") if member.strip()]
        if args.team else None
    )

    # Progress goes to stderr so stdout stays clean for the report itself.
    print(f"Fetching PRs from last {args.days} days...", file=sys.stderr)
    prs = get_prs(args.days, args.repo)

    if not prs:
        print("No PRs found.", file=sys.stderr)
        sys.exit(0)

    print(f"Analyzing {len(prs)} PRs...", file=sys.stderr)
    stats = build_contributor_stats(prs, team)

    if args.json:
        output = json.dumps({
            "period_days": args.days,
            "total_prs": len(prs),
            "contributors": {
                login: {
                    "prs_opened": s.prs_opened,
                    "prs_merged": s.prs_merged,
                    "reviews_given": s.reviews_given,
                    "additions": s.additions,
                    "deletions": s.deletions,
                    "avg_pr_size": s.avg_pr_size,
                    "avg_time_to_merge": s.avg_time_to_merge,
                    "risk_prs": s.risk_prs,
                }
                for login, s in stats.items()
            },
        }, indent=2)
    else:
        output = generate_report(stats, prs, args.days)

    if args.output:
        # Path is imported at module level (see top-of-file imports).
        Path(args.output).write_text(output)
        print(f"Digest written to: {args.output}", file=sys.stderr)
    else:
        print(output)
# FIX: the guard must compare the __name__ dunder against "__main__";
# the original `if name == "main":` would raise NameError when run.
if __name__ == "__main__":
    main()