#!/usr/bin/env python3
"""
---
title: Agent Context Injector
component_type: script
version: 1.0.0
status: active
summary: Injects relevant context into agent prompts based on agent type, task intent, and workflow state
keywords: [agent, context, injection, workflow, adr-154, memory]
track: J
task_id: J.4.9
created: 2026-02-03
---

Agent Context Injector - ADR-154 Implementation

Automatically injects relevant context into agent prompts by:

  1. Classifying task intent from user message
  2. Selecting appropriate query templates for agent type
  3. Building context graph using templates
  4. Merging workflow state for multi-turn conversations
  5. Serializing context for prompt injection

Usage:
    from scripts.context_graph.agent_context_injector import AgentContextInjector

    injector = AgentContextInjector()
    injection = injector.inject_context(
        agent_type="security-specialist",
        user_message="Review the authentication code for vulnerabilities",
        session_id="session-123",
    )

    # Use injection.prompt_section in agent prompt
    print(injection.prompt_section)
"""

import json
import logging
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

# Setup logging
logger = logging.getLogger(__name__)

# Add parent to path for imports so `scripts.context_graph.*` resolves when
# this file is executed directly rather than as a package module.
_script_dir = Path(__file__).resolve().parent
_coditect_root = _script_dir.parent.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))

# Import context graph components (optional: features degrade gracefully)
try:
    from scripts.context_graph.builder import ContextGraphBuilder
    from scripts.context_graph.algorithms import ContextGraph, GraphNode
    BUILDER_AVAILABLE = True
except ImportError:
    BUILDER_AVAILABLE = False
    logger.warning("ContextGraphBuilder not available")

# Import query templates (optional: features degrade gracefully)
try:
    from scripts.context_graph.query_templates import QueryTemplateRegistry, QueryTemplate
    TEMPLATES_AVAILABLE = True
except ImportError:
    TEMPLATES_AVAILABLE = False
    logger.warning("QueryTemplateRegistry not available")

# =============================================================================
# Data Classes
# =============================================================================


@dataclass
class IntentClassification:
    """Result of classifying user message intent."""

    primary_intent: str  # e.g., "security-review", "code-review", "debug"
    confidence: float  # 0.0 to 1.0
    secondary_intents: List[str] = field(default_factory=list)
    detected_keywords: List[str] = field(default_factory=list)
    suggested_track: Optional[str] = None  # PILOT track letter
    suggested_agent: Optional[str] = None  # Agent type suggestion

@dataclass
class ContextInjection:
    """Result of context injection for an agent turn."""

    prompt_section: str  # Formatted markdown for prompt injection
    context_graph: Optional[Any]  # ContextGraph if available
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def node_count(self) -> int:
        """Number of nodes in context graph (0 when no graph was built)."""
        if self.context_graph and hasattr(self.context_graph, 'nodes'):
            return len(self.context_graph.nodes)
        return 0

    @property
    def templates_used(self) -> List[str]:
        """Templates used for context building."""
        return self.metadata.get('templates_used', [])

# =============================================================================
# Intent Classification
# =============================================================================

# Intent patterns for classification: keyword triggers, suggested PILOT track,
# candidate agents, and query templates per intent.
INTENT_PATTERNS: Dict[str, Dict[str, Any]] = {
    "security-review": {
        "keywords": ["security", "vulnerab", "auth", "permission", "access",
                     "xss", "injection", "owasp", "pentest", "audit"],
        "track": "D",
        "agents": ["security-specialist", "penetration-testing-agent", "compliance-auditor"],
        "templates": ["security-audit-context"],
    },
    "code-review": {
        "keywords": ["review", "code", "pr", "pull request", "changes", "refactor", "clean"],
        "track": "E",
        "agents": ["code-reviewer", "senior-architect", "qa-specialist"],
        "templates": ["code-review-context"],
    },
    "architecture": {
        "keywords": ["architect", "design", "adr", "pattern", "structure", "system", "component"],
        "track": "H",
        "agents": ["senior-architect", "database-architect", "cloud-architect"],
        "templates": ["architecture-context"],
    },
    "debugging": {
        "keywords": ["debug", "error", "fix", "bug", "issue", "problem",
                     "crash", "fail", "exception", "traceback"],
        "track": "J",
        "agents": ["debug-specialist", "testing-specialist", "senior-architect"],
        "templates": ["error-investigation-context"],
    },
    "testing": {
        "keywords": ["test", "coverage", "unit", "integration", "e2e", "qa", "validation"],
        "track": "E",
        "agents": ["testing-specialist", "qa-specialist"],
        "templates": ["code-review-context"],
    },
    "deployment": {
        "keywords": ["deploy", "release", "ci/cd", "pipeline", "kubernetes", "docker", "infra"],
        "track": "C",
        "agents": ["devops-engineer", "cloud-architect"],
        "templates": ["workflow-resume-context"],
    },
    "documentation": {
        "keywords": ["document", "docs", "readme", "guide", "explain", "comment"],
        "track": "F",
        "agents": ["codi-documentation-writer", "technical-writer"],
        "templates": ["architecture-context"],
    },
    "workflow": {
        "keywords": ["workflow", "orchestrat", "coordinate", "multi-step", "plan", "task"],
        "track": "K",
        "agents": ["orchestrator", "workflow-orchestrator"],
        "templates": ["workflow-resume-context"],
    },
    "moe-evaluation": {
        "keywords": ["evaluate", "judge", "classify", "verify", "validate", "assess", "moe",
                     "quality", "review"],
        "track": "H",
        "agents": ["moe-content-classifier", "moe-judge", "moe-evaluator"],
        "templates": ["architecture-context", "code-review-context"],
    },
}

# Agent type to template mapping
AGENT_TEMPLATE_MAP: Dict[str, List[str]] = {
    # Security agents
    "security-specialist": ["security-audit-context", "code-review-context"],
    "penetration-testing-agent": ["security-audit-context"],
    "compliance-auditor": ["security-audit-context"],

    # Architecture agents
    "senior-architect": ["architecture-context", "code-review-context"],
    "database-architect": ["architecture-context"],
    "cloud-architect": ["architecture-context", "workflow-resume-context"],
    "system-architect": ["architecture-context"],

    # Code review agents
    "code-reviewer": ["code-review-context", "error-investigation-context"],
    "qa-specialist": ["code-review-context"],

    # Debug/test agents
    "debug-specialist": ["error-investigation-context"],
    "testing-specialist": ["error-investigation-context", "code-review-context"],

    # DevOps agents
    "devops-engineer": ["workflow-resume-context", "architecture-context"],

    # Documentation agents
    "codi-documentation-writer": ["architecture-context"],
    "technical-writer": ["architecture-context"],

    # Orchestration agents
    "orchestrator": ["workflow-resume-context", "architecture-context"],
    "workflow-orchestrator": ["workflow-resume-context"],
    "task-coordinator": ["workflow-resume-context"],

    # MoE agents (J.25.5.3)
    "moe-content-classifier": ["architecture-context", "code-review-context"],
    "moe-judge": ["architecture-context", "security-audit-context"],
    "moe-evaluator": ["architecture-context", "error-investigation-context"],
    "moe-coordinator": ["architecture-context", "workflow-resume-context"],
}

def classify_intent(message: str) -> IntentClassification:
    """
    Classify the intent of a user message.

    Scores each intent in INTENT_PATTERNS by the fraction of its keywords
    found (substring match) in the lowercased message, with a 1.2x boost
    when any matched keyword appears as a whole word.

    Args:
        message: User message to classify

    Returns:
        IntentClassification with detected intent, confidence, and suggestions
        (falls back to "general" at 0.3 confidence when nothing matches)
    """
    message_lower = message.lower()
    intent_scores: Dict[str, float] = {}
    detected_keywords: Dict[str, List[str]] = {}

    # Score each intent based on keyword matches
    for intent, config in INTENT_PATTERNS.items():
        keywords = config["keywords"]
        matches = [kw for kw in keywords if kw in message_lower]

        if matches:
            # Score based on number of matches and keyword significance
            score = len(matches) / len(keywords)
            # Boost for exact (whole-word) matches; pad message so boundary
            # words at the start/end are also caught
            if any(f" {kw} " in f" {message_lower} " for kw in matches):
                score *= 1.2
            intent_scores[intent] = min(score, 1.0)
            detected_keywords[intent] = matches

    # Find best intent
    if intent_scores:
        sorted_intents = sorted(intent_scores.items(), key=lambda x: x[1], reverse=True)
        primary_intent = sorted_intents[0][0]
        confidence = sorted_intents[0][1]
        secondary_intents = [i[0] for i in sorted_intents[1:3] if i[1] > 0.2]

        config = INTENT_PATTERNS[primary_intent]
        return IntentClassification(
            primary_intent=primary_intent,
            confidence=confidence,
            secondary_intents=secondary_intents,
            detected_keywords=detected_keywords.get(primary_intent, []),
            suggested_track=config.get("track"),
            suggested_agent=config.get("agents", [None])[0],
        )

    # Default to general intent when no keyword matched
    return IntentClassification(
        primary_intent="general",
        confidence=0.3,
        secondary_intents=[],
        detected_keywords=[],
        suggested_track=None,
        suggested_agent=None,
    )

# =============================================================================
# Agent Context Injector
# =============================================================================


class AgentContextInjector:
    """
    Injects relevant context into agent prompts based on:
    - Agent type (from agent registry)
    - Task intent (classified from user message)
    - Workflow state (if multi-turn)
    - Governance policies (from policy nodes)

    ADR-154 compliant implementation.
    """

    VERSION = "1.0.0"

    def __init__(
        self,
        template_registry: Optional["QueryTemplateRegistry"] = None,
        context_builder: Optional["ContextGraphBuilder"] = None,
        default_budget: int = 4000,
    ):
        """
        Initialize the agent context injector.

        Args:
            template_registry: QueryTemplateRegistry instance (auto-loaded if None)
            context_builder: ContextGraphBuilder instance (created per injection if None)
            default_budget: Default token budget for context graphs
        """
        self.default_budget = default_budget

        # Initialize template registry
        if template_registry:
            self._registry = template_registry
        elif TEMPLATES_AVAILABLE:
            self._registry = QueryTemplateRegistry()
        else:
            self._registry = None
            logger.warning("QueryTemplateRegistry not available - template features disabled")

        # Context builder created per injection to manage connections
        self._builder_class = ContextGraphBuilder if BUILDER_AVAILABLE else None

    def inject_context(
        self,
        agent_type: str,
        user_message: str,
        session_id: str,
        workflow_id: Optional[str] = None,
        budget_tokens: Optional[int] = None,
        include_intent: bool = True,
    ) -> ContextInjection:
        """
        Build and inject context for an agent turn.

        Args:
            agent_type: Type of agent (e.g., "security-specialist")
            user_message: User's message/task description
            session_id: Current session ID
            workflow_id: Optional workflow ID for multi-turn context
            budget_tokens: Token budget override
            include_intent: Include intent classification in output

        Returns:
            ContextInjection with formatted prompt section and metadata
        """
        budget = budget_tokens or self.default_budget
        metadata: Dict[str, Any] = {
            "agent_type": agent_type,
            "session_id": session_id,
            "workflow_id": workflow_id,
            "budget_tokens": budget,
            "injected_at": datetime.now().isoformat(),
        }

        # Step 1: Classify intent
        intent = classify_intent(user_message)
        metadata["intent"] = {
            "primary": intent.primary_intent,
            "confidence": intent.confidence,
            "secondary": intent.secondary_intents,
            "keywords": intent.detected_keywords,
            "suggested_track": intent.suggested_track,
        }

        # Step 2: Select templates
        templates = self.select_templates(agent_type, intent)
        metadata["templates_used"] = [t.name for t in templates] if templates else []

        # Step 3: Build context graph (best-effort; errors recorded in metadata)
        context_graph = None
        if templates and self._builder_class:
            try:
                context_graph = self._build_context_from_templates(
                    templates=templates,
                    task_description=user_message,
                    budget=budget,
                )
                metadata["node_count"] = len(context_graph.nodes) if context_graph else 0
                metadata["edge_count"] = len(context_graph.edges) if context_graph else 0
            except Exception as e:
                logger.error("Error building context graph: %s", e)
                metadata["build_error"] = str(e)

        # Step 4: Add workflow state if multi-turn
        if workflow_id:
            workflow_context = self._get_workflow_state(workflow_id)
            if workflow_context:
                metadata["workflow_state"] = workflow_context
                # TODO: Merge workflow context into graph

        # Step 5: Serialize for prompt
        prompt_section = self._serialize_for_prompt(
            context_graph=context_graph,
            intent=intent if include_intent else None,
            templates=templates,
            agent_type=agent_type,
        )

        metadata["token_estimate"] = self._estimate_tokens(prompt_section)

        return ContextInjection(
            prompt_section=prompt_section,
            context_graph=context_graph,
            metadata=metadata,
        )

    def select_templates(
        self,
        agent_type: str,
        intent: IntentClassification,
    ) -> List["QueryTemplate"]:
        """
        Select appropriate query templates based on agent type and intent.

        Selection order: agent-type mapping, primary intent, first secondary
        intent, then a registry lookup as a fallback. Capped at 3 templates,
        de-duplicated by name.

        Args:
            agent_type: Type of agent
            intent: Classified intent

        Returns:
            List of QueryTemplate objects to use (empty if registry unavailable)
        """
        if not self._registry:
            return []

        templates = []
        template_names_used: Set[str] = set()

        # 1. Get templates mapped to this agent type
        agent_templates = AGENT_TEMPLATE_MAP.get(agent_type, [])
        for name in agent_templates:
            if name not in template_names_used:
                template = self._registry.get(name)
                if template:
                    templates.append(template)
                    template_names_used.add(name)

        # 2. Get templates from intent
        intent_config = INTENT_PATTERNS.get(intent.primary_intent, {})
        intent_templates = intent_config.get("templates", [])
        for name in intent_templates:
            if name not in template_names_used:
                template = self._registry.get(name)
                if template:
                    templates.append(template)
                    template_names_used.add(name)

        # 3. Also check secondary intents
        for secondary in intent.secondary_intents[:1]:  # Just first secondary
            sec_config = INTENT_PATTERNS.get(secondary, {})
            sec_templates = sec_config.get("templates", [])
            for name in sec_templates:
                if name not in template_names_used and len(templates) < 3:
                    template = self._registry.get(name)
                    if template:
                        templates.append(template)
                        template_names_used.add(name)

        # 4. Query registry for agent-type templates if we have few
        if len(templates) < 2 and self._registry:
            registry_templates = self._registry.find_for_agent(agent_type)
            for t in registry_templates:
                if t.name not in template_names_used and len(templates) < 3:
                    templates.append(t)
                    template_names_used.add(t.name)

        return templates[:3]  # Max 3 templates

    def _build_context_from_templates(
        self,
        templates: List["QueryTemplate"],
        task_description: str,
        budget: int,
    ) -> Optional["ContextGraph"]:
        """Build context graph using selected templates.

        Uses the first template's expansion/pruning configuration; the
        effective token budget is the smaller of the template budget and
        the request budget. Returns None on failure.
        """
        if not templates or not self._builder_class:
            return None

        # Use first template's configuration
        primary = templates[0]

        try:
            with self._builder_class() as builder:
                # Extract config from template
                strategy = primary.expansion.strategy
                depth = primary.expansion.depth
                template_budget = primary.pruning.token_budget

                # Use smaller of template budget and request budget
                effective_budget = min(template_budget, budget)

                graph = builder.build(
                    task_description=task_description,
                    seed_strategy=strategy,
                    token_budget=effective_budget,
                    max_depth=depth,
                )
                return graph
        except Exception as e:
            logger.error("Failed to build context graph: %s", e)
            return None

    def _get_workflow_state(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Get accumulated context for a workflow.

        TODO: Implement J.4.10 workflow state management
        """
        # Placeholder for J.4.10 implementation
        return None

    # Node type-specific property keys to include in rich serialization
    _RICH_NODE_PROPERTIES: Dict[str, List[str]] = {
        "decision": ["rationale", "decision_type", "status", "project_path"],
        "adr": ["status", "decision", "context"],
        "policy": ["rule", "enforcement_level", "scope"],
        "error_solution": ["error_pattern", "solution", "language"],
        "component": ["component_type", "status", "version"],
        "session": ["llm", "created_at"],
    }

    # Edge types most useful for MoE evaluation context
    _MOE_EDGE_TYPES: Set[str] = {
        "CONTRADICTS", "GOVERNED_BY", "IMPLEMENTS", "SUPERSEDES",
        "DEPENDS_ON", "RELATED_TO",
    }

    def _is_moe_agent(self, agent_type: str) -> bool:
        """Check if agent type is an MoE agent requiring rich context."""
        return agent_type.startswith("moe-") or agent_type in {
            "moe-content-classifier", "moe-judge", "moe-evaluator", "moe-coordinator",
        }

    def _serialize_for_prompt(
        self,
        context_graph: Optional["ContextGraph"],
        intent: Optional[IntentClassification],
        templates: List["QueryTemplate"],
        agent_type: str,
    ) -> str:
        """Serialize context graph to markdown for prompt injection.

        MoE agents get enriched output with relevance scores, node properties,
        and edge relationships. Other agents get a compact summary.
        """
        rich = self._is_moe_agent(agent_type)
        sections = []

        # Header
        sections.append("## Injected Context")
        sections.append("")
        sections.append(f"**Agent:** {agent_type}")

        # Intent summary
        if intent:
            sections.append(f"**Intent:** {intent.primary_intent} (confidence: {intent.confidence:.0%})")
            if intent.suggested_track:
                sections.append(f"**Track:** {intent.suggested_track}")

        # Templates used
        if templates:
            template_names = ", ".join(t.name for t in templates)
            sections.append(f"**Templates:** {template_names}")

        # Graph stats for MoE agents
        if rich and context_graph and context_graph.nodes:
            sections.append(f"**Graph:** {len(context_graph.nodes)} nodes, "
                            f"{len(context_graph.edges)} edges")

        sections.append("")

        # Context graph nodes
        if context_graph and context_graph.nodes:
            nodes = list(context_graph.nodes.values())

            # Group by type
            nodes_by_type: Dict[str, List[Any]] = {}
            for node in nodes:
                node_type = node.node_type
                if node_type not in nodes_by_type:
                    nodes_by_type[node_type] = []
                nodes_by_type[node_type].append(node)

            # Sort each group by relevance (highest first)
            for node_type in nodes_by_type:
                nodes_by_type[node_type].sort(
                    key=lambda n: -getattr(n, 'relevance_score', 0)
                )

            # Output each type section in a fixed priority order
            type_order = ["policy", "adr", "decision", "component", "error_solution", "session"]
            for node_type in type_order:
                if node_type not in nodes_by_type:
                    continue
                type_nodes = nodes_by_type[node_type][:10]
                sections.append(f"### {node_type.replace('_', ' ').title()}s ({len(type_nodes)})")
                sections.append("")

                if rich:
                    self._serialize_nodes_rich(sections, type_nodes, node_type)
                else:
                    self._serialize_nodes_compact(sections, type_nodes)

                sections.append("")

            # Any remaining types not covered by type_order
            for node_type, type_nodes in nodes_by_type.items():
                if node_type not in type_order:
                    sections.append(f"### {node_type.replace('_', ' ').title()}s ({len(type_nodes)})")
                    sections.append("")
                    for node in type_nodes[:5]:
                        sections.append(f"- {node.name or node.id}")
                    sections.append("")

            # Edge relationships for MoE agents
            if rich and context_graph.edges:
                self._serialize_edges(sections, context_graph)

        else:
            sections.append("*No context graph nodes loaded.*")
            sections.append("")

        sections.append("---")

        return "\n".join(sections)

    def _serialize_nodes_compact(
        self,
        sections: List[str],
        nodes: List[Any],
    ) -> None:
        """Compact node serialization for non-MoE agents."""
        for node in nodes:
            name = node.name or node.id
            sections.append(f"- **{name}**")
            if node.properties:
                desc = node.properties.get("description", "")[:100]
                if desc:
                    sections.append(f" {desc}")

    def _serialize_nodes_rich(
        self,
        sections: List[str],
        nodes: List[Any],
        node_type: str,
    ) -> None:
        """Rich node serialization for MoE agents with relevance and properties."""
        prop_keys = self._RICH_NODE_PROPERTIES.get(node_type, ["description"])

        for node in nodes:
            name = node.name or node.id
            score = getattr(node, 'relevance_score', 0)
            seed_marker = " [SEED]" if getattr(node, 'is_seed', False) else ""

            sections.append(f"- **{name}**{seed_marker} (relevance: {score:.2f})")

            if not node.properties:
                continue

            # Always include description first if present
            desc = node.properties.get("description", "")
            if desc:
                sections.append(f" {desc[:300]}")

            # Add type-specific properties
            for key in prop_keys:
                if key == "description":
                    continue
                value = node.properties.get(key)
                if value and str(value).strip():
                    val_str = str(value)
                    if len(val_str) > 200:
                        val_str = val_str[:200] + "..."
                    sections.append(f" **{key}:** {val_str}")

    def _serialize_edges(
        self,
        sections: List[str],
        context_graph: Any,
    ) -> None:
        """Serialize key edge relationships for MoE agents."""
        # Group edges by type, filter to MoE-relevant types
        edges_by_type: Dict[str, List[Any]] = {}
        for edge in context_graph.edges:
            if edge.edge_type in self._MOE_EDGE_TYPES:
                if edge.edge_type not in edges_by_type:
                    edges_by_type[edge.edge_type] = []
                edges_by_type[edge.edge_type].append(edge)

        if not edges_by_type:
            return

        sections.append("### Key Relationships")
        sections.append("")

        for edge_type in sorted(edges_by_type.keys()):
            edges = edges_by_type[edge_type][:8]
            sections.append(f"**{edge_type}** ({len(edges_by_type[edge_type])})")
            for edge in edges:
                from_node = context_graph.nodes.get(edge.from_node)
                to_node = context_graph.nodes.get(edge.to_node)
                from_name = from_node.name if from_node else edge.from_node
                to_name = to_node.name if to_node else edge.to_node
                sections.append(f"- {from_name} -> {to_name}")
            sections.append("")

    def _estimate_tokens(self, text: str) -> int:
        """Estimate token count (rough approximation: ~4 chars/token)."""
        return len(text) // 4

    def get_templates_for_agent(self, agent_type: str) -> List[str]:
        """Get list of template names suitable for an agent type."""
        direct = AGENT_TEMPLATE_MAP.get(agent_type, [])
        if self._registry:
            from_registry = [t.name for t in self._registry.find_for_agent(agent_type)]
            return list(set(direct + from_registry))
        return direct

# =============================================================================
# MoE Context Builder (J.25.5.3)
# =============================================================================


def build_moe_context(
    task_description: str,
    agent_type: str = "moe-evaluator",
    budget_tokens: int = 4000,
    session_id: str = "moe-eval",
    include_edges: bool = True,
) -> str:
    """
    Build a context graph and return rich markdown for MoE agent prompts.

    Convenience function for MoE scripts (classifier, judges, evaluators)
    that need knowledge graph context without full agent invocation.

    Args:
        task_description: What the MoE agent is evaluating
        agent_type: MoE agent type (determines templates)
        budget_tokens: Token budget for context graph
        session_id: Session identifier
        include_edges: Include edge relationships in output
            (NOTE(review): currently not forwarded to the injector - edge
            output is governed by the MoE agent type; confirm intent)

    Returns:
        Markdown string with rich context including relevance scores,
        node properties, and key relationships

    Example:
        from scripts.context_graph.agent_context_injector import build_moe_context

        context = build_moe_context(
            task_description="Classify document: session-recovery-guide.md",
            agent_type="moe-content-classifier",
            budget_tokens=2000,
        )
        # Inject into classifier prompt
        prompt = f"{context}\\n\\nClassify the above document..."
    """
    injector = AgentContextInjector(default_budget=budget_tokens)
    injection = injector.inject_context(
        agent_type=agent_type,
        user_message=task_description,
        session_id=session_id,
        budget_tokens=budget_tokens,
    )
    return injection.prompt_section

# =============================================================================
# CLI Interface
# =============================================================================


def main():
    """CLI interface for testing agent context injection."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Agent Context Injector - ADR-154",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --agent security-specialist --message "Review auth code"
  %(prog)s --agent senior-architect --message "Design new API"
  %(prog)s --classify "Fix the bug in user login"
  %(prog)s --list-agents
""",
    )

    parser.add_argument('--agent', '-a', metavar='TYPE',
                        help='Agent type to inject context for')
    parser.add_argument('--message', '-m', metavar='TEXT',
                        help='User message to process')
    parser.add_argument('--session', '-s', default='test-session',
                        help='Session ID (default: test-session)')
    parser.add_argument('--workflow', '-w', metavar='ID',
                        help='Workflow ID for multi-turn context')
    parser.add_argument('--budget', '-b', type=int, default=4000,
                        help='Token budget (default: 4000)')

    parser.add_argument('--classify', metavar='MESSAGE',
                        help='Classify intent of a message')
    parser.add_argument('--list-agents', action='store_true',
                        help='List known agent types and their templates')
    parser.add_argument('--json', action='store_true',
                        help='Output in JSON format')

    args = parser.parse_args()

    # Classify intent
    if args.classify:
        intent = classify_intent(args.classify)
        if args.json:
            print(json.dumps({
                "primary_intent": intent.primary_intent,
                "confidence": intent.confidence,
                "secondary_intents": intent.secondary_intents,
                "keywords": intent.detected_keywords,
                "suggested_track": intent.suggested_track,
                "suggested_agent": intent.suggested_agent,
            }, indent=2))
        else:
            print(f"\nIntent Classification for: \"{args.classify}\"")
            print(f" Primary: {intent.primary_intent} ({intent.confidence:.0%})")
            print(f" Secondary: {', '.join(intent.secondary_intents) or 'none'}")
            print(f" Keywords: {', '.join(intent.detected_keywords) or 'none'}")
            print(f" Suggested Track: {intent.suggested_track or 'N/A'}")
            print(f" Suggested Agent: {intent.suggested_agent or 'N/A'}")
        return

    # List agents
    if args.list_agents:
        print("\nKnown Agent Types and Templates:")
        print("=" * 60)
        for agent, templates in sorted(AGENT_TEMPLATE_MAP.items()):
            print(f"\n {agent}:")
            for t in templates:
                print(f" - {t}")
        return

    # Inject context
    if args.agent and args.message:
        injector = AgentContextInjector(default_budget=args.budget)
        injection = injector.inject_context(
            agent_type=args.agent,
            user_message=args.message,
            session_id=args.session,
            workflow_id=args.workflow,
        )

        if args.json:
            print(json.dumps({
                "node_count": injection.node_count,
                "templates_used": injection.templates_used,
                "metadata": injection.metadata,
                "prompt_section": injection.prompt_section,
            }, indent=2, default=str))
        else:
            print(injection.prompt_section)
            print("\n[Metadata]")
            print(f" Nodes: {injection.node_count}")
            print(f" Templates: {', '.join(injection.templates_used) or 'none'}")
            print(f" Tokens: ~{injection.metadata.get('token_estimate', 0)}")
        return

    # No action specified
    parser.print_help()

if __name__ == "__main__":
    main()