#!/usr/bin/env python3
"""PR Learning Advisor - K.5.6

Extracts learning patterns from PR review feedback, identifies recurring
review comments, and generates improvement recommendations.

Usage:
    python3 scripts/pr-learning-advisor.py [--prs 50] [--author username]
    python3 scripts/pr-learning-advisor.py --team

Track: K (Workflow Automation)
Agent: pr-learning-advisor
Command: /pr-learn
"""
import argparse
import json
import re
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@dataclass
class ReviewComment:
    """A single code review comment attached to a PR."""

    pr_number: int   # PR the comment belongs to
    reviewer: str    # GitHub login of the commenter ("unknown" if missing)
    body: str        # raw comment text
    path: str        # file path for inline comments, "" for review summaries
    created_at: str  # ISO timestamp string as returned by the API
    state: str       # COMMENTED, APPROVED, CHANGES_REQUESTED
@dataclass
class LearningPattern:
    """A recurring theme identified from review feedback."""

    category: str        # key into REVIEW_PATTERNS, or "general"
    description: str     # human-readable category name
    frequency: int       # number of comments matching the category
    examples: list[str]  # up to 3 truncated sample comments
    recommendation: str  # suggested improvement action
    severity: str        # high, medium, low
def run_command(cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* (argv list, shell=False) and return (returncode, stdout, stderr).

    Never raises: any failure (missing binary, the 60s timeout, ...) is
    folded into (1, "", error message) so callers can treat every problem
    as a failed command.
    """
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        return result.returncode, result.stdout, result.stderr
    except Exception as e:  # deliberate best-effort: report, don't crash
        return 1, "", str(e)
def get_prs(count: int, author: str | None = None, repo: str | None = None) -> list[dict[str, Any]]:
    """Fetch up to *count* merged PRs via the GitHub CLI.

    Args:
        count: maximum number of PRs to fetch.
        author: optional PR-author filter.
        repo: optional "owner/repo"; defaults to the current repository.

    Returns the parsed JSON list, or [] on any gh or JSON failure.
    """
    cmd = [
        "gh", "pr", "list",
        "--state", "merged",
        "--json", "number,title,author,mergedAt,reviews,comments,url",
        "--limit", str(count),
    ]
    if author:
        cmd.extend(["--author", author])
    if repo:
        cmd.extend(["--repo", repo])
    code, stdout, _stderr = run_command(cmd)
    if code != 0:
        return []
    try:
        return json.loads(stdout) if stdout.strip() else []
    except json.JSONDecodeError:
        return []
def get_pr_review_comments(pr_number: int, repo: str | None = None) -> list[ReviewComment]:
    """Fetch inline review comments for a PR via the GitHub REST API.

    When *repo* is not given, try to resolve "owner/repo" from the current
    repository; if that fails, fall back to gh's literal "{owner}/{repo}"
    placeholder, which gh itself substitutes. Returns [] on any failure.
    """
    cmd = ["gh", "api", f"/repos/{'{owner}/{repo}' if not repo else repo}/pulls/{pr_number}/comments"]
    if not repo:
        # Try to get repo from git remote
        code, stdout, _ = run_command(["gh", "repo", "view", "--json", "nameWithOwner"])
        if code == 0:
            try:
                repo = json.loads(stdout).get("nameWithOwner", "")
                cmd = ["gh", "api", f"/repos/{repo}/pulls/{pr_number}/comments"]
            except (json.JSONDecodeError, AttributeError):
                # Bad/unexpected payload: keep the placeholder URL and let gh
                # resolve it. (Was a bare `except:` — narrowed to the failures
                # this lookup can actually produce.)
                pass
    code, stdout, _stderr = run_command(cmd)
    if code != 0:
        return []
    try:
        comments_data = json.loads(stdout) if stdout.strip() else []
        return [
            ReviewComment(
                pr_number=pr_number,
                # "user" can be JSON null (e.g. deleted accounts); guard it.
                reviewer=(c.get("user") or {}).get("login", "unknown"),
                body=c.get("body", ""),
                path=c.get("path", ""),
                created_at=c.get("created_at", ""),
                state="COMMENTED",
            )
            for c in comments_data
        ]
    except json.JSONDecodeError:
        return []
# Review comment patterns to look for
# Category -> {patterns, severity, recommendation}. Patterns are matched with
# re.search against the LOWERCASED comment text (see categorize_comment).
REVIEW_PATTERNS = {
    "code_style": {
        "patterns": [
            r"naming|variable name|function name|class name",
            r"format|indent|spacing|whitespace",
            r"consistent|convention|style guide",
        ],
        "severity": "low",
        "recommendation": "Follow project style guide and naming conventions",
    },
    "error_handling": {
        "patterns": [
            r"error handling|exception|try.catch|handle.*error",
            r"null check|undefined|optional chaining",
            r"edge case|boundary|corner case",
        ],
        "severity": "high",
        "recommendation": "Add comprehensive error handling and edge case coverage",
    },
    "testing": {
        "patterns": [
            r"test|coverage|unit test|integration test",
            r"assert|verify|mock|stub",
            r"test case|test scenario",
        ],
        "severity": "high",
        "recommendation": "Improve test coverage for new functionality",
    },
    "documentation": {
        "patterns": [
            r"document|comment|docstring|jsdoc",
            r"readme|explain|clarify|description",
            r"why|reason|rationale",
        ],
        "severity": "medium",
        "recommendation": "Add documentation explaining complex logic",
    },
    "performance": {
        "patterns": [
            r"performance|optimization|efficient|slow",
            r"memory|leak|cache|memoize",
            # BUG FIX: was r"complexity|O(n|big-o" — the unescaped "(" opened
            # an unclosed group, so re.search raised re.error and crashed
            # categorization; also text is lowercased, so "O(" never matched.
            r"complexity|o\(n|big-o",
        ],
        "severity": "medium",
        "recommendation": "Consider performance implications and optimizations",
    },
    "security": {
        "patterns": [
            r"security|vulnerab|injection|sanitize",
            r"auth|permission|access control",
            r"encrypt|hash|secret|credential",
        ],
        "severity": "high",
        "recommendation": "Review security implications and add safeguards",
    },
    "architecture": {
        "patterns": [
            r"refactor|abstraction|interface|coupling",
            r"separation of concerns|single responsibility",
            r"design pattern|architecture|structure",
        ],
        "severity": "medium",
        "recommendation": "Consider architectural improvements for maintainability",
    },
    "type_safety": {
        "patterns": [
            r"type|typing|typescript|any type",
            r"interface|generic|strict",
            r"type error|type check",
        ],
        "severity": "medium",
        "recommendation": "Strengthen type definitions and avoid 'any'",
    },
}
def categorize_comment(comment: str) -> list[tuple[str, str]]:
    """Categorize a review comment against REVIEW_PATTERNS.

    Returns one (category, severity) pair per category whose pattern list
    matches the lowercased comment; [("general", "low")] when none match.
    """
    text = comment.lower()
    hits = [
        (category, config["severity"])
        for category, config in REVIEW_PATTERNS.items()
        if any(re.search(pattern, text) for pattern in config["patterns"])
    ]
    return hits or [("general", "low")]
def extract_learning_patterns(comments: list[ReviewComment]) -> list[LearningPattern]:
    """Aggregate review comments into recurring LearningPatterns.

    Comments shorter than 10 characters are skipped; categories seen fewer
    than twice are dropped. The result is ordered high -> medium -> low
    severity, and by descending frequency within each severity.
    """
    category_data: dict[str, dict] = defaultdict(lambda: {
        "count": 0,
        "examples": [],
        "severity": "low",
    })
    for comment in comments:
        if len(comment.body) < 10:  # Skip very short comments
            continue
        for category, severity in categorize_comment(comment.body):
            data = category_data[category]
            data["count"] += 1
            if len(data["examples"]) < 3:
                truncated = comment.body[:200] + "..." if len(comment.body) > 200 else comment.body
                data["examples"].append(truncated)
            # Keep highest severity seen for the category (high > medium > low)
            if severity == "high" or (severity == "medium" and data["severity"] == "low"):
                data["severity"] = severity

    patterns = []
    for category, data in category_data.items():
        if data["count"] < 2:  # Minimum frequency
            continue
        config = REVIEW_PATTERNS.get(category, {})
        patterns.append(LearningPattern(
            category=category,
            description=category.replace("_", " ").title(),
            frequency=data["count"],
            examples=data["examples"],
            recommendation=config.get("recommendation", "Review feedback and improve"),
            severity=data["severity"],
        ))
    # BUG FIX: the original key negated the severity index
    # (-["high","medium","low"].index(...)), which sorted LOW severity first.
    # High severity must lead, as the report's "Key Areas" section expects.
    severity_rank = {"high": 0, "medium": 1, "low": 2}
    return sorted(patterns, key=lambda p: (severity_rank[p.severity], -p.frequency))
def get_top_reviewers(comments: list[ReviewComment]) -> list[tuple[str, int]]:
    """Return up to 10 (reviewer, comment_count) pairs, busiest reviewer first.

    Ties keep first-seen order (Python's sort is stable even with reverse=True).
    """
    tally: dict[str, int] = {}
    for comment in comments:
        tally[comment.reviewer] = tally.get(comment.reviewer, 0) + 1
    ranked = sorted(tally.items(), key=lambda item: item[1], reverse=True)
    return ranked[:10]
def generate_report(
    patterns: list[LearningPattern],
    comments: list[ReviewComment],
    author: str | None,
) -> str:
    """Render the Markdown learning report.

    Args:
        patterns: pre-sorted patterns from extract_learning_patterns().
        comments: all analyzed review comments (used for totals/reviewers).
        author: author filter shown in the header, or None for everyone.

    Returns the complete report as a single newline-joined string.
    """
    total_comments = len(comments)
    top_reviewers = get_top_reviewers(comments)

    lines = [
        "# PR Review Learning Report",
        "",
        f"**Author:** {author or 'All contributors'}",
        f"**Total Review Comments Analyzed:** {total_comments}",
        f"**Patterns Identified:** {len(patterns)}",
        f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "---",
        "",
    ]

    if not patterns:
        lines.extend([
            "No significant patterns found in review feedback.",
            "",
            "This could mean:",
            "- Code quality is consistently high",
            "- Not enough review comments to analyze",
            "- Reviews are too brief to categorize",
        ])
    else:
        # Key insights: high-severity patterns get a headline section.
        high_priority = [p for p in patterns if p.severity == "high"]
        if high_priority:
            lines.extend([
                "## Key Areas for Improvement",
                "",
            ])
            for pattern in high_priority:
                lines.append(f"- **{pattern.description}** ({pattern.frequency} occurrences) - {pattern.recommendation}")
            lines.append("")

        # Detailed patterns (top 10 only).
        lines.extend([
            "## Detailed Pattern Analysis",
            "",
        ])
        for i, pattern in enumerate(patterns[:10], 1):
            # NOTE(review): all three severity markers are empty strings —
            # the original emoji appear to have been lost; restore if known.
            emoji = {"high": "", "medium": "", "low": ""}[pattern.severity]
            lines.extend([
                f"### {i}. {emoji} {pattern.description}",
                "",
                f"**Frequency:** {pattern.frequency} comments",
                f"**Severity:** {pattern.severity.title()}",
                f"**Recommendation:** {pattern.recommendation}",
                "",
                "**Example Feedback:**",
            ])
            for example in pattern.examples[:2]:
                lines.append(f"> {example}")
                lines.append("")
            lines.append("")

        # Summary table across ALL qualifying patterns.
        lines.extend([
            "## Summary by Category",
            "",
            "| Category | Count | Severity | Action |",
            "|----------|-------|----------|--------|",
        ])
        for pattern in patterns:
            lines.append(
                f"| {pattern.description} | {pattern.frequency} | "
                f"{pattern.severity} | {pattern.recommendation[:50]}... |"
            )
        lines.append("")

        # Top reviewers
        if top_reviewers:
            lines.extend([
                "## Top Reviewers",
                "",
                "These reviewers provide the most feedback:",
                "",
            ])
            for reviewer, count in top_reviewers[:5]:
                lines.append(f"- @{reviewer}: {count} comments")
            lines.append("")

        # Action plan: top 5 patterns, numbered.
        lines.extend([
            "## Action Plan",
            "",
            "Based on the analysis, prioritize improvements in this order:",
            "",
        ])
        priority = 1
        for pattern in patterns[:5]:
            if pattern.frequency >= 2:
                lines.append(f"{priority}. **{pattern.description}**: {pattern.recommendation}")
                priority += 1
        lines.append("")

    lines.extend([
        "---",
        "*Generated by CODITECT PR Learning Advisor*",
    ])
    return "\n".join(lines)
def main() -> None:
    """CLI entry point: fetch PRs, analyze review comments, emit a report."""
    parser = argparse.ArgumentParser(
        description="Extract learning patterns from PR review feedback"
    )
    parser.add_argument(
        "--prs", "-n", type=int, default=50,
        help="Number of PRs to analyze (default: 50)"
    )
    parser.add_argument(
        "--author", "-a", type=str, default=None,
        help="Filter by PR author"
    )
    parser.add_argument(
        "--repo", "-r", type=str, default=None,
        help="GitHub repository (owner/repo format)"
    )
    parser.add_argument(
        "--team", action="store_true",
        help="Analyze team-wide patterns (all authors)"
    )
    parser.add_argument(
        "--output", "-o", type=str, default=None,
        help="Output file path (default: stdout)"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON instead of Markdown"
    )
    args = parser.parse_args()

    # --team means "all authors": drop any author filter.
    author = None if args.team else args.author
    print(f"Fetching {args.prs} merged PRs...", file=sys.stderr)
    prs = get_prs(args.prs, author, args.repo)
    if not prs:
        print("No PRs found.", file=sys.stderr)
        sys.exit(0)

    print(f"Found {len(prs)} PRs, fetching review comments...", file=sys.stderr)
    all_comments: list[ReviewComment] = []
    for pr in prs:
        pr_number = pr.get("number")
        if pr_number:
            all_comments.extend(get_pr_review_comments(pr_number, args.repo))
            # Also include review summary bodies (not just inline comments).
            for review in pr.get("reviews", []):
                if review.get("body"):
                    all_comments.append(ReviewComment(
                        pr_number=pr_number,
                        # "author" can be JSON null (deleted account); guard it.
                        reviewer=(review.get("author") or {}).get("login", "unknown"),
                        body=review.get("body", ""),
                        path="",
                        created_at=review.get("submittedAt", ""),
                        state=review.get("state", "COMMENTED"),
                    ))

    print(f"Analyzing {len(all_comments)} review comments...", file=sys.stderr)
    patterns = extract_learning_patterns(all_comments)

    if args.json:
        output = json.dumps({
            "total_comments": len(all_comments),
            "patterns": [
                {
                    "category": p.category,
                    "frequency": p.frequency,
                    "severity": p.severity,
                    "recommendation": p.recommendation,
                }
                for p in patterns
            ],
        }, indent=2)
    else:
        output = generate_report(patterns, all_comments, author)

    if args.output:
        from pathlib import Path
        Path(args.output).write_text(output)
        print(f"Report written to: {args.output}", file=sys.stderr)
    else:
        print(output)
# BUG FIX: original read `if name == "main":` (underscores lost), which is a
# NameError at runtime; the standard entry-point guard is required here.
if __name__ == "__main__":
    main()