Skip to main content

Implementation Blueprint: Reviewer Council for Coditect

Overview

This document provides production-ready implementation patterns for integrating the LLM Council's peer review mechanism into Coditect's multi-agent architecture.


Module Structure

coditect/
├── agents/
│ ├── __init__.py
│ ├── base.py
│ ├── orchestrator.py
│ ├── architect.py
│ ├── implementer.py
│ └── council/
│ ├── __init__.py
│ ├── reviewer.py # Individual reviewer agent
│ ├── council.py # Council orchestration
│ ├── chairman.py # Synthesis agent
│ ├── anonymizer.py # Label mapping
│ ├── ranking.py # Ranking aggregation
│ └── audit.py # Compliance trail
├── providers/
│ ├── __init__.py
│ ├── router.py # Multi-model routing
│ ├── openrouter.py # OpenRouter adapter
│ └── circuit_breaker.py # Fault tolerance
├── compliance/
│ ├── __init__.py
│ ├── pii_redactor.py # Pre-dispatch sanitization
│ ├── audit_logger.py # Immutable logging
│ └── signatures.py # Electronic signatures
└── storage/
├── __init__.py
├── checkpoints.py # State persistence
└── foundation.py # FoundationDB adapter

Core Implementation

### 1. Reviewer Base Class

# coditect/agents/council/reviewer.py

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import hashlib
import json

class Severity(Enum):
    """Severity levels for review findings, ordered most to least urgent.

    Values are the lowercase strings reviewers are prompted to emit, so
    `Severity(raw_string)` round-trips directly from parsed JSON.
    """
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class Finding:
    """Individual review finding with location and severity.

    Fields mirror the JSON schema the reviewer prompts request;
    `confidence` and `evidence` are optional in that schema.
    """

    id: str
    severity: Severity
    category: str
    location: str  # file:line or AST path
    title: str
    description: str
    recommendation: str
    confidence: float  # 0.0 - 1.0
    evidence: Optional[str] = None  # Code snippet or reference

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-compatible dict (enum flattened to its value)."""
        return {
            'id': self.id,
            'severity': self.severity.value,
            'category': self.category,
            'location': self.location,
            'title': self.title,
            'description': self.description,
            'recommendation': self.recommendation,
            'confidence': self.confidence,
            'evidence': self.evidence
        }

    @property
    def is_blocking(self) -> bool:
        """True when this finding alone should block a merge."""
        # Tuple membership: no throwaway list built on every access.
        return self.severity in (Severity.CRITICAL, Severity.HIGH)

@dataclass
class ReviewResult:
    """Complete result from a single reviewer."""

    reviewer_id: str
    domain: str
    findings: List[Finding]
    overall_score: float
    summary: str
    raw_response: str  # Full LLM output for audit
    token_usage: int
    latency_ms: int
    model_used: str
    timestamp: str

    @property
    def pass_threshold(self) -> bool:
        """Check if review passes minimum quality bar.

        Pass requires: zero critical findings, at most two high findings,
        and an overall score of at least 0.7.
        """
        n_critical = 0
        n_high = 0
        for finding in self.findings:
            if finding.severity is Severity.CRITICAL:
                n_critical += 1
            elif finding.severity is Severity.HIGH:
                n_high += 1
        return n_critical == 0 and n_high <= 2 and self.overall_score >= 0.7

    def compute_hash(self) -> str:
        """Compute deterministic hash for audit trail.

        Only the review substance is hashed (not timing/token metadata),
        with sorted keys so the digest is stable across runs.
        """
        payload = {
            'reviewer_id': self.reviewer_id,
            'domain': self.domain,
            'findings': [f.to_dict() for f in self.findings],
            'overall_score': self.overall_score,
            'summary': self.summary
        }
        serialized = json.dumps(payload, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()


class BaseReviewer(ABC):
    """
    Abstract base class for specialized code reviewers.
    Each reviewer focuses on a specific quality domain.

    Subclasses supply the prompts; response parsing and the error-result
    fallback are shared here.
    """

    def __init__(
        self,
        model: str,
        domain: str,
        evaluation_rubric: Dict[str, float],
        severity_thresholds: Dict[str, int]
    ):
        self.model = model
        self.domain = domain
        self.evaluation_rubric = evaluation_rubric
        self.severity_thresholds = severity_thresholds
        # md5 is used only as a short, stable fingerprint of the model name
        # (not for security) so reviewer IDs are unique per domain+model.
        self.reviewer_id = f"{domain}-reviewer-{hashlib.md5(model.encode()).hexdigest()[:8]}"

    @abstractmethod
    def build_system_prompt(self) -> str:
        """Build domain-specific system prompt."""

    @abstractmethod
    def build_review_prompt(
        self,
        artifact: 'CodeArtifact',
        context: Dict[str, Any]
    ) -> str:
        """Build review prompt for specific artifact."""

    def parse_response(self, response: str) -> ReviewResult:
        """Parse LLM response into structured ReviewResult.

        Expects the JSON format requested by the prompts. Any malformed
        payload degrades to an error result instead of raising, so one
        bad reviewer response cannot abort a council run.
        """
        try:
            data = json.loads(response)
            findings = [
                Finding(
                    id=f"{self.reviewer_id}-{i}",
                    severity=Severity(f['severity']),
                    category=f['category'],
                    location=f['location'],
                    title=f['title'],
                    description=f['description'],
                    recommendation=f['recommendation'],
                    confidence=f.get('confidence', 0.8),
                    evidence=f.get('evidence')
                )
                for i, f in enumerate(data.get('findings', []))
            ]

            return ReviewResult(
                reviewer_id=self.reviewer_id,
                domain=self.domain,
                findings=findings,
                overall_score=data.get('overall_score', 0.5),
                summary=data.get('summary', ''),
                raw_response=response,
                token_usage=0,   # Set by caller
                latency_ms=0,    # Set by caller
                model_used=self.model,
                timestamp=''     # Set by caller
            )
        # ValueError: unknown severity string fed to Severity(...);
        # TypeError/AttributeError: JSON parsed but is not the expected
        # dict-of-dicts shape. Previously only JSONDecodeError/KeyError
        # were caught, so these cases crashed instead of degrading.
        except (json.JSONDecodeError, KeyError, ValueError,
                TypeError, AttributeError) as e:
            return self._create_error_result(str(e), response)

    def _create_error_result(self, error: str, raw: str) -> ReviewResult:
        """Create result indicating parse failure.

        Scores 0.0 with a single INFO finding so downstream thresholds
        treat the review as unusable rather than passing.
        """
        return ReviewResult(
            reviewer_id=self.reviewer_id,
            domain=self.domain,
            findings=[
                Finding(
                    id=f"{self.reviewer_id}-error",
                    severity=Severity.INFO,
                    category="parse_error",
                    location="N/A",
                    title="Review Parse Error",
                    description=f"Failed to parse reviewer response: {error}",
                    recommendation="Manual review required",
                    confidence=0.0
                )
            ],
            overall_score=0.0,
            summary=f"Parse error: {error}",
            raw_response=raw,
            token_usage=0,
            latency_ms=0,
            model_used=self.model,
            timestamp=''
        )


class SecurityReviewer(BaseReviewer):
    """Specialized reviewer for security vulnerabilities.

    The rubric weights and zero-tolerance thresholds are fixed for the
    security domain; only the backing model is configurable.
    """

    def __init__(self, model: str = "anthropic/claude-sonnet-4.5"):
        super().__init__(
            model=model,
            domain="security",
            evaluation_rubric={
                'injection_vulnerabilities': 0.25,
                'authentication_issues': 0.20,
                'data_exposure': 0.20,
                'cryptographic_weaknesses': 0.15,
                'input_validation': 0.10,
                'dependency_risks': 0.10
            },
            severity_thresholds={
                'sql_injection': 0,
                'xss': 0,
                'hardcoded_secrets': 0,
                'weak_crypto': 1
            }
        )

    def build_system_prompt(self) -> str:
        return """You are a senior security engineer reviewing code for vulnerabilities.

Your expertise includes:
- OWASP Top 10 vulnerabilities
- CWE/SANS Top 25 dangerous software errors
- Secure coding practices for Python and TypeScript
- Authentication and authorization patterns
- Cryptographic best practices
- Supply chain security

Evaluation rubric (weights):
- Injection vulnerabilities: 25%
- Authentication issues: 20%
- Data exposure risks: 20%
- Cryptographic weaknesses: 15%
- Input validation: 10%
- Dependency risks: 10%

You must respond with valid JSON only."""

    def build_review_prompt(
        self,
        artifact: 'CodeArtifact',
        context: Dict[str, Any]
    ) -> str:
        # NOTE: the artifact is embedded in a fenced code block; the closing
        # fence was lost in a previous revision, leaving the prompt's
        # instruction section inside the code block.
        return f"""Review the following code for security vulnerabilities.

## Code Artifact
- File: {artifact.path}
- Language: {artifact.language}
- Context: {context.get('description', 'N/A')}

```{artifact.language}
{artifact.content}
```

## Instructions

Analyze this code for security issues. For each finding:

1. Identify the specific vulnerability type
2. Locate the exact line(s) affected
3. Explain the risk
4. Provide a concrete fix

Respond with JSON:
{{
  "findings": [
    {{
      "severity": "critical|high|medium|low|info",
      "category": "injection|auth|exposure|crypto|validation|dependency",
      "location": "file:line",
      "title": "Brief title",
      "description": "Detailed explanation",
      "recommendation": "How to fix",
      "confidence": 0.0-1.0,
      "evidence": "Relevant code snippet"
    }}
  ],
  "overall_score": 0.0-1.0,
  "summary": "2-3 sentence summary"
}}"""

class ComplianceReviewer(BaseReviewer):
    """Specialized reviewer for regulatory compliance."""

    def __init__(
        self,
        model: str = "anthropic/claude-sonnet-4.5",
        frameworks: Optional[List[str]] = None
    ):
        # Default to HIPAA + SOC2; set before super().__init__ so prompt
        # builders can rely on self.frameworks being populated.
        self.frameworks = frameworks or ['HIPAA', 'SOC2']
        super().__init__(
            model=model,
            domain="compliance",
            evaluation_rubric={
                'data_handling': 0.30,
                'audit_logging': 0.25,
                'access_control': 0.20,
                'encryption': 0.15,
                'retention': 0.10
            },
            severity_thresholds={
                'phi_exposure': 0,
                'missing_audit_log': 1,
                'insufficient_encryption': 0
            }
        )

    def build_system_prompt(self) -> str:
        frameworks_str = ', '.join(self.frameworks)
        return f"""You are a compliance specialist reviewing code for regulatory requirements.

Target frameworks: {frameworks_str}

Your expertise includes:
- HIPAA Privacy and Security Rules
- SOC 2 Trust Service Criteria
- FDA 21 CFR Part 11 (if applicable)
- GDPR data protection requirements
- PCI-DSS for payment data

Evaluation rubric (weights):
- Data handling practices: 30%
- Audit logging completeness: 25%
- Access control implementation: 20%
- Encryption usage: 15%
- Data retention compliance: 10%

You must respond with valid JSON only."""

    def build_review_prompt(
        self,
        artifact: 'CodeArtifact',
        context: Dict[str, Any]
    ) -> str:
        data_classification = context.get('data_classification', 'unknown')

        # Artifact content is embedded in a fenced code block (the fence
        # was lost in a previous revision).
        return f"""Review the following code for regulatory compliance.

## Code Artifact
- File: {artifact.path}
- Language: {artifact.language}
- Data Classification: {data_classification}
- Target Frameworks: {', '.join(self.frameworks)}

```{artifact.language}
{artifact.content}
```

## Compliance Requirements Context

{context.get('compliance_context', 'Standard compliance review required.')}

## Instructions

Analyze this code for compliance violations. Consider:

1. How sensitive data is handled
2. Whether audit trails are complete
3. Access control enforcement
4. Encryption at rest and in transit
5. Data retention and deletion

Respond with JSON:
{{
  "findings": [
    {{
      "severity": "critical|high|medium|low|info",
      "category": "data_handling|audit|access|encryption|retention",
      "location": "file:line",
      "title": "Brief title",
      "description": "Detailed explanation with framework reference",
      "recommendation": "How to achieve compliance",
      "confidence": 0.0-1.0,
      "evidence": "Relevant code snippet"
    }}
  ],
  "overall_score": 0.0-1.0,
  "summary": "2-3 sentence compliance assessment"
}}"""

class PerformanceReviewer(BaseReviewer):
    """Specialized reviewer for performance issues."""

    def __init__(self, model: str = "openai/gpt-4o"):
        super().__init__(
            model=model,
            domain="performance",
            evaluation_rubric={
                'algorithmic_complexity': 0.30,
                'memory_usage': 0.25,
                'io_efficiency': 0.20,
                'concurrency': 0.15,
                'caching': 0.10
            },
            severity_thresholds={
                'n_squared_loop': 2,
                'memory_leak': 0,
                'blocking_io': 3
            }
        )

    def build_system_prompt(self) -> str:
        return """You are a performance engineer reviewing code for efficiency issues.

Your expertise includes:
- Algorithm complexity analysis (Big O)
- Memory management and leak detection
- I/O optimization patterns
- Concurrency and parallelization
- Caching strategies

Evaluation rubric (weights):
- Algorithmic complexity: 30%
- Memory usage: 25%
- I/O efficiency: 20%
- Concurrency patterns: 15%
- Caching opportunities: 10%

You must respond with valid JSON only."""

    def build_review_prompt(
        self,
        artifact: 'CodeArtifact',
        context: Dict[str, Any]
    ) -> str:
        scale_expectations = context.get('scale', 'standard')

        # Artifact content is embedded in a fenced code block (the fence
        # was lost in a previous revision).
        return f"""Review the following code for performance issues.

## Code Artifact
- File: {artifact.path}
- Language: {artifact.language}
- Expected Scale: {scale_expectations}

```{artifact.language}
{artifact.content}
```

## Instructions

Analyze this code for performance problems. Consider:

1. Time complexity of algorithms
2. Space complexity and memory allocation
3. I/O patterns (blocking vs async)
4. Concurrency correctness
5. Caching opportunities

Respond with JSON:
{{
  "findings": [
    {{
      "severity": "critical|high|medium|low|info",
      "category": "complexity|memory|io|concurrency|caching",
      "location": "file:line",
      "title": "Brief title",
      "description": "Detailed explanation with complexity analysis",
      "recommendation": "How to optimize",
      "confidence": 0.0-1.0,
      "evidence": "Relevant code snippet"
    }}
  ],
  "overall_score": 0.0-1.0,
  "summary": "2-3 sentence performance assessment"
}}"""


### 2. Anonymizer Module

```python
# coditect/agents/council/anonymizer.py

from typing import Dict, List, Tuple
from dataclasses import dataclass
import random
import string

@dataclass
class AnonymizedReview:
    """Review with identity hidden behind neutral label."""
    label: str  # Neutral label, e.g. 'Alpha' — see ReviewAnonymizer.LABELS
    domain: str  # Keep domain visible for context
    findings_summary: str  # Severity-grouped one-line digest of findings
    overall_score: float
    raw_content: str  # Sanitized response without identifying info

class ReviewAnonymizer:
    """
    Implements LLM Council's anonymization pattern.
    Prevents models from favoring their own family's responses.
    """

    # Neutral labels that don't hint at provider identity
    LABELS = [
        'Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon',
        'Zeta', 'Eta', 'Theta', 'Iota', 'Kappa'
    ]

    def __init__(self, seed: int = None):
        """Initialize with optional seed for reproducibility."""
        # Dedicated Random instance so shuffles don't touch global state.
        self.rng = random.Random(seed)

    def anonymize_reviews(
        self,
        reviews: Dict[str, 'ReviewResult']
    ) -> Tuple[Dict[str, AnonymizedReview], Dict[str, str]]:
        """
        Convert reviewer IDs to neutral labels.

        Returns:
        - anonymized_reviews: Dict[label, AnonymizedReview]
        - label_mapping: Dict[label, original_reviewer_id]

        Raises:
            ValueError: if there are more reviews than labels. (Previously
            the surplus reviews were silently dropped by zip().)
        """
        if len(reviews) > len(self.LABELS):
            raise ValueError(
                f"Cannot anonymize {len(reviews)} reviews with only "
                f"{len(self.LABELS)} labels"
            )

        # Shuffle labels to prevent positional bias (slice copies, so the
        # class-level LABELS list is never mutated).
        available_labels = self.LABELS[:len(reviews)]
        self.rng.shuffle(available_labels)

        label_mapping = {}
        anonymized = {}

        for (reviewer_id, review), label in zip(reviews.items(), available_labels):
            label_mapping[label] = reviewer_id

            # Sanitize content to remove identifying information
            sanitized_content = self._sanitize_content(review.raw_response)

            anonymized[label] = AnonymizedReview(
                label=label,
                domain=review.domain,
                findings_summary=self._summarize_findings(review.findings),
                overall_score=review.overall_score,
                raw_content=sanitized_content
            )

        return anonymized, label_mapping

    def _sanitize_content(self, content: str) -> str:
        """Remove provider-identifying information from content.

        Matching is case-insensitive and word-bounded; the rest of the
        text keeps its original casing. (The previous implementation
        lower-cased the ENTIRE review text as a side effect, corrupting
        code snippets, and matched substrings such as 'meta' inside
        'metadata'.)
        """
        import re  # local import: keeps the module's import block unchanged

        # Known provider signatures to remove
        provider_patterns = [
            ('claude', '[ASSISTANT]'),
            ('gpt', '[ASSISTANT]'),
            ('gemini', '[ASSISTANT]'),
            ('llama', '[ASSISTANT]'),
            ('anthropic', '[PROVIDER]'),
            ('openai', '[PROVIDER]'),
            ('google', '[PROVIDER]'),
            ('meta', '[PROVIDER]')
        ]

        sanitized = content
        for pattern, replacement in provider_patterns:
            sanitized = re.sub(
                rf"\b{re.escape(pattern)}\b",
                replacement,
                sanitized,
                flags=re.IGNORECASE
            )

        return sanitized

    def _summarize_findings(self, findings: List['Finding']) -> str:
        """Create summary of findings without revealing source.

        Groups titles by severity (descending), listing at most three
        titles per severity bucket.
        """
        if not findings:
            return "No findings reported."

        by_severity = {}
        for f in findings:
            by_severity.setdefault(f.severity.value, []).append(f.title)

        summary_parts = []
        for severity in ['critical', 'high', 'medium', 'low', 'info']:
            if severity in by_severity:
                count = len(by_severity[severity])
                titles = ', '.join(by_severity[severity][:3])
                if count > 3:
                    titles += f", and {count - 3} more"
                summary_parts.append(f"{severity.upper()}: {titles}")

        return "; ".join(summary_parts)

    def deanonymize_rankings(
        self,
        rankings: Dict[str, List[str]],
        label_mapping: Dict[str, str]
    ) -> Dict[str, List[str]]:
        """Convert rankings back to original reviewer IDs.

        Unknown labels pass through unchanged so a malformed ranking
        never raises here.
        """
        deanonymized = {}
        for ranker, ranked_labels in rankings.items():
            deanonymized[ranker] = [
                label_mapping.get(label, label)
                for label in ranked_labels
            ]

        return deanonymized

### 3. Ranking Aggregation

# coditect/agents/council/ranking.py

from typing import Dict, List, Tuple
from dataclasses import dataclass
import statistics

@dataclass
class AggregateRanking:
    """Aggregated ranking across all peer evaluations."""
    reviewer_id: str
    average_position: float  # Lower is better
    position_variance: float  # Sample variance of positions (0.0 if single)
    times_ranked_first: int
    times_ranked_last: int
    confidence: float  # Based on agreement level


class RankingAggregator:
    """
    Implements LLM Council's ranking aggregation.
    Computes consensus scores from peer evaluations.
    """

    def aggregate(
        self,
        rankings: Dict[str, List[str]],
        label_mapping: Dict[str, str]
    ) -> Tuple[Dict[str, AggregateRanking], float]:
        """
        Aggregate peer rankings into consensus scores.

        Args:
            rankings: Dict[ranker_id, ordered_list_of_labels]
            label_mapping: Dict[label, reviewer_id]

        Returns:
        - aggregate_rankings: Dict[reviewer_id, AggregateRanking]
        - consensus_level: float (0-1, higher = more agreement)
        """
        # Last place is the number of ranked ITEMS (length of a ranking),
        # not the number of rankers. The previous code compared positions
        # against len(rankings), which is wrong whenever the ranker count
        # differs from the item count.
        n_items = max((len(labels) for labels in rankings.values()), default=0)

        # Collect positions for each reviewer (1-based: 1 = best)
        positions: Dict[str, List[int]] = {}
        for ranker_id, ranked_labels in rankings.items():
            for position, label in enumerate(ranked_labels, start=1):
                reviewer_id = label_mapping.get(label, label)
                positions.setdefault(reviewer_id, []).append(position)

        # Compute aggregate scores
        aggregates = {}
        for reviewer_id, pos_list in positions.items():
            avg_pos = statistics.mean(pos_list)
            variance = statistics.variance(pos_list) if len(pos_list) > 1 else 0.0

            aggregates[reviewer_id] = AggregateRanking(
                reviewer_id=reviewer_id,
                average_position=avg_pos,
                position_variance=variance,
                times_ranked_first=sum(1 for p in pos_list if p == 1),
                times_ranked_last=sum(1 for p in pos_list if p == n_items),
                confidence=self._compute_confidence(pos_list)
            )

        # Compute overall consensus level
        consensus = self._compute_consensus(rankings, label_mapping)

        return aggregates, consensus

    def _compute_confidence(self, positions: List[int]) -> float:
        """Confidence based on position variance. Low variance = high confidence."""
        if len(positions) < 2:
            return 0.5  # Uncertain with single data point

        variance = statistics.variance(positions)
        # Normalize: variance of 0 → confidence 1.0; the divisor is an
        # approximate upper bound on variance, clamped to avoid negatives.
        max_variance = (len(positions) ** 2) / 4  # Approximate max variance
        confidence = 1.0 - min(variance / max_variance, 1.0)
        return round(confidence, 3)

    def _compute_consensus(
        self,
        rankings: Dict[str, List[str]],
        label_mapping: Dict[str, str]
    ) -> float:
        """
        Compute Kendall's W (coefficient of concordance).
        Measures agreement among rankers: W = SS / max_SS with
        max_SS = m^2 (n^3 - n) / 12 for m rankers over n items (no ties).
        """
        if len(rankings) < 2:
            return 1.0  # Perfect agreement with single ranker

        labels = list(label_mapping.keys())
        n_items = len(labels)
        n_rankers = len(rankings)

        # Sum of ranks for each item
        rank_sums = {label: 0 for label in labels}
        for ranker_id, ranked_labels in rankings.items():
            for position, label in enumerate(ranked_labels, start=1):
                rank_sums[label] += position

        # Mean rank sum
        mean_rank_sum = sum(rank_sums.values()) / n_items

        # Sum of squared deviations
        ss = sum((rs - mean_rank_sum) ** 2 for rs in rank_sums.values())

        # Maximum possible sum of squared deviations
        max_ss = (n_rankers ** 2 * (n_items ** 3 - n_items)) / 12

        if max_ss == 0:
            return 1.0

        w = ss / max_ss
        return round(w, 3)

    def get_consensus_interpretation(self, consensus: float) -> str:
        """Human-readable interpretation of consensus level."""
        if consensus >= 0.9:
            return "Very high agreement - reviewers strongly concur"
        elif consensus >= 0.7:
            return "Good agreement - reviewers mostly concur"
        elif consensus >= 0.5:
            return "Moderate agreement - some divergence in evaluations"
        elif consensus >= 0.3:
            return "Low agreement - significant divergence"
        else:
            return "Very low agreement - reviewers strongly disagree"

### 4. Chairman Agent

# coditect/agents/council/chairman.py

from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class MergeDecision(Enum):
    """Terminal decision the chairman can issue for an artifact."""
    APPROVE = "approve"
    REQUEST_CHANGES = "request_changes"
    REJECT = "reject"

@dataclass
class ChairmanVerdict:
    """Final verdict from the council chairman."""
    decision: MergeDecision
    aggregate_score: float  # 0.0 - 1.0 blended quality score
    synthesis: str  # Narrative synthesis of all council reviews
    key_findings: List[str]  # Most important issues to address
    recommendations: List[str]  # Specific action items
    dissenting_opinions: List[str]  # Notable disagreements between reviewers
    confidence: float  # 0.0 - 1.0 (1.0 for hard threshold overrides)
    rationale: str  # Explicit decision rationale for audit

class ChairmanAgent:
    """
    Synthesizes council reviews into actionable verdict.
    Adapted from LLM Council's chairman synthesis.

    The chairman's LLM judgment is bounded by hard thresholds (see
    `apply_thresholds`) so compliance gates cannot be argued away.
    """

    def __init__(
        self,
        model: str = "anthropic/claude-sonnet-4.5",
        decision_thresholds: Optional[Dict[str, Any]] = None
    ):
        self.model = model
        self.thresholds = decision_thresholds or {
            'critical_findings_reject': 1,        # Any critical → reject
            'high_findings_request_changes': 3,   # >3 high → request changes
            'min_score_approve': 0.7,             # Need 0.7+ to approve
            'min_consensus_approve': 0.5          # Need 50% agreement to approve
        }

    def build_synthesis_prompt(
        self,
        artifact: 'CodeArtifact',
        reviews: Dict[str, 'ReviewResult'],
        aggregate_rankings: Dict[str, 'AggregateRanking'],
        consensus_level: float
    ) -> str:
        """Build prompt for chairman synthesis."""
        reviews_section = self._format_reviews(reviews)
        rankings_section = self._format_rankings(aggregate_rankings)

        return f"""You are the Chairman of a Code Review Council for a regulated software system.

## Artifact Under Review
- File: {artifact.path}
- Language: {artifact.language}
- Lines: {artifact.line_count}
- Compliance Context: {', '.join(artifact.compliance_tags) if artifact.compliance_tags else 'Standard'}

## Council Reviews

{reviews_section}

## Peer Evaluation Rankings
(Lower average position = higher quality review)

{rankings_section}

## Consensus Level: {consensus_level:.2f}
{self._interpret_consensus(consensus_level)}

## Your Task as Chairman

Synthesize all reviews into a final verdict. Consider:
1. Severity and count of findings across all reviewers
2. Consensus level - low consensus may indicate need for human review
3. Reviewer quality as indicated by peer rankings
4. Compliance context and risk tolerance

Decision Criteria:
- Any CRITICAL finding from compliance/security → REJECT
- >3 HIGH findings → REQUEST_CHANGES
- Aggregate score <0.7 → REQUEST_CHANGES
- Consensus <0.5 with blocking findings → FLAG FOR HUMAN REVIEW

Respond with JSON:
{{
"decision": "approve|request_changes|reject",
"aggregate_score": 0.0-1.0,
"synthesis": "2-3 paragraph synthesis of findings",
"key_findings": ["Most important issues to address"],
"recommendations": ["Specific action items"],
"dissenting_opinions": ["Notable disagreements between reviewers"],
"confidence": 0.0-1.0,
"rationale": "Explicit reasoning for the decision"
}}"""

    def _format_reviews(self, reviews: Dict[str, 'ReviewResult']) -> str:
        """Format reviews for chairman prompt (top 5 findings per review)."""
        sections = []
        for reviewer_id, review in reviews.items():
            findings_str = "\n".join(
                f" - [{f.severity.value.upper()}] {f.title}: {f.description}"
                for f in review.findings[:5]  # Limit to top 5
            )
            if len(review.findings) > 5:
                findings_str += f"\n ... and {len(review.findings) - 5} more findings"

            sections.append(f"""### {review.domain.upper()} Review
Score: {review.overall_score:.2f}
Summary: {review.summary}
Findings:
{findings_str}
""")

        return "\n".join(sections)

    def _format_rankings(
        self,
        rankings: Dict[str, 'AggregateRanking']
    ) -> str:
        """Format rankings for chairman prompt, best (lowest avg) first."""
        sorted_rankings = sorted(
            rankings.values(),
            key=lambda r: r.average_position
        )

        lines = []
        for rank in sorted_rankings:
            lines.append(
                f"- {rank.reviewer_id}: avg position {rank.average_position:.2f}, "
                f"confidence {rank.confidence:.2f}"
            )

        return "\n".join(lines)

    def _interpret_consensus(self, level: float) -> str:
        """Interpret consensus level for chairman."""
        if level >= 0.7:
            return "High agreement - reviewers largely concur on quality assessment"
        elif level >= 0.5:
            return "Moderate agreement - some divergence warrants attention"
        else:
            return "Low agreement - significant divergence may require human arbitration"

    def parse_verdict(self, response: str) -> ChairmanVerdict:
        """Parse chairman response into structured verdict.

        On any parse failure, conservatively defaults to REQUEST_CHANGES
        rather than approving unreviewed work.
        """
        import json

        try:
            data = json.loads(response)
            return ChairmanVerdict(
                decision=MergeDecision(data['decision']),
                aggregate_score=data['aggregate_score'],
                synthesis=data['synthesis'],
                key_findings=data.get('key_findings', []),
                recommendations=data.get('recommendations', []),
                dissenting_opinions=data.get('dissenting_opinions', []),
                confidence=data.get('confidence', 0.5),
                rationale=data.get('rationale', '')
            )
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            # Fallback: Conservative rejection on parse failure
            return ChairmanVerdict(
                decision=MergeDecision.REQUEST_CHANGES,
                aggregate_score=0.0,
                synthesis=f"Chairman verdict parse error: {e}",
                key_findings=["Unable to parse chairman synthesis"],
                recommendations=["Manual review required"],
                dissenting_opinions=[],
                confidence=0.0,
                rationale="Parse failure - defaulting to request changes"
            )

    def apply_thresholds(
        self,
        verdict: ChairmanVerdict,
        reviews: Dict[str, 'ReviewResult']
    ) -> ChairmanVerdict:
        """
        Apply hard thresholds regardless of chairman's judgment.
        Ensures compliance requirements are met.
        """
        from dataclasses import replace  # local import: module imports unchanged

        # Count findings by severity across all reviews
        critical_count = 0
        high_count = 0
        for review in reviews.values():
            for finding in review.findings:
                if finding.severity.value == 'critical':
                    critical_count += 1
                elif finding.severity.value == 'high':
                    high_count += 1

        # Override chairman if thresholds violated; replace() keeps every
        # other verdict field intact instead of re-listing them all.
        if critical_count >= self.thresholds['critical_findings_reject']:
            return replace(
                verdict,
                decision=MergeDecision.REJECT,
                confidence=1.0,  # Threshold-based decisions are certain
                rationale=f"OVERRIDE: {critical_count} critical findings exceed threshold"
            )

        if (high_count > self.thresholds['high_findings_request_changes']
                and verdict.decision == MergeDecision.APPROVE):
            return replace(
                verdict,
                decision=MergeDecision.REQUEST_CHANGES,
                confidence=1.0,
                rationale=f"OVERRIDE: {high_count} high findings exceed threshold"
            )

        return verdict

Integration Example

# Example: Running a full council review

async def run_council_review(artifact: CodeArtifact, context: Dict) -> CouncilVerdict:
    """Execute full council review workflow.

    Wires three domain reviewers into a ReviewerCouncil, runs the review,
    and records the verdict for audit before returning it.
    (ReviewerCouncil, FoundationDBCheckpointStore, audit_logger and the
    datetime/timezone imports are assumed provided by the surrounding
    module — confirm when integrating.)
    """
    # Initialize reviewers
    reviewers = [
        SecurityReviewer(model="anthropic/claude-sonnet-4.5"),
        ComplianceReviewer(model="anthropic/claude-sonnet-4.5", frameworks=['HIPAA']),
        PerformanceReviewer(model="openai/gpt-4o"),
    ]

    # Initialize council (pass the list directly; the previous
    # `[r for r in reviewers]` copy added nothing)
    council = ReviewerCouncil(
        reviewers=reviewers,
        chairman_model="anthropic/claude-sonnet-4.5",
        checkpoint_store=FoundationDBCheckpointStore(),
        compliance_mode=True
    )

    # Execute review
    verdict = await council.review_artifact(artifact, context)

    # Log for audit with a timezone-aware timestamp: datetime.utcnow()
    # returns a naive datetime and is deprecated since Python 3.12.
    await audit_logger.log_council_verdict(
        artifact_hash=artifact.compute_hash(),
        verdict=verdict,
        timestamp=datetime.now(timezone.utc)
    )

    return verdict

Next Steps

  1. Implement circuit breakers for individual reviewer failures
  2. Add PII redaction before dispatching to external models
  3. Build FoundationDB schema for checkpoint persistence
  4. Create compliance report generator from council verdicts
  5. Implement electronic signature integration for FDA 21 CFR Part 11