#!/usr/bin/env python3
"""
Query Template Loader and Registry for Context Graph Queries.

Implements ADR-154 Query Template Schema:
- Loads YAML templates from template directories
- Validates against JSON Schema
- Provides registry for template discovery
- Supports agent-type and category filtering

Usage:
    from scripts.context_graph.query_templates import QueryTemplateRegistry

    registry = QueryTemplateRegistry()
    template = registry.get("security-audit-context")
    templates = registry.find_for_agent("security-specialist")

Author: CODITECT Team
Version: 1.0.0
ADR: ADR-154 (Context Graph Query DSL and Agent Workflow)
"""

import json import logging from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional

import yaml

# Configure logging
logger = logging.getLogger(__name__)

# Default paths
CODITECT_DATA = Path.home() / "PROJECTS" / ".coditect-data"
CODITECT_CORE = Path.home() / ".coditect"

# Script directory for relative path resolution
SCRIPT_DIR = Path(__file__).parent.parent.parent  # coditect-core root

SCHEMA_PATH = SCRIPT_DIR / "schemas" / "query-template-v1.0.0.json"

# Template directories (searched in order)
TEMPLATE_DIRS = [
    CODITECT_DATA / "queries" / "templates",  # User templates (highest priority)
    CODITECT_CORE / "config" / "query-templates",  # Protected installation
    SCRIPT_DIR / "config" / "query-templates",  # Submodule location
]

@dataclass
class QueryAnchor:
    """Describe one anchor node that graph expansion starts from.

    Attributes:
        type: Anchor node type; the only field without a default.
        id: Optional identifier of a specific node (default ``None``).
        filter: Optional filter string (default ``None``); its syntax is
            not defined in this module.
        limit: Integer limit for this anchor (default ``10``).
        required: Whether the anchor is flagged as required (default
            ``False``).
    """

    type: str
    id: Optional[str] = None
    filter: Optional[str] = None
    limit: int = 10
    required: bool = False

@dataclass
class QueryExpansion:
    """Settings that control how the graph is expanded from the anchors.

    Attributes:
        strategy: Name of the expansion strategy (default ``"semantic"``).
        depth: Expansion depth (default ``2``).
        edge_types: Edge type names; a fresh empty list per instance.
        exclude_types: Type names to exclude; a fresh empty list per
            instance.
        max_nodes: Node cap (default ``100``).
    """

    strategy: str = "semantic"
    depth: int = 2
    # default_factory gives every instance its own list object.
    edge_types: List[str] = field(default_factory=list)
    exclude_types: List[str] = field(default_factory=list)
    max_nodes: int = 100

@dataclass
class QueryPruning:
    """Settings that control how an expanded graph is pruned.

    Attributes:
        strategy: Name of the pruning strategy (default
            ``"token_budget"``).
        token_budget: Token budget (default ``4000``).
        relevance_threshold: Relevance cut-off (default ``0.3``).
        type_priorities: Mapping of type name to a float priority; a
            fresh empty dict per instance.
    """

    strategy: str = "token_budget"
    token_budget: int = 4000
    relevance_threshold: float = 0.3
    # default_factory gives every instance its own dict object.
    type_priorities: Dict[str, float] = field(default_factory=dict)

@dataclass
class QueryOutput:
    """Settings that describe how query results are rendered.

    Attributes:
        format: Output format name (default ``"markdown"``).
        max_tokens: Token cap for the output (default ``4000``).
        include_metadata: Metadata flag (default ``True``).
        group_by_type: Grouping flag (default ``True``).
    """

    format: str = "markdown"
    max_tokens: int = 4000
    include_metadata: bool = True
    group_by_type: bool = True

@dataclass
class QueryWorkflow:
    """Settings that tie a query to workflow state.

    Attributes:
        persist_results: Persistence flag (default ``False``).
        workflow_id_param: Optional name of the workflow-id parameter
            (default ``None``).
        include_prior_decisions: Prior-decision flag (default ``False``).
    """

    persist_results: bool = False
    workflow_id_param: Optional[str] = None
    include_prior_decisions: bool = False

@dataclass
class QueryTemplate:
    """Parsed query template ready for execution.

    Attributes:
        name: Template name (used as the registry key).
        version: Template version string.
        description: Human-readable description.
        anchors: Anchor definitions parsed from ``query.anchors``.
        expansion: Expansion settings parsed from ``query.expansion``.
        pruning: Pruning settings parsed from ``query.pruning``.
        output: Output settings parsed from the top-level ``output`` key.
        workflow: Workflow settings parsed from the top-level ``workflow``
            key.
        author: Optional author string.
        category: Optional category name.
        agent_types: Agent type names this template is associated with.
        track: Optional track identifier.
        tags: Free-form tags.
        source_path: File the template was loaded from, if any.
    """

    name: str
    version: str
    description: str
    anchors: List[QueryAnchor]
    expansion: QueryExpansion
    pruning: QueryPruning
    output: QueryOutput
    workflow: QueryWorkflow
    author: Optional[str] = None
    category: Optional[str] = None
    agent_types: List[str] = field(default_factory=list)
    track: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    source_path: Optional[Path] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any], source_path: Optional[Path] = None) -> "QueryTemplate":
        """Create a QueryTemplate from a validated dictionary.

        Sections and collections that are present but null (a YAML key with
        no value parses to ``None``) are treated the same as absent ones, so
        they fall back to the documented defaults instead of raising
        ``AttributeError`` here or leaving ``None`` where a list is expected.
        Lists and dicts are copied so the template never shares mutable
        state with ``data``.

        Args:
            data: Template dictionary. ``name``, ``version`` and
                ``description`` are required, as is ``type`` on every
                anchor.
            source_path: File the data was read from, stored unchanged on
                the result.

        Returns:
            The populated template.

        Raises:
            KeyError: If a required key is missing.
        """
        query = data.get("query") or {}

        # Parse anchors
        anchors = [
            QueryAnchor(
                type=anchor_data["type"],
                id=anchor_data.get("id"),
                filter=anchor_data.get("filter"),
                limit=anchor_data.get("limit", 10),
                required=anchor_data.get("required", False),
            )
            for anchor_data in query.get("anchors") or []
        ]

        # Parse expansion
        exp_data = query.get("expansion") or {}
        expansion = QueryExpansion(
            strategy=exp_data.get("strategy", "semantic"),
            depth=exp_data.get("depth", 2),
            edge_types=list(exp_data.get("edge_types") or []),
            exclude_types=list(exp_data.get("exclude_types") or []),
            max_nodes=exp_data.get("max_nodes", 100),
        )

        # Parse pruning
        prune_data = query.get("pruning") or {}
        pruning = QueryPruning(
            strategy=prune_data.get("strategy", "token_budget"),
            token_budget=prune_data.get("token_budget", 4000),
            relevance_threshold=prune_data.get("relevance_threshold", 0.3),
            type_priorities=dict(prune_data.get("type_priorities") or {}),
        )

        # Parse output (top-level key, not nested under "query")
        out_data = data.get("output") or {}
        output = QueryOutput(
            format=out_data.get("format", "markdown"),
            max_tokens=out_data.get("max_tokens", 4000),
            include_metadata=out_data.get("include_metadata", True),
            group_by_type=out_data.get("group_by_type", True),
        )

        # Parse workflow (top-level key, not nested under "query")
        wf_data = data.get("workflow") or {}
        workflow = QueryWorkflow(
            persist_results=wf_data.get("persist_results", False),
            workflow_id_param=wf_data.get("workflow_id_param"),
            include_prior_decisions=wf_data.get("include_prior_decisions", False),
        )

        return cls(
            name=data["name"],
            version=data["version"],
            description=data["description"],
            anchors=anchors,
            expansion=expansion,
            pruning=pruning,
            output=output,
            workflow=workflow,
            author=data.get("author"),
            category=data.get("category"),
            agent_types=list(data.get("agent_types") or []),
            track=data.get("track"),
            tags=list(data.get("tags") or []),
            source_path=source_path,
        )

class QueryTemplateValidator:
    """Check parsed template data for structural problems.

    ``validate`` applies a fixed set of hand-written rules (required
    fields, name/version format, anchor types, expansion settings,
    category). The JSON Schema file is only read, lazily, through the
    ``schema`` property; ``validate`` itself does not consult it.
    """

    _REQUIRED_FIELDS = ("name", "version", "description", "query")
    _VALID_ANCHOR_TYPES = frozenset(
        {
            "track",
            "decision",
            "error_solution",
            "skill_learning",
            "component",
            "session",
            "file",
            "function",
            "adr",
            "policy",
            "recent_decisions",
            "recent_errors",
            "recent_sessions",
        }
    )
    _VALID_STRATEGIES = frozenset({"anchor", "semantic", "policy_first", "hybrid"})
    _VALID_CATEGORIES = frozenset(
        {"agent-context", "analytics", "audit", "search", "workflow", "debugging"}
    )

    def __init__(self, schema_path: Optional[Path] = None):
        """Remember where the schema lives; nothing is read until needed.

        Args:
            schema_path: Schema file location; falls back to the module's
                ``SCHEMA_PATH`` when omitted.
        """
        self.schema_path = schema_path or SCHEMA_PATH
        self._schema: Optional[Dict] = None

    @property
    def schema(self) -> Dict:
        """Return the JSON Schema, loading it on first access.

        A missing schema file is logged as a warning and yields ``{}``.
        """
        if self._schema is None:
            if self.schema_path.exists():
                # JSON is UTF-8 by specification; do not depend on the locale.
                with open(self.schema_path, encoding="utf-8") as f:
                    self._schema = json.load(f)
            else:
                logger.warning(f"Schema not found at {self.schema_path}, using minimal validation")
                self._schema = {}
        return self._schema

    @staticmethod
    def _is_one_of(value: Any, allowed: frozenset) -> bool:
        """Return True when ``value`` is a string contained in ``allowed``.

        The type check comes first so unhashable values (lists, dicts) are
        reported as invalid instead of raising ``TypeError``.
        """
        return isinstance(value, str) and value in allowed

    def _validate_query(self, query: Dict[str, Any]) -> List[str]:
        """Validate the ``query`` mapping: its anchors and expansion settings."""
        errors: List[str] = []

        anchors = query.get("anchors")
        if not anchors:
            errors.append("Query must have at least one anchor")
            anchors = []
        elif not isinstance(anchors, (list, tuple)):
            errors.append(f"Query anchors must be a list, got {type(anchors).__name__}")
            anchors = []

        for i, anchor in enumerate(anchors):
            if not isinstance(anchor, dict):
                errors.append(f"Anchor {i}: must be a mapping, got {type(anchor).__name__}")
            elif "type" not in anchor:
                errors.append(f"Anchor {i}: missing 'type' field")
            elif not self._is_one_of(anchor["type"], self._VALID_ANCHOR_TYPES):
                errors.append(f"Anchor {i}: invalid type '{anchor['type']}'")

        expansion = query.get("expansion", {})
        if not isinstance(expansion, dict):
            errors.append(f"Expansion must be a mapping, got {type(expansion).__name__}")
            return errors

        strategy = expansion.get("strategy")
        if strategy and not self._is_one_of(strategy, self._VALID_STRATEGIES):
            errors.append(f"Invalid expansion strategy: {strategy}")

        depth = expansion.get("depth", 2)
        # Non-numeric depths (e.g. a quoted "3") cannot be range-compared.
        if not isinstance(depth, (int, float)) or not 1 <= depth <= 5:
            errors.append(f"Expansion depth must be 1-5, got {depth}")

        return errors

    def validate(self, data: Dict[str, Any]) -> List[str]:
        """Validate template data and report every problem found.

        Malformed input (for example ``None`` from an empty YAML file, a
        float version such as ``1.0``, or a non-mapping section) is
        reported as an error message rather than raised as an exception.

        Args:
            data: Parsed template content.

        Returns:
            List of error messages (empty if valid). When required
            top-level fields are missing, only those are reported.
        """
        if not isinstance(data, dict):
            return [f"Template must be a mapping, got {type(data).__name__}"]

        # Required fields
        errors = [
            f"Missing required field: {key}"
            for key in self._REQUIRED_FIELDS
            if key not in data
        ]
        if errors:
            return errors

        # Name format: letters and digits, optionally separated by hyphens
        name = data["name"]
        if not (isinstance(name, str) and name and name.replace("-", "").isalnum()):
            errors.append(f"Invalid name format: {name} (must be kebab-case)")

        # Version format: exactly three dot-separated digit groups
        version = data["version"]
        parts = version.split(".") if isinstance(version, str) else []
        if len(parts) != 3 or not all(p.isdigit() for p in parts):
            errors.append(f"Invalid version format: {version} (must be semver)")

        # Query validation
        query = data["query"]
        if isinstance(query, dict):
            errors.extend(self._validate_query(query))
        else:
            errors.append(f"Query must be a mapping, got {type(query).__name__}")

        # Category validation (optional field)
        category = data.get("category")
        if category and not self._is_one_of(category, self._VALID_CATEGORIES):
            errors.append(f"Invalid category: {category}")

        return errors

class QueryTemplateRegistry:
    """Registry for discovering and loading query templates.

    Templates are read from a list of directories and indexed by name.
    Directories are searched in order and the first directory that
    provides a given template name wins, so earlier entries take priority
    over later ones.
    """

    def __init__(
        self,
        template_dirs: Optional[List[Path]] = None,
        auto_load: bool = True,
    ):
        """Set up the registry and optionally load templates immediately.

        Args:
            template_dirs: Directories to search, highest priority first.
                ``None`` selects the module default ``TEMPLATE_DIRS``; an
                explicit empty list means "no directories".
            auto_load: When true, call ``load_all`` during construction.
        """
        # Compare against None so an explicit [] is not replaced by defaults.
        self.template_dirs = TEMPLATE_DIRS if template_dirs is None else template_dirs
        self.validator = QueryTemplateValidator()
        self._templates: Dict[str, "QueryTemplate"] = {}
        self._loaded = False

        if auto_load:
            self.load_all()

    def load_all(self) -> int:
        """
        Load all templates from template directories.

        Both ``*.yaml`` and ``*.yml`` files are read, in sorted filename
        order within each directory. If a template name was already loaded
        earlier in this pass (from a higher-priority directory), later
        definitions of the same name are skipped. Templates registered
        before this call are still replaced by file-based ones of the same
        name. Files that fail to load are logged and skipped.

        Returns:
            Number of templates loaded
        """
        loaded_names = set()
        for template_dir in self.template_dirs:
            if not template_dir.exists():
                logger.debug(f"Template directory not found: {template_dir}")
                continue

            # Sort so the result does not depend on filesystem listing order.
            yaml_files = sorted(
                path
                for pattern in ("*.yaml", "*.yml")
                for path in template_dir.glob(pattern)
            )
            for yaml_file in yaml_files:
                try:
                    template = self._load_file(yaml_file)
                except Exception as e:
                    # Best-effort loading: one bad file must not stop the rest.
                    logger.error(f"Error loading {yaml_file}: {e}")
                    continue

                if not template:
                    continue
                if template.name in loaded_names:
                    logger.debug(
                        f"Skipping {yaml_file}: template '{template.name}' "
                        "already loaded from a higher-priority location"
                    )
                    continue

                self._templates[template.name] = template
                loaded_names.add(template.name)

        count = len(loaded_names)
        self._loaded = True
        logger.info(f"Loaded {count} query templates")
        return count

    def _load_file(self, path: Path) -> Optional["QueryTemplate"]:
        """Load and validate a single template file.

        Returns ``None`` for an empty file or one that fails validation
        (the validation errors are logged as a warning).
        """
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)

        if not data:
            return None

        errors = self.validator.validate(data)
        if errors:
            logger.warning(f"Validation errors in {path}: {errors}")
            return None

        return QueryTemplate.from_dict(data, source_path=path)

    def get(self, name: str) -> Optional["QueryTemplate"]:
        """Get template by name, or ``None`` if it is not registered."""
        return self._templates.get(name)

    def list_all(self) -> List[str]:
        """List all template names."""
        return list(self._templates)

    def find_by_category(self, category: str) -> List["QueryTemplate"]:
        """Find templates whose category equals ``category``."""
        return [t for t in self._templates.values() if t.category == category]

    def find_for_agent(self, agent_type: str) -> List["QueryTemplate"]:
        """Find templates that list ``agent_type`` among their agent types."""
        # "or ()" tolerates templates whose agent_types is None.
        return [t for t in self._templates.values() if agent_type in (t.agent_types or ())]

    def find_by_track(self, track: str) -> List["QueryTemplate"]:
        """Find templates whose track equals ``track``."""
        return [t for t in self._templates.values() if t.track == track]

    def find_by_tags(self, tags: List[str]) -> List["QueryTemplate"]:
        """Find templates matching any of the given tags."""
        tag_set = set(tags)
        # "or ()" tolerates templates whose tags is None.
        return [t for t in self._templates.values() if tag_set.intersection(t.tags or ())]

    def register(self, template: "QueryTemplate") -> None:
        """Register a template programmatically, replacing any same-named one."""
        self._templates[template.name] = template

    def unregister(self, name: str) -> bool:
        """Remove a template from the registry.

        Returns:
            True if a template was removed, False if the name was unknown.
        """
        if name in self._templates:
            del self._templates[name]
            return True
        return False

    def validate_file(self, path: Path) -> List[str]:
        """Validate a template file without loading it.

        Returns:
            List of error messages (empty if valid). An empty file or one
            whose top level is not a mapping yields a single error.
        """
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if not isinstance(data, dict):
            return [f"Template must be a YAML mapping, got {type(data).__name__}"]
        return self.validator.validate(data)

    def to_dict(self) -> Dict[str, Any]:
        """Export registry state as a dictionary.

        ``categories`` and ``agent_types`` are de-duplicated and sorted so
        the output is stable between runs.
        """
        templates = list(self._templates.values())
        return {
            "template_count": len(templates),
            "templates": [
                {
                    "name": t.name,
                    "version": t.version,
                    "category": t.category,
                    "agent_types": t.agent_types,
                    "track": t.track,
                }
                for t in templates
            ],
            "categories": sorted({t.category for t in templates if t.category}),
            "agent_types": sorted({at for t in templates for at in (t.agent_types or ())}),
        }

def create_template_dirs() -> None:
    """Ensure every directory in TEMPLATE_DIRS exists, creating parents as needed."""
    for template_dir in TEMPLATE_DIRS:
        # exist_ok makes this a no-op for directories that are already present;
        # the log line is emitted either way.
        template_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Created template directory: {template_dir}")

def _print_template_list(header: str, templates) -> None:
    """Print ``header`` followed by one "name - description" line per template."""
    print(header)
    for t in templates:
        print(f" {t.name} - {t.description[:50]}...")


def main() -> int:
    """CLI for query template management.

    Handles, in this order of precedence: ``--init``, ``--validate``,
    ``--list``, ``--get``, ``--category``, ``--agent``, ``--track``; with
    none of them, prints a registry summary.

    Returns:
        Process exit status: 0 on success, 1 when validation fails, the
        file cannot be read, or the requested template does not exist.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Query Template Registry")
    parser.add_argument("--list", action="store_true", help="List all templates")
    parser.add_argument("--validate", type=Path, help="Validate a template file")
    parser.add_argument("--get", type=str, help="Get template by name")
    parser.add_argument("--category", type=str, help="Filter by category")
    parser.add_argument("--agent", type=str, help="Filter by agent type")
    parser.add_argument("--track", type=str, help="Filter by track")
    parser.add_argument("--init", action="store_true", help="Create template directories")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()

    if args.init:
        create_template_dirs()
        print("Template directories created")
        return 0

    if args.validate:
        # Report unreadable or malformed files as a normal failure, not a traceback.
        try:
            with open(args.validate, encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except (OSError, yaml.YAMLError) as exc:
            print(f"Cannot read template file {args.validate}: {exc}")
            return 1

        if isinstance(data, dict):
            errors = QueryTemplateValidator().validate(data)
        else:
            # An empty file parses to None; the validator expects a mapping.
            errors = [f"Template must be a YAML mapping, got {type(data).__name__}"]

        if errors:
            print("Validation errors:")
            for error in errors:
                print(f" - {error}")
            return 1
        print("✅ Template is valid")
        return 0

    registry = QueryTemplateRegistry()

    if args.list:
        if args.json:
            print(json.dumps(registry.to_dict(), indent=2))
        else:
            templates = registry.list_all()
            if templates:
                print(f"Templates ({len(templates)}):")
                for name in sorted(templates):
                    t = registry.get(name)
                    print(f" {name} [{t.category or 'uncategorized'}] - {t.description[:50]}...")
            else:
                print("No templates found")
        return 0

    if args.get:
        template = registry.get(args.get)
        if not template:
            print(f"Template not found: {args.get}")
            return 1

        if args.json:
            payload = {
                "name": template.name,
                "version": template.version,
                "description": template.description,
                "category": template.category,
                "agent_types": template.agent_types,
                "track": template.track,
                "tags": template.tags,
                "anchors": [
                    {"type": a.type, "id": a.id, "filter": a.filter, "limit": a.limit}
                    for a in template.anchors
                ],
                "expansion": {
                    "strategy": template.expansion.strategy,
                    "depth": template.expansion.depth,
                    "edge_types": template.expansion.edge_types,
                },
                "output": {
                    "format": template.output.format,
                    "max_tokens": template.output.max_tokens,
                },
            }
            print(json.dumps(payload, indent=2))
        else:
            print(f"Template: {template.name} v{template.version}")
            print(f"Description: {template.description}")
            print(f"Category: {template.category or 'uncategorized'}")
            print(f"Agent types: {', '.join(template.agent_types) or 'any'}")
            print(f"Track: {template.track or 'none'}")
            print(f"Anchors: {len(template.anchors)}")
            print(f"Expansion: {template.expansion.strategy}, depth={template.expansion.depth}")
        return 0

    if args.category:
        templates = registry.find_by_category(args.category)
        _print_template_list(
            f"Templates in category '{args.category}' ({len(templates)}):", templates
        )
        return 0

    if args.agent:
        templates = registry.find_for_agent(args.agent)
        _print_template_list(f"Templates for agent '{args.agent}' ({len(templates)}):", templates)
        return 0

    if args.track:
        templates = registry.find_by_track(args.track)
        _print_template_list(f"Templates for track '{args.track}' ({len(templates)}):", templates)
        return 0

    # Default: show summary
    summary = registry.to_dict()
    print("Query Template Registry")
    print(f"Templates: {summary['template_count']}")
    print(f"Categories: {', '.join(summary['categories']) or 'none'}")
    print(f"Agent types: {len(summary['agent_types'])}")
    return 0

# Run the CLI only when executed as a script; SystemExit carries main()'s
# status (None is mapped to 0) without relying on the site-provided exit().
if __name__ == "__main__":
    raise SystemExit(main() or 0)