# scripts/semantic — Semantic Analyst Agent

""" Semantic Analyst Agent

Analyzes document classification based on:

  • Natural language understanding of content
  • Intent classification
  • Topic extraction
  • Semantic similarity to document type descriptions

Note: This implementation uses keyword/phrase analysis. In production, can be upgraded to use LLM API (Claude/GPT). """

import re from pathlib import Path import time from typing import Dict, List, Tuple

import sys sys.path.insert(0, str(Path(file).parent.parent))

from core.models import Document, AnalystVote from analysts.base import BaseAnalyst

class SemanticAnalyst(BaseAnalyst):
    """Analyst that classifies based on semantic understanding of content.

    Scores each candidate document type by matching weighted intent-phrase
    regexes against the (lowercased) document body, then applies small
    boosts from extracted topics and from explicit type statements found
    in the first meaningful paragraph. The highest-scoring type wins;
    with no signal at all, falls back to 'reference' at 0.50 confidence.
    """

    name = "semantic"

    # Intent patterns - phrases that indicate document purpose.
    # Maps document type -> list of (regex, weight). Each matching pattern
    # contributes weight * min(match_count, 3) to that type's raw score;
    # analyze() divides the raw score by 2 and caps it at 0.95.
    INTENT_PATTERNS: Dict[str, List[Tuple[str, float]]] = {
        'agent': [
            (r'this agent\b', 0.30),
            (r'specialist (agent|in)', 0.25),
            (r'autonomous(ly)?\b', 0.15),
            (r'(execute|perform)s? tasks?', 0.15),
            (r'ai.{0,10}(agent|assistant)', 0.25),
            (r'subagent_type', 0.30),
            (r'(invoke|spawn|launch).{0,20}agent', 0.20),
        ],
        'command': [
            (r'this command\b', 0.30),
            (r'invoke (this|the) command', 0.25),
            (r'run(ning)? /\w+', 0.25),
            (r'slash command', 0.30),
            (r'command.line (interface|tool)', 0.20),
            (r'usage:\s*/\w+', 0.25),
        ],
        'skill': [
            (r'this skill\b', 0.30),
            (r'reusable pattern', 0.25),
            (r'use this (skill|pattern)', 0.20),
            (r'skill (for|to)\b', 0.20),
            (r'when (to )?use this', 0.15),
        ],
        'guide': [
            (r'this guide (will|explains?)', 0.30),
            (r'step.by.step', 0.25),
            (r'follow (these|the) steps', 0.25),
            (r'(learn|understand) how to', 0.25),
            (r'in this tutorial', 0.30),
            (r'getting started with', 0.25),
            (r'(we\'ll|you\'ll) (learn|walk through)', 0.20),
        ],
        'adr': [
            (r'we (decided|will|chose)', 0.25),
            (r'(this|the) decision', 0.20),
            (r'architecture decision', 0.35),
            (r'context:.{0,50}decision', 0.30),
            (r'(accept|reject|supersede)ed', 0.20),
            (r'consequences of (this|the)', 0.20),
        ],
        'workflow': [
            (r'this workflow', 0.30),
            (r'(execute|run) (this|the) (workflow|pipeline)', 0.25),
            (r'step \d+ of \d+', 0.20),
            (r'(process|pipeline) (for|to)\b', 0.20),
            (r'automation (workflow|process)', 0.25),
        ],
        'reference': [
            (r'(api|technical) reference', 0.30),
            (r'specification (for|of)', 0.25),
            (r'(this|the) schema', 0.20),
            (r'documentation (for|of)', 0.20),
            (r'(complete|full) (list|reference)', 0.20),
        ],
        'config': [
            (r'configuration (options?|settings?)', 0.30),
            (r'(set|configure) (this|the)', 0.25),
            (r'(available|supported) options', 0.20),
            (r'(environment|config) variable', 0.25),
        ],
        'script': [
            (r'this script\b', 0.30),
            (r'run(ning)? the script', 0.25),
            (r'python.{0,20}script', 0.20),
            (r'automation script', 0.25),
            (r'(execute|run) (this|the) (file|program)', 0.20),
        ],
        'hook': [
            (r'this hook\b', 0.30),
            (r'(trigger|fire)s? (when|on)', 0.25),
            (r'(pre|post).commit hook', 0.30),
            (r'event.driven', 0.20),
            (r'hook (for|to)\b', 0.20),
        ],
    }

    # Topic extraction patterns: topic name -> substrings whose presence
    # (plain `in` test, not regex) marks the topic as present.
    TOPIC_PATTERNS: Dict[str, List[str]] = {
        'development': ['code', 'develop', 'implement', 'build', 'create'],
        'testing': ['test', 'spec', 'coverage', 'assert', 'verify'],
        'deployment': ['deploy', 'release', 'production', 'staging'],
        'security': ['security', 'auth', 'encrypt', 'credential', 'permission'],
        'documentation': ['document', 'guide', 'reference', 'readme'],
        'architecture': ['architect', 'design', 'pattern', 'structure'],
        'automation': ['automat', 'script', 'pipeline', 'workflow'],
    }

    def analyze(self, document: Document) -> AnalystVote:
        """Analyze a document and vote on its classification.

        Args:
            document: The document under review; `body` is preferred,
                falling back to `content` when body is empty/None.

        Returns:
            An AnalystVote (via BaseAnalyst._create_vote) carrying the
            best type, a confidence in [0, 0.95], a short reasoning
            string, timing, and per-type scores in metadata.
        """
        start = time.time()

        content = document.body or document.content
        content_lower = content.lower()
        scores: Dict[str, float] = {}
        reasons: List[str] = []

        # Score each document type from its weighted intent patterns.
        for doc_type, patterns in self.INTENT_PATTERNS.items():
            type_score = 0.0
            matched = 0

            for pattern, weight in patterns:
                matches = len(re.findall(pattern, content_lower))
                if matches > 0:
                    # Cap at 3 matches so one repeated phrase can't dominate.
                    type_score += weight * min(matches, 3)
                    matched += 1

            if type_score > 0:
                # type_score > 0 implies matched >= 1 (all weights are
                # positive), so no extra guard is needed before appending.
                # Divide by 2 to normalize multi-pattern hits, cap at 0.95.
                scores[doc_type] = min(0.95, type_score / 2)
                reasons.append(f"Intent patterns suggest {doc_type} ({matched} indicators)")

        # Extract topics and apply topic-based boosts. Note: a boost may
        # also *introduce* a type at a 0.4 baseline when intent patterns
        # found nothing for it.
        topics = self._extract_topics(content_lower)

        if 'documentation' in topics:
            for t in ['guide', 'reference']:
                scores[t] = scores.get(t, 0.4) + 0.10

        if 'architecture' in topics:
            scores['adr'] = scores.get('adr', 0.4) + 0.15

        if 'automation' in topics:
            for t in ['workflow', 'script']:
                scores[t] = scores.get(t, 0.4) + 0.10

        # The opening paragraph often states the document's purpose
        # explicitly; give such statements an extra boost (0.5 baseline).
        first_para = self._get_first_paragraph(content)
        if first_para:
            para_analysis = self._analyze_first_paragraph(first_para)
            for doc_type, boost in para_analysis.items():
                scores[doc_type] = scores.get(doc_type, 0.5) + boost

        # Pick the best-scoring type, or fall back to 'reference'.
        if scores:
            best_type = max(scores, key=scores.get)
            confidence = min(0.95, scores[best_type])
            reasoning = "; ".join(reasons[:3]) if reasons else "Semantic intent analysis"
        else:
            best_type = 'reference'
            confidence = 0.50
            reasoning = "No strong semantic indicators, defaulting to reference"

        duration_ms = int((time.time() - start) * 1000)

        return self._create_vote(
            classification=best_type,
            confidence=confidence,
            reasoning=reasoning,
            duration_ms=duration_ms,
            metadata={
                'topics': topics,
                'all_scores': {k: round(v, 3) for k, v in scores.items()}
            }
        )

    def _extract_topics(self, content: str) -> List[str]:
        """Return the topics whose keyword substrings appear in *content*.

        *content* is expected to be lowercased by the caller.
        """
        topics = []
        for topic, keywords in self.TOPIC_PATTERNS.items():
            if any(kw in content for kw in keywords):
                topics.append(topic)
        return topics

    def _get_first_paragraph(self, content: str) -> str:
        """Extract the first meaningful paragraph of *content*.

        Strips YAML frontmatter and markdown headers, then returns the
        first of the leading three paragraphs that is longer than 50
        characters and is not a fenced code block; "" if none qualifies.
        """
        # Remove frontmatter if present.
        content = re.sub(r'^---\s*\n.*?\n---\s*\n', '', content, flags=re.DOTALL)

        # Remove markdown headers.
        content = re.sub(r'^#.*$', '', content, flags=re.MULTILINE)

        # Paragraphs are blank-line-separated chunks.
        paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]

        for para in paragraphs[:3]:
            if len(para) > 50 and not para.startswith('```'):
                return para

        return ""

    def _analyze_first_paragraph(self, paragraph: str) -> Dict[str, float]:
        """Return per-type confidence boosts from explicit statements.

        Looks for self-describing phrases ("this guide", "this agent",
        "this command", ...) in the opening paragraph and maps them to
        small additive boosts.
        """
        boosts: Dict[str, float] = {}
        para_lower = paragraph.lower()

        # Explicit document-type statements.
        if 'this document' in para_lower or 'this guide' in para_lower:
            if 'how to' in para_lower or 'step' in para_lower:
                boosts['guide'] = 0.15
            elif 'decision' in para_lower:
                boosts['adr'] = 0.15
            elif 'reference' in para_lower or 'specification' in para_lower:
                boosts['reference'] = 0.15

        if 'this agent' in para_lower:
            boosts['agent'] = 0.20

        if 'this command' in para_lower:
            boosts['command'] = 0.20

        return boosts