Skip to main content

scripts-content

""" Content Analyst Agent

Classifies documents based on:

  • Markdown heading structure
  • Section names and patterns
  • Code block presence and language
  • List structures and formatting
  • Content length and organization """

import re
import sys
import time
from pathlib import Path
from typing import Dict, List, Tuple

# Put the parent of this file's directory at the front of sys.path so the
# project-local imports below resolve. This must run before those imports.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.models import Document, AnalystVote
from analysts.base import BaseAnalyst

class ContentAnalyst(BaseAnalyst):
    """Analyst that classifies based on document content structure.

    Scoring runs in three passes over the document text:

    1. Every regex in ``SECTION_PATTERNS`` is searched (case-insensitive);
       the weights of matching patterns are summed per document type.
    2. Level-2 heading texts are checked for ADR / guide / agent keywords
       (see ``_analyze_headings``) and add per-match boosts.
    3. ``CONTENT_INDICATORS`` (mermaid, tables, numbered steps, checkboxes)
       add fixed boosts.

    The type with the highest total wins; the reported confidence is capped
    at 0.98.
    """

    name = "content"

    # Section patterns that indicate document types.
    # Maps document type -> list of (regex, weight). A pattern contributes
    # its weight once if it matches anywhere in the content.
    SECTION_PATTERNS: Dict[str, List[Tuple[str, float]]] = {
        'adr': [
            (r'##\s*Status', 0.30),
            (r'##\s*Context', 0.25),
            (r'##\s*Decision', 0.30),
            (r'##\s*Consequences', 0.15),
            (r'ADR-\d+', 0.20),
        ],
        'agent': [
            (r'##\s*Capabilities', 0.25),
            (r'##\s*Role', 0.20),
            (r'##\s*Responsibilities', 0.20),
            (r'##\s*Tools', 0.15),
            (r'##\s*Invocation', 0.15),
            (r'subagent_type', 0.20),
        ],
        'command': [
            (r'##\s*Invocation', 0.25),
            (r'##\s*Arguments', 0.20),
            (r'##\s*Examples', 0.15),
            (r'##\s*Usage', 0.15),
            (r'/\w+', 0.10),  # Slash command pattern
        ],
        'skill': [
            (r'##\s*When to Use', 0.25),
            (r'##\s*Capabilities', 0.20),
            (r'##\s*Pattern', 0.20),
            (r'SKILL\.md', 0.15),
        ],
        'guide': [
            (r'##\s*Prerequisites', 0.15),
            (r'##\s*Step\s*\d', 0.20),
            (r'##\s*Getting Started', 0.20),
            (r'##\s*How to', 0.15),
            (r'##\s*Tutorial', 0.15),
            (r'##\s*Quick Start', 0.15),
        ],
        'workflow': [
            (r'##\s*Steps', 0.20),
            (r'##\s*Workflow', 0.25),
            (r'##\s*Pipeline', 0.20),
            (r'##\s*Phases?', 0.15),
            (r'flowchart|sequenceDiagram', 0.15),
        ],
        'reference': [
            (r'##\s*API', 0.15),
            (r'##\s*Reference', 0.20),
            (r'##\s*Specification', 0.20),
            (r'##\s*Schema', 0.15),
            (r'##\s*Overview', 0.10),
        ],
        'config': [
            (r'##\s*Configuration', 0.25),
            (r'##\s*Settings', 0.25),
            (r'##\s*Options', 0.20),
            (r'##\s*Parameters', 0.15),
        ],
        'script': [
            (r'```python', 0.20),
            (r'```bash', 0.20),
            (r'```sh', 0.20),
            (r'##\s*Usage', 0.15),
            (r'if __name__', 0.25),
        ],
        'hook': [
            (r'##\s*Trigger', 0.25),
            (r'##\s*Events?', 0.20),
            (r'pre-commit|post-commit', 0.25),
            (r'##\s*Hook', 0.20),
        ],
    }

    # Content indicators.
    # Maps indicator name -> predicate taking the content string and
    # returning a bool.
    CONTENT_INDICATORS = {
        'has_code_blocks': lambda c: '```' in c,
        'has_mermaid': lambda c: '```mermaid' in c.lower(),
        'has_tables': lambda c: bool(re.search(r'\|.*\|.*\|', c)),
        'has_numbered_steps': lambda c: bool(re.search(r'^\d+\.\s+', c, re.MULTILINE)),
        'has_checkboxes': lambda c: bool(re.search(r'\[[ x]\]', c, re.IGNORECASE)),
        'has_frontmatter': lambda c: c.strip().startswith('---'),
    }

    def analyze(self, document: Document) -> AnalystVote:
        """Analyze document based on content structure.

        Args:
            document: Document to classify. ``document.body`` is used when
                truthy, otherwise ``document.content``; if both are empty
                the analysis runs on an empty string.

        Returns:
            The result of ``self._create_vote`` for the best-scoring type.
            The confidence is capped at 0.98. The metadata carries the
            heading count, the indicator flags, and all per-type scores
            (rounded to 3 decimals, not capped).
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments.
        start = time.perf_counter()

        # Fall back to "" so a document without any text yields the default
        # vote instead of a TypeError from re.search.
        content = document.body or document.content or ""
        scores: Dict[str, float] = {}
        reasons: List[str] = []

        # Check section patterns
        for doc_type, patterns in self.SECTION_PATTERNS.items():
            matched_weights = [
                weight
                for pattern, weight in patterns
                if re.search(pattern, content, re.IGNORECASE | re.MULTILINE)
            ]
            type_score = sum(matched_weights)
            if type_score > 0:
                scores[doc_type] = min(0.98, type_score)
                reasons.append(
                    f"{doc_type}: matched {len(matched_weights)} patterns"
                )

        # Analyze heading structure
        headings = self._extract_headings(content)
        for doc_type, boost in self._analyze_headings(headings).items():
            # Types without any pattern score start from a neutral 0.5.
            scores[doc_type] = scores.get(doc_type, 0.5) + boost

        # Check content indicators
        indicators = {
            name: check(content)
            for name, check in self.CONTENT_INDICATORS.items()
        }

        # Apply indicator boosts
        if indicators['has_mermaid']:
            scores['workflow'] = scores.get('workflow', 0.5) + 0.15
            # Seed from the 'architecture' score itself; seeding it from
            # 'reference' made it outrank 'reference' on any diagram.
            scores['architecture'] = scores.get('architecture', 0.5) + 0.10

        if indicators['has_tables']:
            scores['reference'] = scores.get('reference', 0.5) + 0.10

        if indicators['has_numbered_steps']:
            scores['guide'] = scores.get('guide', 0.5) + 0.15

        if indicators['has_checkboxes']:
            scores['workflow'] = scores.get('workflow', 0.5) + 0.10

        # Determine best classification
        if scores:
            # On ties, max() keeps the first key in insertion order.
            best_type = max(scores, key=scores.get)
            confidence = min(0.98, scores[best_type])
            reasoning = (
                "; ".join(reasons[:3]) if reasons else "Content structure analysis"
            )
        else:
            best_type = 'reference'
            confidence = 0.50
            reasoning = "No strong content patterns, defaulting to reference"

        duration_ms = int((time.perf_counter() - start) * 1000)

        return self._create_vote(
            classification=best_type,
            confidence=confidence,
            reasoning=reasoning,
            duration_ms=duration_ms,
            metadata={
                'heading_count': len(headings),
                'indicators': indicators,
                'all_scores': {k: round(v, 3) for k, v in scores.items()},
            },
        )

    def _extract_headings(self, content: str) -> List[Tuple[int, str]]:
        """Extract all markdown headings with their levels.

        Args:
            content: Markdown text to scan.

        Returns:
            ``(level, text)`` tuples in document order, where ``level`` is
            the number of leading ``#`` characters (1-6) and ``text`` is the
            stripped remainder of the line.
        """
        # The separator is restricted to spaces/tabs: ``\s+`` also matches
        # newlines, which let a bare "#" line capture the following line as
        # its heading text.
        return [
            (len(match.group(1)), match.group(2).strip())
            for match in re.finditer(
                r'^(#{1,6})[ \t]+(.+)$', content, re.MULTILINE
            )
        ]

    def _analyze_headings(self, headings: List[Tuple[int, str]]) -> Dict[str, float]:
        """Analyze heading patterns for type indicators.

        Only level-2 headings are considered. A heading counts as a match
        for a type when its lower-cased text contains any of that type's
        keywords as a substring.

        Args:
            headings: ``(level, text)`` tuples as returned by
                ``_extract_headings``.

        Returns:
            Mapping of document type to boost (``weight * match_count``),
            containing only the types that reached their minimum number of
            matching headings.
        """
        h2_texts = [text.lower() for level, text in headings if level == 2]

        # (doc_type, keywords, minimum matching headings, weight per match)
        rules = (
            # ADR pattern: Status, Context, Decision
            ('adr', ('status', 'context', 'decision', 'consequences'), 2, 0.20),
            # Guide pattern: Prerequisites, Steps, Getting Started
            ('guide', ('prerequisites', 'getting started', 'step', 'how to', 'tutorial'), 1, 0.15),
            # Agent pattern: Role, Capabilities, Responsibilities
            ('agent', ('role', 'capabilities', 'responsibilities', 'tools'), 2, 0.20),
        )

        boosts: Dict[str, float] = {}
        for doc_type, keywords, min_matches, weight in rules:
            match_count = sum(
                1
                for text in h2_texts
                if any(keyword in text for keyword in keywords)
            )
            if match_count >= min_matches:
                boosts[doc_type] = weight * match_count

        return boosts