# Implementation Blueprint: Reviewer Council for Coditect

## Overview
This document provides production-ready implementation patterns for integrating the LLM Council's peer review mechanism into Coditect's multi-agent architecture.
## Module Structure

```
coditect/
├── agents/
│   ├── __init__.py
│   ├── base.py
│   ├── orchestrator.py
│   ├── architect.py
│   ├── implementer.py
│   └── council/
│       ├── __init__.py
│       ├── reviewer.py          # Individual reviewer agent
│       ├── council.py           # Council orchestration
│       ├── chairman.py          # Synthesis agent
│       ├── anonymizer.py        # Label mapping
│       ├── ranking.py           # Ranking aggregation
│       └── audit.py             # Compliance trail
├── providers/
│   ├── __init__.py
│   ├── router.py                # Multi-model routing
│   ├── openrouter.py            # OpenRouter adapter
│   └── circuit_breaker.py       # Fault tolerance
├── compliance/
│   ├── __init__.py
│   ├── pii_redactor.py          # Pre-dispatch sanitization
│   ├── audit_logger.py          # Immutable logging
│   └── signatures.py            # Electronic signatures
└── storage/
    ├── __init__.py
    ├── checkpoints.py           # State persistence
    └── foundation.py            # FoundationDB adapter
```
## Core Implementation

### 1. Reviewer Base Class

```python
# coditect/agents/council/reviewer.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import hashlib
import json
class Severity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
@dataclass
class Finding:
"""Individual review finding with location and severity."""
id: str
severity: Severity
category: str
location: str # file:line or AST path
title: str
description: str
recommendation: str
confidence: float # 0.0 - 1.0
evidence: Optional[str] = None # Code snippet or reference
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id,
'severity': self.severity.value,
'category': self.category,
'location': self.location,
'title': self.title,
'description': self.description,
'recommendation': self.recommendation,
'confidence': self.confidence,
'evidence': self.evidence
}
@property
def is_blocking(self) -> bool:
return self.severity in [Severity.CRITICAL, Severity.HIGH]
@dataclass
class ReviewResult:
"""Complete result from a single reviewer."""
reviewer_id: str
domain: str
findings: List[Finding]
overall_score: float
summary: str
raw_response: str # Full LLM output for audit
token_usage: int
latency_ms: int
model_used: str
timestamp: str
@property
def pass_threshold(self) -> bool:
"""Check if review passes minimum quality bar."""
critical_count = sum(1 for f in self.findings if f.severity == Severity.CRITICAL)
high_count = sum(1 for f in self.findings if f.severity == Severity.HIGH)
return critical_count == 0 and high_count <= 2 and self.overall_score >= 0.7
def compute_hash(self) -> str:
"""Compute deterministic hash for audit trail."""
content = json.dumps({
'reviewer_id': self.reviewer_id,
'domain': self.domain,
'findings': [f.to_dict() for f in self.findings],
'overall_score': self.overall_score,
'summary': self.summary
}, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
class BaseReviewer(ABC):
"""
Abstract base class for specialized code reviewers.
Each reviewer focuses on a specific quality domain.
"""
def __init__(
self,
model: str,
domain: str,
evaluation_rubric: Dict[str, float],
severity_thresholds: Dict[str, int]
):
self.model = model
self.domain = domain
self.evaluation_rubric = evaluation_rubric
self.severity_thresholds = severity_thresholds
self.reviewer_id = f"{domain}-reviewer-{hashlib.md5(model.encode()).hexdigest()[:8]}"
@abstractmethod
def build_system_prompt(self) -> str:
"""Build domain-specific system prompt."""
pass
@abstractmethod
def build_review_prompt(
self,
artifact: 'CodeArtifact',
context: Dict[str, Any]
) -> str:
"""Build review prompt for specific artifact."""
pass
def parse_response(self, response: str) -> ReviewResult:
"""Parse LLM response into structured ReviewResult."""
# Expect JSON response format
try:
data = json.loads(response)
findings = [
Finding(
id=f"{self.reviewer_id}-{i}",
severity=Severity(f['severity']),
category=f['category'],
location=f['location'],
title=f['title'],
description=f['description'],
recommendation=f['recommendation'],
confidence=f.get('confidence', 0.8),
evidence=f.get('evidence')
)
for i, f in enumerate(data.get('findings', []))
]
return ReviewResult(
reviewer_id=self.reviewer_id,
domain=self.domain,
findings=findings,
overall_score=data.get('overall_score', 0.5),
summary=data.get('summary', ''),
raw_response=response,
token_usage=0, # Set by caller
latency_ms=0, # Set by caller
model_used=self.model,
timestamp='' # Set by caller
)
except (json.JSONDecodeError, KeyError) as e:
# Fallback: Create error result
return self._create_error_result(str(e), response)
def _create_error_result(self, error: str, raw: str) -> ReviewResult:
"""Create result indicating parse failure."""
return ReviewResult(
reviewer_id=self.reviewer_id,
domain=self.domain,
findings=[
Finding(
id=f"{self.reviewer_id}-error",
severity=Severity.INFO,
category="parse_error",
location="N/A",
title="Review Parse Error",
description=f"Failed to parse reviewer response: {error}",
recommendation="Manual review required",
confidence=0.0
)
],
overall_score=0.0,
summary=f"Parse error: {error}",
raw_response=raw,
token_usage=0,
latency_ms=0,
model_used=self.model,
timestamp=''
)
class SecurityReviewer(BaseReviewer):
"""Specialized reviewer for security vulnerabilities."""
def __init__(self, model: str = "anthropic/claude-sonnet-4.5"):
super().__init__(
model=model,
domain="security",
evaluation_rubric={
'injection_vulnerabilities': 0.25,
'authentication_issues': 0.20,
'data_exposure': 0.20,
'cryptographic_weaknesses': 0.15,
'input_validation': 0.10,
'dependency_risks': 0.10
},
severity_thresholds={
'sql_injection': 0,
'xss': 0,
'hardcoded_secrets': 0,
'weak_crypto': 1
}
)
def build_system_prompt(self) -> str:
return """You are a senior security engineer reviewing code for vulnerabilities.
Your expertise includes:
- OWASP Top 10 vulnerabilities
- CWE/SANS Top 25 dangerous software errors
- Secure coding practices for Python and TypeScript
- Authentication and authorization patterns
- Cryptographic best practices
- Supply chain security
Evaluation rubric (weights):
- Injection vulnerabilities: 25%
- Authentication issues: 20%
- Data exposure risks: 20%
- Cryptographic weaknesses: 15%
- Input validation: 10%
- Dependency risks: 10%
You must respond with valid JSON only."""
def build_review_prompt(
self,
artifact: 'CodeArtifact',
context: Dict[str, Any]
) -> str:
return f"""Review the following code for security vulnerabilities.
## Code Artifact
- File: {artifact.path}
- Language: {artifact.language}
- Context: {context.get('description', 'N/A')}
```{artifact.language}
{artifact.content}
```

## Instructions

Analyze this code for security issues. For each finding:
- Identify the specific vulnerability type
- Locate the exact line(s) affected
- Explain the risk
- Provide a concrete fix

Respond with JSON:
{{
  "findings": [
    {{
      "severity": "critical|high|medium|low|info",
      "category": "injection|auth|exposure|crypto|validation|dependency",
      "location": "file:line",
      "title": "Brief title",
      "description": "Detailed explanation",
      "recommendation": "How to fix",
      "confidence": 0.0-1.0,
      "evidence": "Relevant code snippet"
    }}
  ],
  "overall_score": 0.0-1.0,
  "summary": "2-3 sentence summary"
}}"""

class ComplianceReviewer(BaseReviewer):
    """Specialized reviewer for regulatory compliance."""
def __init__(
self,
model: str = "anthropic/claude-sonnet-4.5",
        frameworks: Optional[List[str]] = None
):
self.frameworks = frameworks or ['HIPAA', 'SOC2']
super().__init__(
model=model,
domain="compliance",
evaluation_rubric={
'data_handling': 0.30,
'audit_logging': 0.25,
'access_control': 0.20,
'encryption': 0.15,
'retention': 0.10
},
severity_thresholds={
'phi_exposure': 0,
'missing_audit_log': 1,
'insufficient_encryption': 0
}
)
def build_system_prompt(self) -> str:
frameworks_str = ', '.join(self.frameworks)
return f"""You are a compliance specialist reviewing code for regulatory requirements.
Target frameworks: {frameworks_str}
Your expertise includes:
- HIPAA Privacy and Security Rules
- SOC 2 Trust Service Criteria
- FDA 21 CFR Part 11 (if applicable)
- GDPR data protection requirements
- PCI-DSS for payment data
Evaluation rubric (weights):
- Data handling practices: 30%
- Audit logging completeness: 25%
- Access control implementation: 20%
- Encryption usage: 15%
- Data retention compliance: 10%
You must respond with valid JSON only."""
def build_review_prompt(
self,
artifact: 'CodeArtifact',
context: Dict[str, Any]
) -> str:
data_classification = context.get('data_classification', 'unknown')
return f"""Review the following code for regulatory compliance.
## Code Artifact
- File: {artifact.path}
- Language: {artifact.language}
- Data Classification: {data_classification}
- Target Frameworks: {', '.join(self.frameworks)}

```{artifact.language}
{artifact.content}
```

## Compliance Requirements Context
{context.get('compliance_context', 'Standard compliance review required.')}

## Instructions

Analyze this code for compliance violations. Consider:
- How sensitive data is handled
- Whether audit trails are complete
- Access control enforcement
- Encryption at rest and in transit
- Data retention and deletion

Respond with JSON:
{{
  "findings": [
    {{
      "severity": "critical|high|medium|low|info",
      "category": "data_handling|audit|access|encryption|retention",
      "location": "file:line",
      "title": "Brief title",
      "description": "Detailed explanation with framework reference",
      "recommendation": "How to achieve compliance",
      "confidence": 0.0-1.0,
      "evidence": "Relevant code snippet"
    }}
  ],
  "overall_score": 0.0-1.0,
  "summary": "2-3 sentence compliance assessment"
}}"""

class PerformanceReviewer(BaseReviewer):
    """Specialized reviewer for performance issues."""
def __init__(self, model: str = "openai/gpt-4o"):
super().__init__(
model=model,
domain="performance",
evaluation_rubric={
'algorithmic_complexity': 0.30,
'memory_usage': 0.25,
'io_efficiency': 0.20,
'concurrency': 0.15,
'caching': 0.10
},
severity_thresholds={
'n_squared_loop': 2,
'memory_leak': 0,
'blocking_io': 3
}
)
def build_system_prompt(self) -> str:
return """You are a performance engineer reviewing code for efficiency issues.
Your expertise includes:
- Algorithm complexity analysis (Big O)
- Memory management and leak detection
- I/O optimization patterns
- Concurrency and parallelization
- Caching strategies
Evaluation rubric (weights):
- Algorithmic complexity: 30%
- Memory usage: 25%
- I/O efficiency: 20%
- Concurrency patterns: 15%
- Caching opportunities: 10%
You must respond with valid JSON only."""
def build_review_prompt(
self,
artifact: 'CodeArtifact',
context: Dict[str, Any]
) -> str:
scale_expectations = context.get('scale', 'standard')
return f"""Review the following code for performance issues.
## Code Artifact
- File: {artifact.path}
- Language: {artifact.language}
- Expected Scale: {scale_expectations}

```{artifact.language}
{artifact.content}
```

## Instructions

Analyze this code for performance problems. Consider:
- Time complexity of algorithms
- Space complexity and memory allocation
- I/O patterns (blocking vs async)
- Concurrency correctness
- Caching opportunities

Respond with JSON:
{{
  "findings": [
    {{
      "severity": "critical|high|medium|low|info",
      "category": "complexity|memory|io|concurrency|caching",
      "location": "file:line",
      "title": "Brief title",
      "description": "Detailed explanation with complexity analysis",
      "recommendation": "How to optimize",
      "confidence": 0.0-1.0,
      "evidence": "Relevant code snippet"
    }}
  ],
  "overall_score": 0.0-1.0,
  "summary": "2-3 sentence performance assessment"
}}"""
```
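
To make the call path concrete, here is a minimal driver sketch. The `CodeArtifact` shape (carrying only the fields referenced in this document) and the `call_llm` helper are assumptions for illustration; in Coditect the dispatch would go through the router in `providers/router.py`.

```python
# Hypothetical driver for a single reviewer round trip. `call_llm` stands in
# for whatever provider adapter actually sends the request.
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import List


@dataclass
class CodeArtifact:
    path: str
    language: str
    content: str
    compliance_tags: List[str] = field(default_factory=list)

    @property
    def line_count(self) -> int:
        return self.content.count("\n") + 1


async def call_llm(model: str, system: str, user: str) -> str:
    raise NotImplementedError("wire this to the provider router")


async def run_single_review(reviewer: BaseReviewer, artifact: CodeArtifact) -> ReviewResult:
    start = time.monotonic()
    response = await call_llm(
        model=reviewer.model,
        system=reviewer.build_system_prompt(),
        user=reviewer.build_review_prompt(artifact, context={}),
    )
    result = reviewer.parse_response(response)
    # parse_response deliberately leaves these fields for the caller to fill in
    result.latency_ms = int((time.monotonic() - start) * 1000)
    result.timestamp = datetime.utcnow().isoformat()
    return result
```
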
### 2. Anonymizer Module
```python
# coditect/agents/council/anonymizer.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import random
import re
@dataclass
class AnonymizedReview:
"""Review with identity hidden behind neutral label."""
label: str
domain: str # Keep domain visible for context
findings_summary: str
overall_score: float
raw_content: str # Sanitized response without identifying info
class ReviewAnonymizer:
"""
Implements LLM Council's anonymization pattern.
Prevents models from favoring their own family's responses.
"""
# Neutral labels that don't hint at provider identity
LABELS = [
'Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon',
'Zeta', 'Eta', 'Theta', 'Iota', 'Kappa'
]
    def __init__(self, seed: Optional[int] = None):
"""Initialize with optional seed for reproducibility."""
self.rng = random.Random(seed)
def anonymize_reviews(
self,
reviews: Dict[str, 'ReviewResult']
) -> Tuple[Dict[str, AnonymizedReview], Dict[str, str]]:
"""
Convert reviewer IDs to neutral labels.
Returns:
- anonymized_reviews: Dict[label, AnonymizedReview]
- label_mapping: Dict[label, original_reviewer_id]
"""
# Shuffle labels to prevent positional bias
available_labels = self.LABELS[:len(reviews)]
self.rng.shuffle(available_labels)
label_mapping = {}
anonymized = {}
for (reviewer_id, review), label in zip(reviews.items(), available_labels):
label_mapping[label] = reviewer_id
# Sanitize content to remove identifying information
sanitized_content = self._sanitize_content(review.raw_response)
anonymized[label] = AnonymizedReview(
label=label,
domain=review.domain,
findings_summary=self._summarize_findings(review.findings),
overall_score=review.overall_score,
raw_content=sanitized_content
)
return anonymized, label_mapping
def _sanitize_content(self, content: str) -> str:
"""Remove provider-identifying information from content."""
# Known provider signatures to remove
provider_patterns = [
('claude', '[ASSISTANT]'),
('gpt', '[ASSISTANT]'),
('gemini', '[ASSISTANT]'),
('llama', '[ASSISTANT]'),
('anthropic', '[PROVIDER]'),
('openai', '[PROVIDER]'),
('google', '[PROVIDER]'),
('meta', '[PROVIDER]')
]
        # Replace case-insensitively so the rest of the content keeps its
        # original casing (lowercasing everything would corrupt code snippets)
        sanitized = content
        for pattern, replacement in provider_patterns:
            sanitized = re.sub(re.escape(pattern), replacement, sanitized, flags=re.IGNORECASE)
        return sanitized
def _summarize_findings(self, findings: List['Finding']) -> str:
"""Create summary of findings without revealing source."""
if not findings:
return "No findings reported."
by_severity = {}
for f in findings:
sev = f.severity.value
if sev not in by_severity:
by_severity[sev] = []
by_severity[sev].append(f.title)
summary_parts = []
for severity in ['critical', 'high', 'medium', 'low', 'info']:
if severity in by_severity:
count = len(by_severity[severity])
titles = ', '.join(by_severity[severity][:3])
if count > 3:
titles += f", and {count - 3} more"
summary_parts.append(f"{severity.upper()}: {titles}")
return "; ".join(summary_parts)
def deanonymize_rankings(
self,
rankings: Dict[str, List[str]],
label_mapping: Dict[str, str]
) -> Dict[str, List[str]]:
"""Convert rankings back to original reviewer IDs."""
deanonymized = {}
for ranker, ranked_labels in rankings.items():
deanonymized[ranker] = [
label_mapping.get(label, label)
for label in ranked_labels
]
        return deanonymized
```
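
A quick round trip, assuming two `ReviewResult` objects from section 1 are already in hand (the reviewer IDs below are placeholders):

```python
# Anonymize before peer ranking, deanonymize after. A fixed seed keeps the
# label assignment reproducible for audit replay.
anonymizer = ReviewAnonymizer(seed=42)

anonymized, mapping = anonymizer.anonymize_reviews({
    "security-reviewer-ab12cd34": security_result,
    "compliance-reviewer-ef56ab78": compliance_result,
})

# Peer rankers only ever see neutral labels such as "Gamma" or "Theta"
for label, review in anonymized.items():
    print(label, review.domain, review.overall_score)

# After ranking, map the labels back to the real reviewer IDs
rankings = {"ranker-1": list(anonymized.keys())}
real_rankings = anonymizer.deanonymize_rankings(rankings, mapping)
```
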
### 3. Ranking Aggregation

```python
# coditect/agents/council/ranking.py
from typing import Dict, List, Tuple
from dataclasses import dataclass
import statistics
@dataclass
class AggregateRanking:
"""Aggregated ranking across all peer evaluations."""
reviewer_id: str
average_position: float # Lower is better
position_variance: float
times_ranked_first: int
times_ranked_last: int
confidence: float # Based on agreement level
class RankingAggregator:
"""
Implements LLM Council's ranking aggregation.
Computes consensus scores from peer evaluations.
"""
def aggregate(
self,
rankings: Dict[str, List[str]],
label_mapping: Dict[str, str]
) -> Tuple[Dict[str, AggregateRanking], float]:
"""
Aggregate peer rankings into consensus scores.
Args:
rankings: Dict[ranker_id, ordered_list_of_labels]
label_mapping: Dict[label, reviewer_id]
Returns:
- aggregate_rankings: Dict[reviewer_id, AggregateRanking]
- consensus_level: float (0-1, higher = more agreement)
"""
# Collect positions for each reviewer
positions: Dict[str, List[int]] = {}
for ranker_id, ranked_labels in rankings.items():
for position, label in enumerate(ranked_labels, start=1):
reviewer_id = label_mapping.get(label, label)
if reviewer_id not in positions:
positions[reviewer_id] = []
positions[reviewer_id].append(position)
# Compute aggregate scores
aggregates = {}
for reviewer_id, pos_list in positions.items():
avg_pos = statistics.mean(pos_list)
variance = statistics.variance(pos_list) if len(pos_list) > 1 else 0.0
aggregates[reviewer_id] = AggregateRanking(
reviewer_id=reviewer_id,
average_position=avg_pos,
position_variance=variance,
times_ranked_first=sum(1 for p in pos_list if p == 1),
                # Last place = position equal to the number of ranked items
                times_ranked_last=sum(1 for p in pos_list if p == len(label_mapping)),
confidence=self._compute_confidence(pos_list)
)
# Compute overall consensus level
consensus = self._compute_consensus(rankings, label_mapping)
return aggregates, consensus
def _compute_confidence(self, positions: List[int]) -> float:
"""Confidence based on position variance. Low variance = high confidence."""
if len(positions) < 2:
return 0.5 # Uncertain with single data point
variance = statistics.variance(positions)
# Normalize: variance of 0 → confidence 1.0, variance of n² → confidence 0.0
max_variance = (len(positions) ** 2) / 4 # Approximate max variance
confidence = 1.0 - min(variance / max_variance, 1.0)
return round(confidence, 3)
def _compute_consensus(
self,
rankings: Dict[str, List[str]],
label_mapping: Dict[str, str]
) -> float:
"""
Compute Kendall's W (coefficient of concordance).
Measures agreement among rankers.
"""
if len(rankings) < 2:
return 1.0 # Perfect agreement with single ranker
# Convert rankings to position matrix
labels = list(label_mapping.keys())
n_items = len(labels)
n_rankers = len(rankings)
# Sum of ranks for each item
rank_sums = {label: 0 for label in labels}
for ranker_id, ranked_labels in rankings.items():
for position, label in enumerate(ranked_labels, start=1):
rank_sums[label] += position
# Mean rank sum
mean_rank_sum = sum(rank_sums.values()) / n_items
# Sum of squared deviations
ss = sum((rs - mean_rank_sum) ** 2 for rs in rank_sums.values())
# Maximum possible sum of squared deviations
max_ss = (n_rankers ** 2 * (n_items ** 3 - n_items)) / 12
if max_ss == 0:
return 1.0
w = ss / max_ss
return round(w, 3)
def get_consensus_interpretation(self, consensus: float) -> str:
"""Human-readable interpretation of consensus level."""
if consensus >= 0.9:
return "Very high agreement - reviewers strongly concur"
elif consensus >= 0.7:
return "Good agreement - reviewers mostly concur"
elif consensus >= 0.5:
return "Moderate agreement - some divergence in evaluations"
elif consensus >= 0.3:
return "Low agreement - significant divergence"
else:
return "Very low agreement - reviewers strongly disagree"
### 4. Chairman Agent

```python
# coditect/agents/council/chairman.py
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
class MergeDecision(Enum):
APPROVE = "approve"
REQUEST_CHANGES = "request_changes"
REJECT = "reject"
@dataclass
class ChairmanVerdict:
"""Final verdict from the council chairman."""
decision: MergeDecision
aggregate_score: float
synthesis: str
key_findings: List[str]
recommendations: List[str]
dissenting_opinions: List[str]
confidence: float
rationale: str # Explicit decision rationale for audit
class ChairmanAgent:
"""
Synthesizes council reviews into actionable verdict.
Adapted from LLM Council's chairman synthesis.
"""
def __init__(
self,
model: str = "anthropic/claude-sonnet-4.5",
        decision_thresholds: Optional[Dict[str, Any]] = None
):
self.model = model
self.thresholds = decision_thresholds or {
'critical_findings_reject': 1, # Any critical → reject
'high_findings_request_changes': 3, # >3 high → request changes
'min_score_approve': 0.7, # Need 0.7+ to approve
'min_consensus_approve': 0.5 # Need 50% agreement to approve
}
def build_synthesis_prompt(
self,
artifact: 'CodeArtifact',
reviews: Dict[str, 'ReviewResult'],
aggregate_rankings: Dict[str, 'AggregateRanking'],
consensus_level: float
) -> str:
"""Build prompt for chairman synthesis."""
reviews_section = self._format_reviews(reviews)
rankings_section = self._format_rankings(aggregate_rankings)
return f"""You are the Chairman of a Code Review Council for a regulated software system.
## Artifact Under Review
- File: {artifact.path}
- Language: {artifact.language}
- Lines: {artifact.line_count}
- Compliance Context: {', '.join(artifact.compliance_tags) if artifact.compliance_tags else 'Standard'}
## Council Reviews
{reviews_section}
## Peer Evaluation Rankings
(Lower average position = higher quality review)
{rankings_section}
## Consensus Level: {consensus_level:.2f}
{self._interpret_consensus(consensus_level)}
## Your Task as Chairman
Synthesize all reviews into a final verdict. Consider:
1. Severity and count of findings across all reviewers
2. Consensus level - low consensus may indicate need for human review
3. Reviewer quality as indicated by peer rankings
4. Compliance context and risk tolerance
Decision Criteria:
- Any CRITICAL finding from compliance/security → REJECT
- >3 HIGH findings → REQUEST_CHANGES
- Aggregate score <0.7 → REQUEST_CHANGES
- Consensus <0.5 with blocking findings → REQUEST_CHANGES and flag the need for human review in the rationale
Respond with JSON:
{{
"decision": "approve|request_changes|reject",
"aggregate_score": 0.0-1.0,
"synthesis": "2-3 paragraph synthesis of findings",
"key_findings": ["Most important issues to address"],
"recommendations": ["Specific action items"],
"dissenting_opinions": ["Notable disagreements between reviewers"],
"confidence": 0.0-1.0,
"rationale": "Explicit reasoning for the decision"
}}"""
def _format_reviews(self, reviews: Dict[str, 'ReviewResult']) -> str:
"""Format reviews for chairman prompt."""
sections = []
for reviewer_id, review in reviews.items():
findings_str = "\n".join(
f" - [{f.severity.value.upper()}] {f.title}: {f.description}"
for f in review.findings[:5] # Limit to top 5
)
if len(review.findings) > 5:
findings_str += f"\n ... and {len(review.findings) - 5} more findings"
sections.append(f"""### {review.domain.upper()} Review
Score: {review.overall_score:.2f}
Summary: {review.summary}
Findings:
{findings_str}
""")
return "\n".join(sections)
def _format_rankings(
self,
rankings: Dict[str, 'AggregateRanking']
) -> str:
"""Format rankings for chairman prompt."""
sorted_rankings = sorted(
rankings.values(),
key=lambda r: r.average_position
)
lines = []
for rank in sorted_rankings:
lines.append(
f"- {rank.reviewer_id}: avg position {rank.average_position:.2f}, "
f"confidence {rank.confidence:.2f}"
)
return "\n".join(lines)
def _interpret_consensus(self, level: float) -> str:
"""Interpret consensus level for chairman."""
if level >= 0.7:
return "High agreement - reviewers largely concur on quality assessment"
elif level >= 0.5:
return "Moderate agreement - some divergence warrants attention"
else:
return "Low agreement - significant divergence may require human arbitration"
def parse_verdict(self, response: str) -> ChairmanVerdict:
"""Parse chairman response into structured verdict."""
try:
data = json.loads(response)
return ChairmanVerdict(
decision=MergeDecision(data['decision']),
aggregate_score=data['aggregate_score'],
synthesis=data['synthesis'],
key_findings=data.get('key_findings', []),
recommendations=data.get('recommendations', []),
dissenting_opinions=data.get('dissenting_opinions', []),
confidence=data.get('confidence', 0.5),
rationale=data.get('rationale', '')
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
# Fallback: Conservative rejection on parse failure
return ChairmanVerdict(
decision=MergeDecision.REQUEST_CHANGES,
aggregate_score=0.0,
synthesis=f"Chairman verdict parse error: {e}",
key_findings=["Unable to parse chairman synthesis"],
recommendations=["Manual review required"],
dissenting_opinions=[],
confidence=0.0,
rationale="Parse failure - defaulting to request changes"
)
def apply_thresholds(
self,
verdict: ChairmanVerdict,
reviews: Dict[str, 'ReviewResult']
) -> ChairmanVerdict:
"""
Apply hard thresholds regardless of chairman's judgment.
Ensures compliance requirements are met.
"""
# Count findings by severity across all reviews
critical_count = 0
high_count = 0
for review in reviews.values():
for finding in review.findings:
if finding.severity.value == 'critical':
critical_count += 1
elif finding.severity.value == 'high':
high_count += 1
# Override chairman if thresholds violated
if critical_count >= self.thresholds['critical_findings_reject']:
return ChairmanVerdict(
decision=MergeDecision.REJECT,
aggregate_score=verdict.aggregate_score,
synthesis=verdict.synthesis,
key_findings=verdict.key_findings,
recommendations=verdict.recommendations,
dissenting_opinions=verdict.dissenting_opinions,
confidence=1.0, # Threshold-based decisions are certain
rationale=f"OVERRIDE: {critical_count} critical findings exceed threshold"
)
if high_count > self.thresholds['high_findings_request_changes']:
if verdict.decision == MergeDecision.APPROVE:
return ChairmanVerdict(
decision=MergeDecision.REQUEST_CHANGES,
aggregate_score=verdict.aggregate_score,
synthesis=verdict.synthesis,
key_findings=verdict.key_findings,
recommendations=verdict.recommendations,
dissenting_opinions=verdict.dissenting_opinions,
confidence=1.0,
rationale=f"OVERRIDE: {high_count} high findings exceed threshold"
)
        return verdict
```
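
A sketch of how a caller might combine the chairman's judgment with the hard thresholds; the `reviews` dict is assumed to come from the individual reviewers in section 1:

```python
# Parse the chairman's JSON response, then let the hard compliance
# thresholds override its judgment where required.
chairman = ChairmanAgent()

raw_response = """{
  "decision": "approve",
  "aggregate_score": 0.85,
  "synthesis": "All reviewers found the change fundamentally sound.",
  "key_findings": [],
  "recommendations": [],
  "dissenting_opinions": [],
  "confidence": 0.9,
  "rationale": "No blocking findings reported."
}"""

verdict = chairman.parse_verdict(raw_response)
final = chairman.apply_thresholds(verdict, reviews)
if final.decision is not verdict.decision:
    print("Threshold override:", final.rationale)
```
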
## Integration Example

```python
# Example: Running a full council review
from datetime import datetime
from typing import Dict


async def run_council_review(artifact: CodeArtifact, context: Dict) -> CouncilVerdict:
    """Execute the full council review workflow."""
# Initialize reviewers
reviewers = [
SecurityReviewer(model="anthropic/claude-sonnet-4.5"),
ComplianceReviewer(model="anthropic/claude-sonnet-4.5", frameworks=['HIPAA']),
PerformanceReviewer(model="openai/gpt-4o"),
]
# Initialize council
    council = ReviewerCouncil(
        reviewers=reviewers,
chairman_model="anthropic/claude-sonnet-4.5",
checkpoint_store=FoundationDBCheckpointStore(),
compliance_mode=True
)
# Execute review
verdict = await council.review_artifact(artifact, context)
# Log for audit
await audit_logger.log_council_verdict(
artifact_hash=artifact.compute_hash(),
verdict=verdict,
timestamp=datetime.utcnow()
)
    return verdict
```
## Next Steps
- Implement circuit breakers for individual reviewer failures (a minimal sketch follows this list)
- Add PII redaction before dispatching to external models
- Build FoundationDB schema for checkpoint persistence
- Create compliance report generator from council verdicts
- Implement electronic signature integration for FDA 21 CFR Part 11
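
For the circuit-breaker item, a minimal sketch of the pattern intended for `providers/circuit_breaker.py` might look like the following; the thresholds and the half-open probe policy are illustrative assumptions, not settled design:

```python
# Minimal circuit breaker for one reviewer's provider calls. After enough
# consecutive failures the breaker opens; after a cooldown it lets a single
# probe call through (half-open) and closes again on success.
import time
from typing import Optional


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout_s: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        # Once the cooldown elapses, report closed so a probe call gets through
        return time.monotonic() - self.opened_at < self.reset_timeout_s

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()
```

The council loop would check `is_open` before dispatching to each reviewer and continue with the remaining council members when one provider is tripped, so a single flaky model cannot stall an entire review.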