#!/usr/bin/env python3
"""Deep Analysis Agents for MoE Escalation Handling

Second-tier specialists invoked when first-tier classification is uncertain.
These agents perform more expensive but more accurate analysis.

Architecture:
    Escalation → DeepAnalysisOrchestrator → [DeepAnalysts] → FinalDecision

Deep Analysts:
    1. SemanticSimilarityAnalyst - Compare against known exemplars using embeddings
    2. ContextualAnalyst - Analyze directory context, sibling files, naming patterns
    3. ContentReasoningAnalyst - Deep content analysis with structured reasoning
    4. CrossReferenceAnalyst - Check references, links, and relationships
"""

import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

from .models import Document, AnalystVote, ClassificationResult, ApprovalType

class DeepAnalystType(Enum):
    """Identifies which second-tier (deep) analyst produced a vote."""

    SEMANTIC_SIMILARITY = "semantic_similarity"
    CONTEXTUAL = "contextual"
    CONTENT_REASONING = "content_reasoning"
    CROSS_REFERENCE = "cross_reference"

@dataclass
class DeepAnalysisVote:
    """Vote from a deep analysis agent."""

    analyst_type: DeepAnalystType  # which deep analyst produced this vote
    classification: str            # proposed document type (e.g. "agent", "guide")
    confidence: float              # score in [0.0, 1.0]; analysts cap at 1.0 (0.8 for cross-ref)
    reasoning: str                 # human-readable justification for the vote
    evidence: List[str] = field(default_factory=list)  # supporting observations (truncated to 5)
    processing_time_ms: float = 0.0  # wall-clock time the analysis took, in milliseconds

@dataclass
class DeepAnalysisResult:
    """Result from the deep analysis pipeline."""

    original_classification: str   # classification from the escalated first-tier pass
    original_confidence: float     # confidence of the escalated first-tier pass
    final_classification: str      # classification chosen after deep analysis
    final_confidence: float        # normalized confidence-weighted consensus score
    deep_votes: List[DeepAnalysisVote]  # individual votes from each deep analyst
    consensus_reached: bool        # True when 60%+ analysts agree above threshold
    requires_human_review: bool    # True when a human should confirm the result
    reasoning: str                 # "; "-joined summary of how the decision was made

# Known exemplar patterns for each document type

# Regex exemplars per document type, grouped by where they match:
#   title_patterns     - matched against the (lowercased) document title
#   content_patterns   - counted across the (lowercased) body text
#   structure_patterns - matched against the raw body with MULTILINE
# NOTE(review): several patterns were corrupted by markdown rendering that
# stripped paired '*' characters (e.g. `\s*` -> `\s`, `.*` -> `.`); they are
# restored here to match frontmatter/prose with arbitrary spacing. The old
# table pattern `r"|.|.|"` was an alternation of empty branches and matched
# EVERY string; it is fixed to require literal pipes.
EXEMPLAR_PATTERNS = {
    "agent": {
        "title_patterns": [
            r"specialist", r"expert", r"agent", r"orchestrator", r"coordinator",
            r"analyzer", r"reviewer", r"architect", r"developer", r"engineer",
        ],
        "content_patterns": [
            # Frontmatter patterns (very strong signals)
            r"component_type:\s*agent", r"type:\s*agent", r"agent_type:\s*\w+",
            r"invocation_pattern:", r"moe_role:",
            # Content patterns
            r"system prompt", r"capabilities", r"when to use",
            r"agent.*description", r"specialized.*agent", r"ai agent",
            r"agent type", r"subagent",
            r"you are a\s+\w+", r"you are an\s+\w+",
            r"your job is to", r"your role is",
            r"## core responsibilities", r"## technical expertise",
            r"/agent\s+\w+",
        ],
        "structure_patterns": [
            r"## (capabilities|when to use|system prompt|core responsibilities|technical expertise)",
            r"---\s*\n.*component_type:\s*agent",
            r"---\s*\n.*type:\s*agent",
            r"^you are a[n]?\s+\w+.*specialist",
            r"^you are a[n]?\s+\w+.*expert",
        ],
    },
    "command": {
        "title_patterns": [
            r"command", r"cli", r"usage", r"invoke",
        ],
        "content_patterns": [
            # Frontmatter patterns (very strong signals)
            r"component_type:\s*command", r"type:\s*command",
            r"invocation:\s*/\w+", r"command_name:\s*/\w+",
            # Content patterns
            r"slash command", r"/\w+\s+", r"command.*execute",
            r"arguments?:", r"options?:", r"flags?:",
            r"## system prompt", r"## usage", r"execution directive",
        ],
        "structure_patterns": [
            r"## (usage|arguments|options|examples|system prompt)",
            r"---\s*\n.*component_type:\s*command",
            r"---\s*\n.*type:\s*command",
            r"---\s*\n.*invocation:\s*/\w+",
        ],
    },
    "skill": {
        "title_patterns": [
            r"skill", r"pattern", r"capability",
        ],
        "content_patterns": [
            r"skill.*definition", r"reusable.*pattern", r"when to apply", r"skill.md",
        ],
        "structure_patterns": [
            r"## (when to apply|pattern|implementation)",
            r"---\s*\n.*type:\s*skill",
        ],
    },
    "adr": {
        "title_patterns": [
            r"adr[-_]?\d+", r"architecture.*decision", r"decision.*record",
        ],
        "content_patterns": [
            r"## (context|decision|status|consequences)",
            r"we will|we decided|the decision",
            r"accepted|proposed|deprecated|superseded",
        ],
        "structure_patterns": [
            r"## (context|decision|consequences|alternatives)",
            r"status:\s*(accepted|proposed|deprecated)",
        ],
    },
    "guide": {
        "title_patterns": [
            r"guide", r"tutorial", r"how[- ]to", r"getting[- ]started",
            r"cookbook", r"troubleshooting", r"best[- ]practices",
        ],
        "content_patterns": [
            # Frontmatter patterns (very strong signals)
            r"component_type:\s*guide", r"type:\s*guide", r"doc_type:\s*guide",
            # Content patterns
            r"step \d+", r"follow.*steps", r"this guide", r"you will learn",
            r"prerequisites", r"when to read", r"reading_time:",
            r"quick solutions", r"common issues", r"best practices",
            r"cookbook", r"recipe",
        ],
        "structure_patterns": [
            r"## (prerequisites|step|getting started|overview|quick start|troubleshooting)",
            r"---\s*\n.*component_type:\s*guide",
            r"---\s*\n.*type:\s*guide",
            r"---\s*\n.*doc_type:\s*guide",
        ],
    },
    "workflow": {
        "title_patterns": [
            r"workflow", r"process", r"pipeline", r"automation",
        ],
        "content_patterns": [
            r"phase \d+", r"step \d+", r"workflow.*steps", r"automation", r"orchestrat",
        ],
        "structure_patterns": [
            r"## (phase|step|workflow|process)",
            r"---\s*\n.*type:\s*workflow",
        ],
    },
    "reference": {
        "title_patterns": [
            r"reference", r"api", r"specification", r"spec", r"overview", r"index",
        ],
        "content_patterns": [
            # Frontmatter patterns (very strong signals)
            r"component_type:\s*reference", r"type:\s*reference",
            # Content patterns
            r"api.*reference", r"specification", r"parameters?:", r"returns?:",
            r"interface", r"complete inventory", r"table of contents",
            r"quick index", r"quick navigation", r"## how it works", r"system design",
        ],
        "structure_patterns": [
            r"## (api|parameters|methods|properties|table of contents|quick index)",
            r"---\s*\n.*component_type:\s*reference",
            r"---\s*\n.*type:\s*reference",
            r"\|.*\|.*\|",  # Tables (still useful)
        ],
    },
}

# Directory context hints

# Maps a path directory name to the document type it implies.
# A value of None marks directories that carry no classification signal.
DIRECTORY_HINTS = {
    "agents": "agent",
    "commands": "command",
    "skills": "skill",
    "adrs": "adr",
    "guides": "guide",
    "workflows": "workflow",
    "reference": "reference",
    "docs": None,  # Ambiguous
    "internal": None,  # Ambiguous
}

class SemanticSimilarityAnalyst:
    """Scores a document against known exemplar patterns.

    Lightweight regex matching stands in for embedding similarity: each
    document type accumulates weighted points for title, content, and
    structure pattern hits, and the best-scoring type wins.
    """

    def __init__(self):
        self.analyst_type = DeepAnalystType.SEMANTIC_SIMILARITY

    def analyze(self, doc: Document, original_votes: List[AnalystVote]) -> DeepAnalysisVote:
        """Compare document against known exemplar patterns."""
        import time
        started = time.time()

        type_scores: Dict[str, float] = {}
        collected_evidence: List[str] = []

        lowered_content = doc.content.lower()
        lowered_title = (doc.frontmatter.get("title", "") or doc.path.stem).lower()

        for candidate, patterns in EXEMPLAR_PATTERNS.items():
            total = 0.0
            local_evidence = []

            # Title patterns share a 0.3 weight budget.
            title_share = 0.3 / len(patterns["title_patterns"])
            for pattern in patterns["title_patterns"]:
                if re.search(pattern, lowered_title):
                    total += title_share
                    local_evidence.append(f"Title matches '{pattern}'")

            # Each content pattern contributes 0.1 per hit, capped at 0.4.
            for pattern in patterns["content_patterns"]:
                hit_count = len(re.findall(pattern, lowered_content, re.IGNORECASE))
                if hit_count:
                    total += min(0.4, 0.1 * hit_count)
                    local_evidence.append(f"Content has {hit_count} matches for '{pattern}'")

            # Structure patterns share a 0.3 weight budget (raw content,
            # MULTILINE so ^-anchored patterns can hit any line).
            structure_share = 0.3 / len(patterns["structure_patterns"])
            for pattern in patterns["structure_patterns"]:
                if re.search(pattern, doc.content, re.IGNORECASE | re.MULTILINE):
                    total += structure_share
                    local_evidence.append(f"Structure matches '{pattern}'")

            type_scores[candidate] = min(1.0, total)
            if total > 0.3:
                collected_evidence.extend(local_evidence[:2])  # Top 2 evidence per type

        # Winner is the highest-scoring document type.
        best_type = max(type_scores, key=type_scores.get)
        best_score = type_scores[best_type]

        if best_score > 0.5:
            collected_evidence.append(f"Strong exemplar match: {best_score:.0%}")

        elapsed_ms = (time.time() - started) * 1000

        return DeepAnalysisVote(
            analyst_type=self.analyst_type,
            classification=best_type,
            confidence=best_score,
            reasoning=f"Exemplar similarity analysis: best match is '{best_type}' with {best_score:.0%} confidence",
            evidence=collected_evidence[:5],
            processing_time_ms=elapsed_ms,
        )

class ContextualAnalyst:
    """
    Analyzes document context including directory structure, sibling
    files, and naming conventions.
    """

    def __init__(self):
        self.analyst_type = DeepAnalystType.CONTEXTUAL

    def analyze(self, doc: Document, original_votes: List[AnalystVote]) -> DeepAnalysisVote:
        """Analyze document context for classification hints.

        Args:
            doc: Document under classification.
            original_votes: First-tier votes, used as a fallback when no
                contextual signal is found.

        Returns:
            DeepAnalysisVote carrying the context-derived classification.
        """
        start = time.time()

        evidence: List[str] = []
        scores: Dict[str, float] = {}

        # doc.path may arrive as str or Path; normalize to Path.
        path = Path(doc.path) if isinstance(doc.path, str) else doc.path

        # Analyze directory path components against known type directories.
        path_parts = [p.lower() for p in path.parts]

        for dir_name, doc_type in DIRECTORY_HINTS.items():
            if doc_type and dir_name in path_parts:
                # Strong weight for directory hints (0.6 for explicit directories)
                scores[doc_type] = scores.get(doc_type, 0) + 0.6
                evidence.append(f"In '{dir_name}/' directory (strong signal)")

        # Analyze filename conventions.
        filename = path.stem.lower()

        if re.match(r"adr[-_]?\d+", filename):
            scores["adr"] = scores.get("adr", 0) + 0.4
            evidence.append("Filename matches ADR pattern")

        if "guide" in filename or "how-to" in filename:
            scores["guide"] = scores.get("guide", 0) + 0.3
            # Fix: evidence previously claimed 'guide' even when only
            # 'how-to' matched.
            evidence.append("Filename contains 'guide' or 'how-to'")

        if "workflow" in filename:
            scores["workflow"] = scores.get("workflow", 0) + 0.3
            evidence.append("Filename contains 'workflow'")

        if filename == "skill" or filename.endswith("-skill"):
            scores["skill"] = scores.get("skill", 0) + 0.4
            evidence.append("Filename indicates skill")

        if filename == "readme":
            # READMEs can be various types depending on context
            for part in path_parts:
                if part in DIRECTORY_HINTS and DIRECTORY_HINTS[part]:
                    scores[DIRECTORY_HINTS[part]] = scores.get(DIRECTORY_HINTS[part], 0) + 0.2

        # Check sibling files for context; best-effort, so filesystem
        # errors are deliberately swallowed.
        try:
            parent = path.parent
            if parent.exists():
                siblings = [f.name.lower() for f in parent.iterdir() if f.is_file()]

                # If siblings are all one type, likely same type
                if any("skill.md" in s for s in siblings):
                    scores["skill"] = scores.get("skill", 0) + 0.2
                    evidence.append("Sibling SKILL.md found")

                if any(s.startswith("adr-") for s in siblings):
                    scores["adr"] = scores.get("adr", 0) + 0.2
                    evidence.append("Sibling ADRs found")
        except Exception:
            pass

        # Get best match
        if scores:
            best_type = max(scores, key=scores.get)
            best_score = min(1.0, scores[best_type])
        else:
            # Fallback: majority vote over the original first-tier votes.
            vote_counts = {}
            for vote in original_votes:
                vote_counts[vote.classification] = vote_counts.get(vote.classification, 0) + 1
            best_type = max(vote_counts, key=vote_counts.get) if vote_counts else "reference"
            best_score = 0.3
            evidence.append("No strong contextual signals, using vote consensus")

        processing_time = (time.time() - start) * 1000

        return DeepAnalysisVote(
            analyst_type=self.analyst_type,
            classification=best_type,
            confidence=best_score,
            reasoning=f"Contextual analysis: directory/naming suggests '{best_type}'",
            evidence=evidence[:5],
            processing_time_ms=processing_time
        )

class ContentReasoningAnalyst:
    """
    Performs deep content analysis with structured reasoning.

    Scans the document body for type-specific structure (headers, frontmatter
    declarations, persona phrases, tables, code blocks) and additively scores
    each candidate type; the highest scorer wins.
    """

    def __init__(self):
        # Identifies this analyst in DeepAnalysisVote results.
        self.analyst_type = DeepAnalystType.CONTENT_REASONING

    def analyze(self, doc: Document, original_votes: List[AnalystVote]) -> DeepAnalysisVote:
        """Deep content analysis with structured reasoning.

        Args:
            doc: Document under classification (only doc.content is read).
            original_votes: First-tier votes (unused here; kept for the
                common analyst interface).

        Returns:
            DeepAnalysisVote with the content-derived classification.
        """
        import time
        start = time.time()

        evidence: List[str] = []
        reasoning_steps: List[str] = []
        scores: Dict[str, float] = {}

        content = doc.content

        # Analyze document structure: collect all markdown header texts.
        headers = re.findall(r"^(#{1,6})\s+(.+)$", content, re.MULTILINE)
        header_texts = [h[1].lower() for h in headers]

        # Check for ADR structure (context/decision/status/... headers).
        adr_headers = {"context", "decision", "status", "consequences", "alternatives"}
        adr_matches = len(set(header_texts) & adr_headers)
        if adr_matches >= 2:
            scores["adr"] = scores.get("adr", 0) + 0.3 * adr_matches
            evidence.append(f"Has {adr_matches} ADR-style headers")
            reasoning_steps.append(f"Document has ADR-style headers: {set(header_texts) & adr_headers}")

        # Check for guide-specific content (enhanced detection)
        # Frontmatter detection (very strong signal)
        if "component_type: guide" in content.lower() or "type: guide" in content.lower():
            scores["guide"] = scores.get("guide", 0) + 0.6
            evidence.append("Frontmatter declares guide type")
            reasoning_steps.append("Document frontmatter explicitly defines guide type")

        if "doc_type: guide" in content.lower():
            scores["guide"] = scores.get("guide", 0) + 0.3
            evidence.append("Has doc_type: guide in frontmatter")

        # Check for guide structure (step-by-step / onboarding headers).
        if any("step" in h or "getting started" in h or "troubleshooting" in h for h in header_texts):
            scores["guide"] = scores.get("guide", 0) + 0.3
            evidence.append("Has step-by-step/troubleshooting structure")
            reasoning_steps.append("Document follows tutorial/guide structure")

        # Check for guide-specific content patterns
        guide_indicators = ["cookbook", "recipe", "troubleshooting", "best practices", "quick solutions", "common issues"]
        guide_count = sum(1 for ind in guide_indicators if ind in content.lower())
        if guide_count >= 1:
            scores["guide"] = scores.get("guide", 0) + 0.2 * guide_count
            evidence.append(f"Has {guide_count} guide-style indicators")

        # Check for agent-specific content (enhanced detection)
        agent_indicators = [
            "system prompt", "capabilities", "when to use this agent",
            "specialized agent", "ai agent", "your job is to",
            "core responsibilities", "technical expertise"
        ]
        agent_count = sum(1 for ind in agent_indicators if ind in content.lower())

        # Check frontmatter for agent type (very strong signal)
        if "component_type: agent" in content.lower() or "type: agent" in content.lower():
            scores["agent"] = scores.get("agent", 0) + 0.6
            evidence.append("Frontmatter declares agent type")
            reasoning_steps.append("Document frontmatter explicitly defines agent type")

        # Check for "You are a..." persona pattern (strong agent signal)
        if re.search(r"^you are a[n]?\s+\w+", content, re.IGNORECASE | re.MULTILINE):
            scores["agent"] = scores.get("agent", 0) + 0.4
            evidence.append("Has 'You are a...' agent persona")
            reasoning_steps.append("Document uses agent persona pattern")

        # Check for agent headers
        agent_headers = {"core responsibilities", "technical expertise", "capabilities"}
        agent_header_matches = len(set(header_texts) & agent_headers)
        if agent_header_matches >= 1:
            scores["agent"] = scores.get("agent", 0) + 0.3 * agent_header_matches
            evidence.append(f"Has {agent_header_matches} agent-style headers")

        if agent_count >= 2:
            scores["agent"] = scores.get("agent", 0) + 0.2 * agent_count
            evidence.append(f"Has {agent_count} agent indicators")
            reasoning_steps.append("Document describes an AI agent")

        # Check for command-specific content (enhanced detection)
        # Frontmatter detection (very strong signal)
        if "component_type: command" in content.lower() or "type: command" in content.lower():
            scores["command"] = scores.get("command", 0) + 0.6
            evidence.append("Frontmatter declares command type")
            reasoning_steps.append("Document frontmatter explicitly defines command type")

        if re.search(r"invocation:\s*/\w+", content, re.IGNORECASE):
            scores["command"] = scores.get("command", 0) + 0.4
            evidence.append("Has invocation pattern in frontmatter")

        if re.search(r"```\s*\n\s*/\w+", content) or "slash command" in content.lower():
            scores["command"] = scores.get("command", 0) + 0.3
            evidence.append("Contains slash command examples")
            reasoning_steps.append("Document defines a slash command")

        # Check for workflow structure (numbered phases/steps).
        phase_matches = len(re.findall(r"phase\s*\d+|step\s*\d+", content, re.IGNORECASE))
        if phase_matches >= 3:
            scores["workflow"] = scores.get("workflow", 0) + 0.3
            evidence.append(f"Has {phase_matches} phase/step sections")
            reasoning_steps.append("Document describes a multi-step workflow")

        # Check for reference-specific content (enhanced detection)
        # Frontmatter detection (very strong signal)
        if "component_type: reference" in content.lower() or "type: reference" in content.lower():
            scores["reference"] = scores.get("reference", 0) + 0.6
            evidence.append("Frontmatter declares reference type")
            reasoning_steps.append("Document frontmatter explicitly defines reference type")

        # Check for reference structure indicators
        ref_indicators = ["table of contents", "quick index", "quick navigation", "complete inventory"]
        ref_count = sum(1 for ind in ref_indicators if ind in content.lower())
        if ref_count >= 1:
            scores["reference"] = scores.get("reference", 0) + 0.3 * ref_count
            evidence.append(f"Has {ref_count} reference structure indicators")

        # Check for reference/API structure (tables)
        if re.search(r"\|.*\|.*\|", content):  # Tables
            table_count = len(re.findall(r"\|.*\|", content))
            if table_count > 5:
                scores["reference"] = scores.get("reference", 0) + 0.2
                evidence.append(f"Has {table_count} table rows")
                reasoning_steps.append("Document has reference-style tables")

        # Analyze code block density (``` fences come in pairs).
        code_blocks = len(re.findall(r"```", content)) // 2
        if code_blocks > 5:
            scores["guide"] = scores.get("guide", 0) + 0.2
            evidence.append(f"Has {code_blocks} code examples")

        # Get best match with reasoning
        if scores:
            best_type = max(scores, key=scores.get)
            best_score = min(1.0, scores[best_type])
        else:
            best_type = "reference"  # Default fallback
            best_score = 0.25
            reasoning_steps.append("No strong content signals, defaulting to reference")

        processing_time = (time.time() - start) * 1000

        return DeepAnalysisVote(
            analyst_type=self.analyst_type,
            classification=best_type,
            confidence=best_score,
            reasoning=f"Content reasoning: {'; '.join(reasoning_steps[:3])}",
            evidence=evidence[:5],
            processing_time_ms=processing_time
        )

class CrossReferenceAnalyst:
    """Classifies a document by the company it keeps.

    Examines markdown link targets, mentions of related documents, and
    frontmatter reference keys to infer the document's type.
    """

    def __init__(self):
        self.analyst_type = DeepAnalystType.CROSS_REFERENCE

    def analyze(self, doc: Document, original_votes: List[AnalystVote]) -> DeepAnalysisVote:
        """Analyze document references and relationships."""
        import time
        started = time.time()

        found_evidence: List[str] = []
        tallies: Dict[str, float] = {}
        body = doc.content

        # Collect markdown links of the form [text](target).
        md_links = re.findall(r"\[([^\]]+)\]\(([^)]+)\)", body)

        # Score each link by the kind of document its target points at.
        for _text, target in md_links:
            target_lower = target.lower()

            if "/agents/" in target_lower or "agent.md" in target_lower:
                tallies["agent"] = tallies.get("agent", 0) + 0.1
            elif "/commands/" in target_lower:
                tallies["command"] = tallies.get("command", 0) + 0.1
            elif "/adrs/" in target_lower or "adr-" in target_lower:
                tallies["adr"] = tallies.get("adr", 0) + 0.1
            elif "/guides/" in target_lower or "guide" in target_lower:
                tallies["guide"] = tallies.get("guide", 0) + 0.1
            elif "/workflows/" in target_lower:
                tallies["workflow"] = tallies.get("workflow", 0) + 0.1

        if md_links:
            found_evidence.append(f"Has {len(md_links)} cross-references")

        # Prose phrases that mention related documents of a given type;
        # only the first matching phrase per type counts.
        related_patterns = {
            "agent": [r"see also.*agent", r"related agents?"],
            "adr": [r"see adr-\d+", r"supersedes adr-\d+", r"related.*decisions?"],
            "guide": [r"see.*guide", r"refer to.*tutorial"],
            "workflow": [r"see.*workflow", r"related.*process"]
        }

        for related_type, candidates in related_patterns.items():
            for candidate in candidates:
                if re.search(candidate, body, re.IGNORECASE):
                    tallies[related_type] = tallies.get(related_type, 0) + 0.15
                    found_evidence.append(f"References related {related_type}s")
                    break

        # Frontmatter reference keys are noted as evidence only (no score).
        if doc.frontmatter:
            if "related" in doc.frontmatter or "see_also" in doc.frontmatter:
                found_evidence.append("Has related documents in frontmatter")

        if tallies:
            winner = max(tallies, key=tallies.get)
            winner_score = min(0.8, tallies[winner])  # Cap at 0.8 for cross-ref
        else:
            # No signals: fall back to confidence-weighted first-tier votes.
            weighted = {}
            for vote in original_votes:
                weighted[vote.classification] = weighted.get(vote.classification, 0) + vote.confidence
            winner = max(weighted, key=weighted.get) if weighted else "reference"
            winner_score = 0.3
            found_evidence.append("No cross-reference signals")

        elapsed_ms = (time.time() - started) * 1000

        return DeepAnalysisVote(
            analyst_type=self.analyst_type,
            classification=winner,
            confidence=winner_score,
            reasoning=f"Cross-reference analysis: document relationship patterns suggest '{winner}'",
            evidence=found_evidence[:5],
            processing_time_ms=elapsed_ms,
        )

class DeepAnalysisOrchestrator:
    """
    Orchestrates deep analysis for escalated documents.
    Runs multiple deep analysts and combines their results into a
    confidence-weighted consensus.
    """

    def __init__(self, confidence_threshold: float = 0.65):
        """
        Args:
            confidence_threshold: Minimum weighted confidence required to
                declare consensus (default 0.65).
        """
        # Each analyst votes independently; order is not significant.
        self.analysts = [
            SemanticSimilarityAnalyst(),
            ContextualAnalyst(),
            ContentReasoningAnalyst(),
            CrossReferenceAnalyst(),
        ]
        self.confidence_threshold = confidence_threshold

    def analyze_escalation(
        self,
        doc: Document,
        original_result: ClassificationResult
    ) -> DeepAnalysisResult:
        """
        Perform deep analysis on an escalated document.

        Args:
            doc: The document to analyze
            original_result: The original classification result that was escalated

        Returns:
            DeepAnalysisResult with final classification
        """
        # Run all deep analysts
        deep_votes: List[DeepAnalysisVote] = []

        # Extract original votes from the ConsensusResult within ClassificationResult
        original_votes = original_result.result.votes if original_result.result else []

        for analyst in self.analysts:
            try:
                vote = analyst.analyze(doc, original_votes)
                deep_votes.append(vote)
            except Exception as e:
                # Fix: the comment promised logging but the code printed to
                # stdout. Log the failure and continue with other analysts —
                # one broken analyst should not abort the escalation.
                logging.getLogger(__name__).warning(
                    "%s failed: %s", analyst.analyst_type.value, e
                )

        # Calculate weighted consensus: each analyst's vote counts
        # proportionally to its own confidence.
        classification_scores: Dict[str, float] = {}
        total_weight = 0.0

        for vote in deep_votes:
            weight = vote.confidence
            classification_scores[vote.classification] = (
                classification_scores.get(vote.classification, 0) + weight
            )
            total_weight += weight

        # Normalize scores so they sum to 1.0 across classifications.
        if total_weight > 0:
            for cls in classification_scores:
                classification_scores[cls] /= total_weight

        # Get final classification
        if classification_scores:
            final_classification = max(classification_scores, key=classification_scores.get)
            final_confidence = classification_scores[final_classification]
        else:
            # Fall back to original classification (access through .result)
            final_classification = original_result.result.classification if original_result.result else "reference"
            final_confidence = original_result.result.confidence if original_result.result else 0.3

        # Consensus requires 60%+ analyst agreement AND confidence at or
        # above the configured threshold.
        vote_agreement = sum(1 for v in deep_votes if v.classification == final_classification)
        consensus_reached = (
            vote_agreement >= len(deep_votes) * 0.6 and
            final_confidence >= self.confidence_threshold
        )

        # Determine if human review is still needed
        original_confidence = original_result.result.confidence if original_result.result else 0.0
        requires_human_review = (
            not consensus_reached or
            final_confidence < 0.5 or
            (original_confidence < 0.3 and final_confidence < 0.6)
        )

        # Build reasoning
        reasoning_parts = []
        if consensus_reached:
            reasoning_parts.append(f"Deep analysis consensus: {final_classification} ({final_confidence:.0%})")
        else:
            reasoning_parts.append(f"Deep analysis inconclusive: best guess is {final_classification} ({final_confidence:.0%})")

        reasoning_parts.append(f"Analyst agreement: {vote_agreement}/{len(deep_votes)}")

        if requires_human_review:
            reasoning_parts.append("Recommend human review for final confirmation")

        # Get original classification for result
        original_classification = original_result.result.classification if original_result.result else "unknown"

        return DeepAnalysisResult(
            original_classification=original_classification,
            original_confidence=original_confidence,
            final_classification=final_classification,
            final_confidence=final_confidence,
            deep_votes=deep_votes,
            consensus_reached=consensus_reached,
            requires_human_review=requires_human_review,
            reasoning="; ".join(reasoning_parts)
        )

    def should_escalate_to_human(self, result: DeepAnalysisResult) -> bool:
        """Determine if document needs human review after deep analysis."""
        return result.requires_human_review

    def get_analysis_summary(self, result: DeepAnalysisResult) -> str:
        """Get a human-readable summary of deep analysis."""
        lines = [
            f"Original: {result.original_classification} ({result.original_confidence:.0%})",
            f"Final: {result.final_classification} ({result.final_confidence:.0%})",
            f"Consensus: {'Yes' if result.consensus_reached else 'No'}",
            f"Human Review: {'Required' if result.requires_human_review else 'Not needed'}",
            "",
            "Deep Analyst Votes:"
        ]

        for vote in result.deep_votes:
            lines.append(f"  {vote.analyst_type.value}: {vote.classification} ({vote.confidence:.0%})")
            if vote.evidence:
                for ev in vote.evidence[:2]:
                    lines.append(f"    - {ev}")

        lines.append("")
        lines.append(f"Reasoning: {result.reasoning}")

        return "\n".join(lines)