#!/usr/bin/env python3
"""
Semantic Intent to Query Template Mapper (ADR-154 J.4.11.2)

Maps semantic command intents (CREATE, ANALYZE, DEPLOY, etc.) to appropriate
context graph query templates for intelligent context hydration.

Usage:
    from scripts.context_graph.intent_template_mapper import (
        IntentTemplateMapper,
        SemanticIntent,
    )

    mapper = IntentTemplateMapper()
    templates = mapper.map_intent(
        SemanticIntent.ANALYZE,
        agent_type="security-specialist",
    )
    templates, intent, confidence = mapper.get_templates_for_message(
        "Review the authentication code",
        agent_type="security-specialist",
    )

Author: CODITECT Team
Version: 1.0.0
ADR: ADR-154 (Context Graph Query DSL and Agent Workflow)
Task: J.4.11.2
"""

import logging
import re
import sys
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Import query template registry.
#
# The absolute import succeeds when the directory that contains the
# ``scripts`` package is already importable. When this file is executed
# directly as a script it usually is not, so that directory (three levels
# above this file) is prepended to ``sys.path`` and the import is retried.
try:
    from scripts.context_graph.query_templates import QueryTemplate, QueryTemplateRegistry
except ImportError:
    # Handle relative import when run as script. The path is resolved first so
    # that a relative ``__file__`` still yields the correct ancestor directory.
    sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
    from scripts.context_graph.query_templates import QueryTemplate, QueryTemplateRegistry

# Configure logging: a module-level logger named after this module, so the
# embedding application can route and filter its records by module name.
logger = logging.getLogger(__name__)

class SemanticIntent(Enum):
    """
    High-level semantic intents mapped to context requirements.

    Extends the Cross-LLM Bridge CommandIntent with context-graph-aware categories.

    Every member's value is the lower-case form of its name, so a member can
    be looked up either by name (``SemanticIntent["DEPLOY"]``) or by value
    (``SemanticIntent("deploy")``).
    """

    # Development intents
    CREATE = "create"
    ANALYZE = "analyze"
    REFACTOR = "refactor"
    DEBUG = "debug"
    TEST = "test"
    REVIEW = "review"

    # Operations intents
    DEPLOY = "deploy"
    CONFIGURE = "configure"
    SYNCHRONIZE = "synchronize"
    MIGRATE = "migrate"

    # Documentation intents
    DOCUMENT = "document"
    EXPLAIN = "explain"
    SEARCH = "search"

    # Security intents
    SECURITY_AUDIT = "security_audit"
    COMPLIANCE_CHECK = "compliance_check"
    VULNERABILITY_SCAN = "vulnerability_scan"

    # Architecture intents
    ARCHITECTURE_REVIEW = "architecture_review"
    DESIGN = "design"
    PLAN = "plan"

    # Data intents
    DATA_ANALYSIS = "data_analysis"
    QUERY = "query"
    REPORT = "report"

    # Workflow intents
    WORKFLOW_START = "workflow_start"
    WORKFLOW_RESUME = "workflow_resume"
    WORKFLOW_COMPLETE = "workflow_complete"

    # General
    LEARN = "learn"
    ORGANIZE = "organize"
    AUDIT = "audit"
    GENERATE = "generate"

@dataclass
class IntentPattern:
    """Pattern for matching user messages to intents.

    A message matches when any regex in ``patterns`` is found in it, or,
    failing that, when at least one entry of ``keywords`` occurs in it.
    """

    intent: SemanticIntent
    patterns: List[str]  # Regex patterns
    keywords: Set[str]  # Simple keyword matches
    priority: int = 0  # Higher = more specific

    def matches(self, text: str) -> Tuple[bool, float]:
        """
        Check if text matches this intent pattern.

        Regex patterns are tried first and yield a fixed confidence of 0.9.
        Otherwise keywords are counted: each keyword must begin at the start
        of a word (so "pr" does not fire inside "improve", nor "test" inside
        "latest"), while suffixes are still accepted ("deploy" matches
        "deploying"). Keyword confidence is 0.5 plus 0.15 per matched
        keyword, capped at 0.85 so it stays below a regex match.

        Args:
            text: The message to test; empty or None never matches.

        Returns:
            Tuple of (matched, confidence)
        """
        if not text:
            return (False, 0.0)

        text_lower = text.lower()

        # Check regex patterns
        for pattern in self.patterns:
            if re.search(pattern, text_lower, re.IGNORECASE):
                return (True, 0.9)

        # Check keywords; the lookbehind rejects hits that start mid-word.
        matched_keywords = sum(
            1
            for kw in self.keywords
            if re.search(r"(?<!\w)" + re.escape(kw.lower()), text_lower)
        )
        if matched_keywords > 0:
            confidence = min(0.5 + (matched_keywords * 0.15), 0.85)
            return (True, confidence)

        return (False, 0.0)

@dataclass
class IntentMapping:
    """Mapping from intent to query templates.

    Templates are gathered from three sources in priority order: explicit
    names, then categories, then tags. ``fallback_template`` is consulted
    only when those three sources produce nothing.
    """

    intent: SemanticIntent
    template_names: List[str]  # Direct template names
    template_categories: List[str]  # Categories to search
    template_tags: List[str]  # Tags to search
    fallback_template: Optional[str] = None

    def get_templates(self, registry: QueryTemplateRegistry) -> List[QueryTemplate]:
        """Get all matching templates from registry.

        Args:
            registry: Object providing ``get(name)``,
                ``find_by_category(category)`` and ``find_by_tags(tags)``.

        Returns:
            Templates in source-priority order, de-duplicated by ``name``
            (the first occurrence wins).
        """
        templates: List[QueryTemplate] = []
        seen_names: Set[str] = set()

        def add(template) -> None:
            # Keep only the first template seen for a given name.
            if template.name not in seen_names:
                seen_names.add(template.name)
                templates.append(template)

        # Direct name matches (highest priority); unknown names are skipped.
        for name in self.template_names:
            found = registry.get(name)
            if found:
                add(found)

        # Category matches
        for category in self.template_categories:
            for found in registry.find_by_category(category):
                add(found)

        # Tag matches: all tags are passed to the registry in a single query.
        if self.template_tags:
            for found in registry.find_by_tags(self.template_tags):
                add(found)

        # Fallback
        if not templates and self.fallback_template:
            found = registry.get(self.fallback_template)
            if found:
                templates.append(found)

        return templates

class IntentTemplateMapper:
    """
    Maps semantic intents to context graph query templates.

    This class provides the bridge between the Cross-LLM Bridge's semantic
    command processing and the Context Graph query system.

    A message is first classified into a SemanticIntent using the patterns
    from ``_build_intent_patterns`` and then resolved to templates using the
    table from ``_build_intent_mappings``.
    """

    def __init__(self, registry: Optional[QueryTemplateRegistry] = None):
        """Create a mapper.

        Args:
            registry: Template registry to query. A new QueryTemplateRegistry
                is created only when this is None, so a caller-supplied
                registry is always honoured, even if it evaluates as falsy.
        """
        self.registry = registry if registry is not None else QueryTemplateRegistry()
        self._patterns = self._build_intent_patterns()
        self._mappings = self._build_intent_mappings()

    def _build_intent_patterns(self) -> List[IntentPattern]:
        """Build patterns for intent classification.

        Not every SemanticIntent has a pattern; intents without one can only
        be reached by passing them to ``map_intent`` directly.
        """
        return [
            # Security-specific (high priority)
            IntentPattern(
                intent=SemanticIntent.SECURITY_AUDIT,
                patterns=[
                    r"security\s*(audit|review|check)",
                    r"audit.*security",
                    r"check.*vulnerabilit",
                    r"pentest|penetration\s*test",
                ],
                keywords={"security", "audit", "vulnerability", "owasp", "pentest"},
                priority=10,
            ),
            IntentPattern(
                intent=SemanticIntent.COMPLIANCE_CHECK,
                patterns=[
                    r"compliance\s*(check|audit|review)",
                    r"(hipaa|gdpr|soc2|pci)\s*(compliance|audit)",
                    r"regulatory\s*(check|audit)",
                ],
                keywords={"compliance", "hipaa", "gdpr", "soc2", "regulatory"},
                priority=10,
            ),
            IntentPattern(
                intent=SemanticIntent.VULNERABILITY_SCAN,
                patterns=[
                    r"(scan|check)\s*(for\s*)?(vulnerabilit|cve|security\s*issue)",
                    r"find\s*vulnerabilit",
                ],
                keywords={"vulnerability", "cve", "exploit", "scan"},
                priority=10,
            ),

            # Architecture (high priority)
            IntentPattern(
                intent=SemanticIntent.ARCHITECTURE_REVIEW,
                patterns=[
                    r"(review|check|audit)\s*(the\s*)?(architecture|system\s*design)",
                    r"architecture\s*(review|audit)",
                ],
                keywords={"architecture", "system design", "c4", "adr"},
                priority=9,
            ),
            IntentPattern(
                intent=SemanticIntent.DESIGN,
                patterns=[
                    r"design\s*(a|the|new)",
                    r"(create|draft)\s*.*\s*design",
                    r"architect\s*(a|the|new)",
                ],
                keywords={"design", "architect", "schema", "pattern"},
                priority=8,
            ),

            # Development
            IntentPattern(
                intent=SemanticIntent.CREATE,
                patterns=[
                    r"(create|build|make|implement|add)\s*(a|the|new)",
                    r"(write|code)\s*(a|the|new)",
                ],
                keywords={"create", "build", "implement", "add", "new"},
                priority=5,
            ),
            IntentPattern(
                intent=SemanticIntent.REFACTOR,
                patterns=[
                    r"refactor",
                    r"clean\s*up\s*(the\s*)?(code|function|class)",
                    r"improve\s*(the\s*)?(code|structure)",
                ],
                keywords={"refactor", "cleanup", "restructure", "improve code"},
                priority=6,
            ),
            IntentPattern(
                intent=SemanticIntent.DEBUG,
                patterns=[
                    r"debug",
                    r"fix\s*(the\s*)?(bug|error|issue|problem)",
                    r"(what|why)\s*(is|causes?)\s*(the\s*)?(error|bug|issue)",
                    r"investigate\s*(the\s*)?(error|bug|crash)",
                ],
                keywords={"debug", "bug", "error", "fix", "crash", "investigate"},
                priority=7,
            ),
            IntentPattern(
                intent=SemanticIntent.TEST,
                patterns=[
                    r"(write|create|add)\s*(unit\s*)?(test|spec)",
                    r"test\s*(the|this)",
                    r"(run|execute)\s*test",
                ],
                keywords={"test", "spec", "unittest", "pytest", "coverage"},
                priority=6,
            ),
            IntentPattern(
                intent=SemanticIntent.REVIEW,
                patterns=[
                    r"(code\s*)?review",
                    r"(check|look\s*at)\s*(the\s*)?(code|pr|pull\s*request)",
                ],
                keywords={"review", "pr", "pull request", "code review"},
                priority=6,
            ),

            # Analysis
            IntentPattern(
                intent=SemanticIntent.ANALYZE,
                patterns=[
                    r"analyze",
                    r"(what|how)\s*(does|is)",
                    r"understand\s*(the|this)",
                    r"explain\s*(how|what|why)",
                ],
                keywords={"analyze", "understand", "examine", "investigate"},
                priority=4,
            ),
            IntentPattern(
                intent=SemanticIntent.DATA_ANALYSIS,
                patterns=[
                    r"analyze\s*(the\s*)?(data|metrics|logs)",
                    r"(data|metrics)\s*analysis",
                ],
                keywords={"data analysis", "metrics", "statistics", "trends"},
                priority=7,
            ),

            # Operations
            IntentPattern(
                intent=SemanticIntent.DEPLOY,
                patterns=[
                    r"deploy",
                    r"(push|release)\s*to\s*(prod|production|staging)",
                    r"(ship|rollout)",
                ],
                keywords={"deploy", "release", "ship", "production", "staging"},
                priority=7,
            ),
            IntentPattern(
                intent=SemanticIntent.MIGRATE,
                patterns=[
                    r"migrat(e|ion)",
                    r"(move|transfer)\s*(to|from)",
                    r"upgrad(e|ing)\s*(to|from)",
                ],
                keywords={"migrate", "migration", "upgrade", "transfer"},
                priority=7,
            ),

            # Documentation
            IntentPattern(
                intent=SemanticIntent.DOCUMENT,
                patterns=[
                    r"(write|create|update)\s*(the\s*)?(doc|documentation)",
                    r"document\s*(the|this)",
                ],
                keywords={"document", "documentation", "readme", "docs"},
                priority=6,
            ),
            IntentPattern(
                intent=SemanticIntent.SEARCH,
                patterns=[
                    r"(find|search|look\s*for|where\s*is)",
                    r"(show\s*me|list)\s*(all|the)",
                ],
                keywords={"find", "search", "locate", "where"},
                priority=3,
            ),

            # Workflow
            IntentPattern(
                intent=SemanticIntent.WORKFLOW_RESUME,
                patterns=[
                    r"(resume|continue)\s*(the\s*)?(workflow|task|work)",
                    r"pick\s*up\s*where",
                ],
                keywords={"resume", "continue", "pick up"},
                priority=8,
            ),
        ]

    def _build_intent_mappings(self) -> Dict[SemanticIntent, IntentMapping]:
        """Build mappings from intents to templates.

        Intents absent from this table have no mapping; ``map_intent`` then
        returns only the agent-specific templates, if an agent type is given.
        """
        return {
            # Security intents
            SemanticIntent.SECURITY_AUDIT: IntentMapping(
                intent=SemanticIntent.SECURITY_AUDIT,
                template_names=["security-audit-context"],
                template_categories=["agent-context"],
                template_tags=["security", "audit"],
                fallback_template="architecture-context",
            ),
            SemanticIntent.COMPLIANCE_CHECK: IntentMapping(
                intent=SemanticIntent.COMPLIANCE_CHECK,
                template_names=["security-audit-context", "adr-compliance-context"],
                template_categories=["agent-context"],
                template_tags=["compliance", "audit"],
            ),
            SemanticIntent.VULNERABILITY_SCAN: IntentMapping(
                intent=SemanticIntent.VULNERABILITY_SCAN,
                template_names=["security-audit-context"],
                template_categories=["agent-context"],
                template_tags=["security", "vulnerability"],
            ),

            # Architecture intents
            SemanticIntent.ARCHITECTURE_REVIEW: IntentMapping(
                intent=SemanticIntent.ARCHITECTURE_REVIEW,
                template_names=["architecture-context", "adr-compliance-context"],
                template_categories=["agent-context"],
                template_tags=["architecture", "adr", "design"],
            ),
            SemanticIntent.DESIGN: IntentMapping(
                intent=SemanticIntent.DESIGN,
                template_names=["architecture-context", "api-design-context"],
                template_categories=["agent-context"],
                template_tags=["design", "architecture"],
            ),
            SemanticIntent.PLAN: IntentMapping(
                intent=SemanticIntent.PLAN,
                template_names=["migration-planning-context"],
                template_categories=["agent-context"],
                template_tags=["planning", "strategy"],
            ),

            # Development intents
            SemanticIntent.CREATE: IntentMapping(
                intent=SemanticIntent.CREATE,
                template_names=[],
                template_categories=["agent-context"],
                template_tags=["development"],
                fallback_template="architecture-context",
            ),
            SemanticIntent.REFACTOR: IntentMapping(
                intent=SemanticIntent.REFACTOR,
                template_names=["code-review-context"],
                template_categories=["agent-context"],
                template_tags=["refactoring", "code-quality"],
            ),
            SemanticIntent.DEBUG: IntentMapping(
                intent=SemanticIntent.DEBUG,
                template_names=["error-investigation-context"],
                template_categories=["agent-context"],
                template_tags=["debugging", "error"],
            ),
            SemanticIntent.TEST: IntentMapping(
                intent=SemanticIntent.TEST,
                template_names=["testing-qa-context"],
                template_categories=["agent-context"],
                template_tags=["testing", "qa"],
            ),
            SemanticIntent.REVIEW: IntentMapping(
                intent=SemanticIntent.REVIEW,
                template_names=["code-review-context"],
                template_categories=["agent-context"],
                template_tags=["review", "code-quality"],
            ),

            # Analysis intents
            SemanticIntent.ANALYZE: IntentMapping(
                intent=SemanticIntent.ANALYZE,
                template_names=["session-analysis-context"],
                template_categories=["agent-context"],
                template_tags=["analysis"],
                fallback_template="architecture-context",
            ),
            SemanticIntent.DATA_ANALYSIS: IntentMapping(
                intent=SemanticIntent.DATA_ANALYSIS,
                template_names=["token-economics-context"],
                template_categories=["agent-context"],
                template_tags=["data", "analytics"],
            ),

            # Operations intents
            SemanticIntent.DEPLOY: IntentMapping(
                intent=SemanticIntent.DEPLOY,
                template_names=["devops-deployment-context"],
                template_categories=["agent-context"],
                template_tags=["deployment", "devops"],
            ),
            SemanticIntent.CONFIGURE: IntentMapping(
                intent=SemanticIntent.CONFIGURE,
                template_names=["devops-deployment-context"],
                template_categories=["agent-context"],
                template_tags=["configuration", "setup"],
            ),
            SemanticIntent.MIGRATE: IntentMapping(
                intent=SemanticIntent.MIGRATE,
                template_names=["migration-planning-context"],
                template_categories=["agent-context"],
                template_tags=["migration"],
            ),

            # Documentation intents
            SemanticIntent.DOCUMENT: IntentMapping(
                intent=SemanticIntent.DOCUMENT,
                template_names=["documentation-context"],
                template_categories=["agent-context"],
                template_tags=["documentation"],
            ),
            SemanticIntent.SEARCH: IntentMapping(
                intent=SemanticIntent.SEARCH,
                template_names=["component-search-context"],
                template_categories=["agent-context"],
                template_tags=["search"],
            ),

            # Workflow intents
            SemanticIntent.WORKFLOW_RESUME: IntentMapping(
                intent=SemanticIntent.WORKFLOW_RESUME,
                template_names=["workflow-resume-context"],
                template_categories=["agent-context"],
                template_tags=["workflow"],
            ),
        }

    @staticmethod
    def _coerce_intent(intent):
        """Return *intent* as a SemanticIntent when it is given as a string.

        Strings are resolved case-insensitively by member name ("ANALYZE")
        and then by value ("analyze"). A string that matches neither yields
        None. Non-string arguments are returned unchanged.
        """
        if not isinstance(intent, str):
            return intent
        key = intent.strip()
        try:
            return SemanticIntent[key.upper()]
        except KeyError:
            pass
        try:
            return SemanticIntent(key.lower())
        except ValueError:
            return None

    def classify_intent(self, user_message: str) -> Tuple[SemanticIntent, float]:
        """
        Classify user message into a semantic intent.

        Among all patterns that match, the one with the highest priority
        wins; ties on priority are broken by the higher confidence, and
        remaining ties by the pattern's position in the pattern list.

        Args:
            user_message: The user's input message

        Returns:
            Tuple of (intent, confidence). When nothing matches (including
            an empty or None message) this is (SemanticIntent.ANALYZE, 0.0).
        """
        if not user_message:
            return (SemanticIntent.ANALYZE, 0.0)

        best_intent = None
        best_key = (-1, 0.0)  # (priority, confidence) of the current winner

        for pattern in self._patterns:
            matched, confidence = pattern.matches(user_message)
            if not matched:
                continue
            # Prefer higher priority patterns, then higher confidence. The
            # tuple comparison is lexicographic, and being strict it keeps the
            # earliest pattern on an exact tie.
            key = (pattern.priority, confidence)
            if key > best_key:
                best_intent = pattern.intent
                best_key = key

        if best_intent is None:
            return (SemanticIntent.ANALYZE, 0.0)  # Default
        return (best_intent, best_key[1])

    def map_intent(
        self,
        intent: SemanticIntent,
        agent_type: Optional[str] = None,
        track: Optional[str] = None,
    ) -> List[QueryTemplate]:
        """
        Map a semantic intent to appropriate query templates.

        Args:
            intent: The classified semantic intent. A string holding the
                member name or value (e.g. "ANALYZE" or "analyze") is also
                accepted and converted.
            agent_type: Optional agent type for filtering
            track: Optional PILOT track for filtering

        Returns:
            List of matching QueryTemplates. Each filter is applied only if
            it leaves at least one template, so a filter never empties the
            result on its own.
        """
        templates = []

        # Get templates from intent mapping
        mapping = self._mappings.get(self._coerce_intent(intent))
        if mapping:
            templates = mapping.get_templates(self.registry)

        # Filter by agent type if specified; templates that declare no agent
        # types are treated as applicable to every agent.
        if agent_type and templates:
            agent_templates = [
                t for t in templates
                if not t.agent_types or agent_type in t.agent_types
            ]
            if agent_templates:
                templates = agent_templates

        # Filter by track if specified; templates without a track always pass.
        if track and templates:
            track_templates = [
                t for t in templates
                if not t.track or t.track == track
            ]
            if track_templates:
                templates = track_templates

        # Also include agent-specific templates. These are appended after the
        # filters above, so the track filter is not applied to them.
        if agent_type:
            for t in self.registry.find_for_agent(agent_type):
                if t not in templates:
                    templates.append(t)

        return templates

    def get_templates_for_message(
        self,
        user_message: str,
        agent_type: Optional[str] = None,
        track: Optional[str] = None,
    ) -> Tuple[List[QueryTemplate], SemanticIntent, float]:
        """
        Get appropriate templates for a user message.

        This is the main entry point for the Cross-LLM Bridge integration.

        Args:
            user_message: The user's input message
            agent_type: Optional agent type for filtering
            track: Optional PILOT track for filtering

        Returns:
            Tuple of (templates, classified_intent, confidence)
        """
        intent, confidence = self.classify_intent(user_message)
        templates = self.map_intent(intent, agent_type, track)
        return (templates, intent, confidence)

    def list_intents(self) -> List[str]:
        """List all supported intents as their string values."""
        return [i.value for i in SemanticIntent]

    def list_mappings(self) -> Dict[str, List[str]]:
        """List all intent-to-template mappings.

        Returns:
            Dict from intent value to a copy of that mapping's direct
            template names; mutating the result does not affect the mapper.
        """
        return {
            intent.value: list(mapping.template_names)
            for intent, mapping in self._mappings.items()
        }

# Singleton instance for module-level access; created on the first call to
# get_mapper() and reused afterwards.
_mapper_instance: Optional[IntentTemplateMapper] = None


def get_mapper() -> IntentTemplateMapper:
    """Get or create the singleton IntentTemplateMapper instance.

    Returns:
        The shared module-level mapper; it is constructed with default
        arguments the first time this function is called.
    """
    global _mapper_instance
    if _mapper_instance is None:
        _mapper_instance = IntentTemplateMapper()
    return _mapper_instance

# CLI interface for testing
def main(argv: Optional[List[str]] = None) -> None:
    """Run the command-line interface.

    Exactly one action is performed, checked in this order: list intents,
    list mappings, classify a message. With no action the help text is
    printed.

    Args:
        argv: Argument list to parse; None makes argparse read sys.argv.
    """
    import argparse
    import json as json_module

    parser = argparse.ArgumentParser(description="Intent to Template Mapper")
    parser.add_argument("--classify", "-c", help="Classify a user message")
    parser.add_argument("--list-intents", "-i", action="store_true", help="List all intents")
    parser.add_argument("--list-mappings", "-m", action="store_true", help="List all mappings")
    parser.add_argument("--agent", "-a", help="Agent type filter")
    parser.add_argument("--track", "-t", help="Track filter")
    parser.add_argument("--json", "-j", action="store_true", help="Output as JSON")

    args = parser.parse_args(argv)

    # Nothing to do: show usage without building a mapper (and its registry).
    if not (args.list_intents or args.list_mappings or args.classify):
        parser.print_help()
        return

    mapper = IntentTemplateMapper()

    if args.list_intents:
        intents = mapper.list_intents()
        if args.json:
            print(json_module.dumps(intents, indent=2))
        else:
            for intent in intents:
                print(f" - {intent}")

    elif args.list_mappings:
        mappings = mapper.list_mappings()
        if args.json:
            print(json_module.dumps(mappings, indent=2))
        else:
            for intent, templates in mappings.items():
                print(f"{intent}:")
                for t in templates:
                    print(f" - {t}")

    else:
        templates, intent, confidence = mapper.get_templates_for_message(
            args.classify,
            agent_type=args.agent,
            track=args.track,
        )

        if args.json:
            print(json_module.dumps({
                "message": args.classify,
                "intent": intent.value,
                "confidence": confidence,
                "templates": [t.name for t in templates],
            }, indent=2))
        else:
            print(f"Message: {args.classify}")
            print(f"Intent: {intent.value} (confidence: {confidence:.2f})")
            print(f"Templates ({len(templates)}):")
            for t in templates:
                print(f" - {t.name}: {t.description}")


if __name__ == "__main__":
    main()