#!/usr/bin/env python3
"""Weekly Digest Generator - K.5.1

Synthesizes a week's PRs, incidents, deployments, rollouts, code reviews,
and session logs into a structured weekly digest.

Usage:
    python3 scripts/weekly-digest-generator.py [--days 7] [--output FILE]
    python3 scripts/weekly-digest-generator.py --repo owner/repo --days 14

Track: K (Workflow Automation)
Agent: weekly-digest-generator
Command: /weekly-digest
"""
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
def run_command(cmd: list[str], capture: bool = True) -> tuple[int, str, str]:
    """Run a command and return (returncode, stdout, stderr).

    Args:
        cmd: Command and arguments as a list (executed without a shell).
        capture: Capture stdout/stderr as text when True.

    Returns:
        Tuple of (returncode, stdout, stderr). On timeout or launch
        failure, returns (1, "", <error message>). Output fields are
        always strings, even when capture is False.
    """
    try:
        result = subprocess.run(
            cmd, capture_output=capture, text=True, timeout=60
        )
        # With capture=False, subprocess leaves stdout/stderr as None;
        # coerce to "" so the declared tuple[int, str, str] contract holds.
        return result.returncode, result.stdout or "", result.stderr or ""
    except subprocess.TimeoutExpired:
        return 1, "", "Command timed out"
    except Exception as e:  # best-effort: callers treat any failure as empty output
        return 1, "", str(e)
def get_prs(days: int, repo: str | None = None) -> list[dict[str, Any]]:
    """Fetch merged PRs from the last N days via the GitHub CLI.

    Args:
        days: Look-back window in days.
        repo: Optional "owner/repo"; defaults to the current directory's repo.

    Returns:
        List of PR dicts from `gh pr list --json`; empty list on any
        failure (this collector is best-effort).
    """
    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    cmd = [
        "gh", "pr", "list", "--state", "merged", "--json",
        "number,title,author,mergedAt,labels,additions,deletions,url",
        "--search", f"merged:>={since}",
    ]
    if repo:
        cmd.extend(["--repo", repo])
    code, stdout, stderr = run_command(cmd)
    if code != 0:
        print(f"Warning: Failed to fetch PRs: {stderr}", file=sys.stderr)
        return []
    try:
        return json.loads(stdout) if stdout.strip() else []
    except json.JSONDecodeError:
        # gh occasionally emits non-JSON noise (e.g. auth prompts); treat as no data.
        return []
def get_commits(days: int) -> list[dict[str, Any]]:
    """Fetch non-merge commits from the last N days via `git log`.

    Args:
        days: Look-back window in days.

    Returns:
        List of dicts with keys: sha (8-char prefix), author, message,
        date. Empty list if git fails (best-effort).
    """
    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    # %x1f is the ASCII unit separator; unlike "|" it cannot realistically
    # appear in commit subjects, so field splitting is unambiguous.
    sep = "\x1f"
    cmd = [
        "git", "log", f"--since={since}",
        "--pretty=format:%H%x1f%an%x1f%s%x1f%ai", "--no-merges",
    ]
    code, stdout, _ = run_command(cmd)
    if code != 0:
        return []
    commits = []
    for line in stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split(sep, 3)
        if len(parts) == 4:
            commits.append({
                "sha": parts[0][:8],
                "author": parts[1],
                "message": parts[2],
                "date": parts[3],
            })
    return commits
def get_issues(days: int, repo: str | None = None) -> list[dict[str, Any]]:
    """Fetch closed incident-like issues from the last N days.

    An issue counts as incident-like when it carries one of the labels:
    bug, incident, hotfix, critical, security.

    Args:
        days: Look-back window in days.
        repo: Optional "owner/repo"; defaults to the current directory's repo.

    Returns:
        Filtered list of issue dicts; empty list on any failure (best-effort).
    """
    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    cmd = [
        "gh", "issue", "list", "--state", "closed", "--json",
        "number,title,labels,closedAt,url",
        "--search", f"closed:>={since}",
    ]
    if repo:
        cmd.extend(["--repo", repo])
    code, stdout, _ = run_command(cmd)
    if code != 0:
        return []
    try:
        issues = json.loads(stdout) if stdout.strip() else []
    except json.JSONDecodeError:
        return []
    # Keep only issues whose labels look like incidents.
    incident_labels = {"bug", "incident", "hotfix", "critical", "security"}
    return [
        issue for issue in issues
        if any(
            label.get("name", "").lower() in incident_labels
            for label in issue.get("labels", [])
        )
    ]
def get_session_logs(days: int) -> list[dict[str, Any]]:
    """List session-log markdown files modified in the last N days.

    Scans ~/PROJECTS/.coditect-data/session-logs for *.md files; a missing
    directory yields an empty list.

    Args:
        days: Look-back window in days (by file modification time).

    Returns:
        Dicts with keys file (name), date (YYYY-MM-DD), size (bytes),
        sorted newest-first by date.
    """
    logs_dir = Path.home() / "PROJECTS" / ".coditect-data" / "session-logs"
    if not logs_dir.exists():
        return []
    cutoff = datetime.now() - timedelta(days=days)
    sessions = []
    for log_file in logs_dir.glob("*.md"):
        try:
            # Single stat() call supplies both mtime and size.
            info = log_file.stat()
            mtime = datetime.fromtimestamp(info.st_mtime)
            if mtime >= cutoff:
                sessions.append({
                    "file": log_file.name,
                    "date": mtime.strftime("%Y-%m-%d"),
                    "size": info.st_size,
                })
        except OSError:
            continue  # file vanished or is unreadable; skip it
    return sorted(sessions, key=lambda s: s["date"], reverse=True)
def categorize_prs(prs: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Categorize PRs by area based on labels and title patterns.

    Each PR lands in exactly one category — the first match in priority
    order: Features, Bug Fixes, Documentation, Infrastructure, Tests,
    Other. Empty categories are dropped from the result.

    Args:
        prs: PR dicts with optional "labels" (list of {"name": ...}) and "title".

    Returns:
        Mapping of category name to its (non-empty) list of PRs.
    """
    categories: dict[str, list[dict[str, Any]]] = {
        "Features": [],
        "Bug Fixes": [],
        "Documentation": [],
        "Infrastructure": [],
        "Tests": [],
        "Other": [],
    }
    for pr in prs:
        labels = [lbl.get("name", "").lower() for lbl in pr.get("labels", [])]
        title = pr.get("title", "").lower()
        # "ci" must match as a whole word: as a substring it false-positives
        # on titles containing e.g. "pricing" or "circular". The other infra
        # keywords are safe (and useful) as substrings ("deployment", etc.).
        ci_in_title = "ci" in title.split()
        if any(l in labels for l in ("feature", "enhancement")) or title.startswith("feat"):
            categories["Features"].append(pr)
        elif any(l in labels for l in ("bug", "fix", "hotfix")) or title.startswith("fix"):
            categories["Bug Fixes"].append(pr)
        elif any(l in labels for l in ("docs", "documentation")) or title.startswith("docs"):
            categories["Documentation"].append(pr)
        elif (
            any(l in labels for l in ("infra", "devops", "ci"))
            or ci_in_title
            or any(k in title for k in ("deploy", "infra", "docker", "k8s"))
        ):
            categories["Infrastructure"].append(pr)
        elif any(l in labels for l in ("test", "testing")) or title.startswith("test"):
            categories["Tests"].append(pr)
        else:
            categories["Other"].append(pr)
    return {name: group for name, group in categories.items() if group}
def generate_digest(days: int, repo: str | None = None) -> str:
    """Generate the weekly digest as a Markdown document.

    Gathers PRs, commits, incident issues, and local session logs for the
    trailing window and renders them into sections. Every collector is
    best-effort, so a missing source simply produces an empty section.

    Args:
        days: Look-back window in days.
        repo: Optional "owner/repo" for the GitHub queries.

    Returns:
        The complete Markdown digest as a single string.
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)

    prs = get_prs(days, repo)
    commits = get_commits(days)
    incidents = get_issues(days, repo)
    sessions = get_session_logs(days)
    categorized_prs = categorize_prs(prs)

    # Headline stats.
    total_additions = sum(pr.get("additions", 0) for pr in prs)
    total_deletions = sum(pr.get("deletions", 0) for pr in prs)
    contributors = {pr.get("author", {}).get("login", "unknown") for pr in prs}

    lines = [
        "# Weekly Digest",
        "",
        f"**Period:** {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}",
        f"**Generated:** {end_date.strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "---",
        "",
        "## Highlights",
        "",
        f"- **{len(prs)}** PRs merged",
        f"- **{len(commits)}** commits",
        f"- **{len(incidents)}** incidents resolved",
        f"- **{len(contributors)}** contributors active",
        f"- **+{total_additions:,}** / **-{total_deletions:,}** lines changed",
        "",
    ]

    # PRs grouped by area.
    if categorized_prs:
        lines.extend(["## PRs by Area", ""])
        for category, cat_prs in categorized_prs.items():
            lines.append(f"### {category} ({len(cat_prs)})")
            lines.append("")
            for pr in cat_prs[:10]:  # cap at 10 per category to keep the digest short
                author = pr.get("author", {}).get("login", "unknown")
                lines.append(f"- #{pr.get('number')} {pr.get('title')} (@{author})")
            if len(cat_prs) > 10:
                lines.append(f"- ... and {len(cat_prs) - 10} more")
            lines.append("")

    # Incident-like issues closed this period.
    if incidents:
        lines.extend([f"## Incidents & Resolutions ({len(incidents)})", ""])
        for issue in incidents[:10]:
            lines.append(f"- #{issue.get('number')} {issue.get('title')}")
        lines.append("")

    # Local development-session activity.
    if sessions:
        lines.extend([f"## Development Sessions ({len(sessions)})", ""])
        for session in sessions[:5]:
            lines.append(f"- {session['file']} ({session['date']})")
        lines.append("")

    # Static closing sections.
    lines.extend([
        "## Risks & Watch Items",
        "",
        "- Review large PRs (>500 lines) for thorough testing",
        "- Monitor recently resolved incidents for regression",
        "",
        "## Next Week Priorities",
        "",
        "- [ ] Review open PRs awaiting review",
        "- [ ] Address any failing CI pipelines",
        "- [ ] Update documentation for new features",
        "",
        "---",
        "*Generated by CODITECT Weekly Digest Generator*",
    ])
    return "\n".join(lines)
def main() -> None:
    """CLI entry point: parse arguments, build the digest, and emit it."""
    parser = argparse.ArgumentParser(
        description="Generate a weekly digest of development activity"
    )
    parser.add_argument(
        "--days", "-d", type=int, default=7,
        help="Number of days to include (default: 7)",
    )
    parser.add_argument(
        "--repo", "-r", type=str, default=None,
        help="GitHub repository (owner/repo format)",
    )
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help="Output file path (default: stdout)",
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON instead of Markdown",
    )
    args = parser.parse_args()

    if args.json:
        data = {
            "period_days": args.days,
            "generated": datetime.now().isoformat(),
            "prs": get_prs(args.days, args.repo),
            "commits": get_commits(args.days),
            "incidents": get_issues(args.days, args.repo),
            "sessions": get_session_logs(args.days),
        }
        # default=str covers any non-JSON-native values (e.g. datetimes).
        output = json.dumps(data, indent=2, default=str)
    else:
        output = generate_digest(args.days, args.repo)

    if args.output:
        Path(args.output).write_text(output)
        print(f"Digest written to: {args.output}")
    else:
        print(output)
# Bug fix: the guard read `if name == "main"` (dunder underscores lost),
# which raises NameError at import time. Restore the standard idiom.
if __name__ == "__main__":
    main()