ADR-031: RAG Query Engine
Status
PROPOSED
Date
2026-01-15
Context
After corpus processing and hierarchical storage, users need to interactively query the knowledge base. Retrieval-Augmented Generation (RAG) enables:
- Semantic search: Find relevant content by meaning, not just keywords
- Grounded generation: Generate responses with source citations
- Reduced hallucination: Anchor responses in actual corpus content
- Efficient token usage: Retrieve only relevant chunks per query
Requirements
| Requirement | Priority | Rationale |
|---|---|---|
| Sub-second retrieval | P0 | Interactive UX |
| Mandatory citations | P0 | Compliance, trust |
| Multi-corpus search | P1 | Cross-analysis queries |
| Adaptive retrieval depth | P1 | Query complexity varies |
| Self-correction on low confidence | P2 | Quality assurance |
RAG Landscape (2025)
| Variant | Description | Use Case |
|---|---|---|
| Traditional RAG | Retrieve → Generate | Simple factual queries |
| Long RAG | Retrieve entire sections | Context-heavy analysis |
| Self-RAG | Model decides when to retrieve | Complex reasoning |
| Corrective RAG | Validate and correct retrieved context | High-accuracy needs |
| GraphRAG | Knowledge graph + vector retrieval | Entity relationships |
| Adaptive RAG | Dynamic strategy based on query | Mixed query types |
Decision
Implement an Adaptive RAG Query Engine that selects retrieval strategy based on query characteristics.
Architecture
┌────────────────────────────────────────────┐
│              RAG QUERY ENGINE              │
├────────────────────────────────────────────┤
│  QUERY ANALYZER                            │
│   - Intent classification                  │
│   - Complexity estimation                  │
│   - Strategy selection                     │
└─────────────────────┬──────────────────────┘
                      │
      ┌───────────────┼───────────────┐
      ▼               ▼               ▼
┌───────────┐   ┌───────────┐   ┌───────────┐
│  SIMPLE   │   │ STANDARD  │   │  COMPLEX  │
│ RETRIEVER │   │ RETRIEVER │   │ RETRIEVER │
│ (top-k=3) │   │(top-k=10) │   │(multi-hop)│
└─────┬─────┘   └─────┬─────┘   └─────┬─────┘
      │               │               │
      └───────────────┼───────────────┘
                      ▼
┌────────────────────────────────────────────┐
│  CONTEXT ASSEMBLER                         │
│   - Deduplicate chunks                     │
│   - Order by relevance                     │
│   - Fit within token budget                │
└─────────────────────┬──────────────────────┘
                      ▼
┌────────────────────────────────────────────┐
│  RESPONSE GENERATOR                        │
│   - Generate with mandatory citations      │
│   - Confidence scoring                     │
│   - Hallucination detection                │
└─────────────────────┬──────────────────────┘
                      ▼
┌────────────────────────────────────────────┐
│  RESPONSE VALIDATOR                        │
│   - Citation verification                  │
│   - Self-correction if needed              │
│   - Confidence thresholding                │
└────────────────────────────────────────────┘
Core Implementation
from dataclasses import dataclass
from enum import Enum
from typing import List, Tuple
from uuid import UUID
class QueryIntent(Enum):
    """Classification of query intent"""
    FACTUAL = "factual"            # Single fact lookup
    ANALYTICAL = "analytical"      # Multi-fact synthesis
    COMPARATIVE = "comparative"    # Compare across sources
    EXPLORATORY = "exploratory"    # Open-ended exploration
    PROCEDURAL = "procedural"      # How-to questions


class RetrievalStrategy(Enum):
    """Retrieval strategy selection"""
    SIMPLE = "simple"              # Top-k vector search
    STANDARD = "standard"          # Top-k with reranking
    MULTI_HOP = "multi_hop"        # Iterative retrieval
    GRAPH_ENHANCED = "graph"       # Vector + knowledge graph
    EXHAUSTIVE = "exhaustive"      # Full corpus scan


@dataclass
class RAGConfig:
    """Configuration for RAG engine"""
    # Retrieval settings
    default_top_k: int = 10
    max_top_k: int = 50
    similarity_threshold: float = 0.7

    # Reranking
    enable_reranking: bool = True
    reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"

    # Generation
    require_citations: bool = True
    min_citations_per_claim: int = 1
    max_context_tokens: int = 8000

    # Self-correction
    enable_self_correction: bool = True
    confidence_threshold: float = 0.8
    max_correction_rounds: int = 2

    # Compliance
    audit_all_queries: bool = True
class RAGQueryEngine:
    """Adaptive RAG query engine"""

    def __init__(
        self,
        knowledge_store: HierarchicalKnowledgeStore,
        vector_index: VectorIndex,
        llm: LLM,
        config: RAGConfig
    ):
        self.store = knowledge_store
        self.vector_index = vector_index
        self.llm = llm
        self.config = config
        self.query_analyzer = QueryAnalyzer()
        self.reranker = CrossEncoderReranker(config.reranker_model)

    async def query(
        self,
        query: str,
        corpus_ids: List[UUID],
        user_context: UserContext,
        access_reason: str
    ) -> RAGResponse:
        """Execute RAG query with adaptive strategy"""
        # Step 1: Analyze query
        analysis = await self.query_analyzer.analyze(query)
        strategy = self._select_strategy(analysis)

        # Step 2: Retrieve context
        context = await self._retrieve(
            query=query,
            corpus_ids=corpus_ids,
            strategy=strategy,
            analysis=analysis
        )

        # Step 3: Generate response
        response = await self._generate(
            query=query,
            context=context,
            analysis=analysis
        )

        # Step 4: Validate and correct
        if self.config.enable_self_correction:
            response = await self._validate_and_correct(
                query=query,
                response=response,
                context=context
            )

        # Step 5: Audit
        await self._audit_query(
            query=query,
            response=response,
            user_context=user_context,
            access_reason=access_reason
        )

        return response
    def _select_strategy(self, analysis: QueryAnalysis) -> RetrievalStrategy:
        """Select retrieval strategy based on query analysis"""
        if analysis.intent == QueryIntent.FACTUAL:
            return RetrievalStrategy.SIMPLE
        elif analysis.intent == QueryIntent.ANALYTICAL:
            if analysis.estimated_complexity < 0.5:
                return RetrievalStrategy.STANDARD
            else:
                return RetrievalStrategy.MULTI_HOP
        elif analysis.intent == QueryIntent.COMPARATIVE:
            return RetrievalStrategy.GRAPH_ENHANCED
        elif analysis.intent == QueryIntent.EXPLORATORY:
            return RetrievalStrategy.EXHAUSTIVE
        else:
            return RetrievalStrategy.STANDARD

    async def _retrieve(
        self,
        query: str,
        corpus_ids: List[UUID],
        strategy: RetrievalStrategy,
        analysis: QueryAnalysis
    ) -> RetrievalContext:
        """Execute retrieval with selected strategy"""
        if strategy == RetrievalStrategy.SIMPLE:
            return await self._simple_retrieve(query, corpus_ids)
        elif strategy == RetrievalStrategy.STANDARD:
            return await self._standard_retrieve(query, corpus_ids)
        elif strategy == RetrievalStrategy.MULTI_HOP:
            return await self._multi_hop_retrieve(query, corpus_ids, analysis)
        elif strategy == RetrievalStrategy.GRAPH_ENHANCED:
            return await self._graph_retrieve(query, corpus_ids)
        else:  # EXHAUSTIVE
            return await self._exhaustive_retrieve(query, corpus_ids)
    async def _simple_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID]
    ) -> RetrievalContext:
        """Simple top-k vector retrieval"""
        # Embed query
        query_embedding = await self._embed(query)

        # Search
        results = await self.vector_index.search(
            query_embedding,
            corpus_filter=corpus_ids,
            top_k=self.config.default_top_k
        )

        # Load chunks
        chunks = await self._load_chunks(results)

        return RetrievalContext(
            chunks=chunks,
            strategy_used=RetrievalStrategy.SIMPLE,
            total_retrieved=len(chunks)
        )

    async def _standard_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID]
    ) -> RetrievalContext:
        """Standard retrieval with reranking"""
        # Initial retrieval (over-fetch)
        query_embedding = await self._embed(query)
        initial_results = await self.vector_index.search(
            query_embedding,
            corpus_filter=corpus_ids,
            top_k=self.config.max_top_k
        )

        # Load chunks
        chunks = await self._load_chunks(initial_results)

        # Rerank with cross-encoder
        if self.config.enable_reranking:
            reranked = await self.reranker.rerank(query, chunks)
            chunks = reranked[:self.config.default_top_k]

        return RetrievalContext(
            chunks=chunks,
            strategy_used=RetrievalStrategy.STANDARD,
            total_retrieved=len(chunks)
        )
    async def _multi_hop_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID],
        analysis: QueryAnalysis
    ) -> RetrievalContext:
        """Iterative multi-hop retrieval for complex queries"""
        all_chunks = []
        current_query = query

        for hop in range(3):  # Max 3 hops
            # Retrieve for current query
            hop_results = await self._standard_retrieve(current_query, corpus_ids)
            all_chunks.extend(hop_results.chunks)

            # Check if we have enough
            if self._sufficient_coverage(all_chunks, analysis):
                break

            # Generate follow-up query
            follow_up = await self._generate_follow_up(
                original_query=query,
                current_context=all_chunks,
                analysis=analysis
            )
            if follow_up is None:
                break
            current_query = follow_up

        # Deduplicate and rank
        unique_chunks = self._deduplicate_chunks(all_chunks)

        return RetrievalContext(
            chunks=unique_chunks,
            strategy_used=RetrievalStrategy.MULTI_HOP,
            total_retrieved=len(unique_chunks),
            hops_executed=hop + 1
        )

    async def _graph_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID]
    ) -> RetrievalContext:
        """Graph-enhanced retrieval for entity relationships"""
        # Extract entities from query
        query_entities = await self._extract_entities(query)

        # Vector retrieval
        vector_chunks = await self._standard_retrieve(query, corpus_ids)

        # Graph traversal for related entities
        graph_chunks = []
        for entity in query_entities:
            related = await self.store.get_chunks_by_entity(
                entity=entity,
                corpus_ids=corpus_ids,
                max_hops=2
            )
            graph_chunks.extend(related)

        # Merge and deduplicate
        all_chunks = self._merge_results(
            vector_chunks.chunks,
            graph_chunks,
            vector_weight=0.6,
            graph_weight=0.4
        )

        return RetrievalContext(
            chunks=all_chunks,
            strategy_used=RetrievalStrategy.GRAPH_ENHANCED,
            total_retrieved=len(all_chunks),
            entities_traversed=query_entities
        )
    async def _generate(
        self,
        query: str,
        context: RetrievalContext,
        analysis: QueryAnalysis
    ) -> RAGResponse:
        """Generate response with mandatory citations"""
        # Build prompt
        prompt = self._build_generation_prompt(query, context, analysis)

        # Generate
        response = await self.llm.generate(
            prompt=prompt,
            system=CITATION_SYSTEM_PROMPT,
            max_tokens=2000
        )

        # Parse citations
        parsed = self._parse_citations(response.content, context.chunks)

        return RAGResponse(
            answer=parsed.text,
            citations=parsed.citations,
            confidence=self._calculate_confidence(parsed),
            context_used=context,
            tokens_used=response.usage.total_tokens
        )
    async def _validate_and_correct(
        self,
        query: str,
        response: RAGResponse,
        context: RetrievalContext
    ) -> RAGResponse:
        """Validate citations and self-correct if needed"""
        # Validate each citation
        validation_results = []
        for citation in response.citations:
            is_valid = await self._validate_citation(citation, context)
            validation_results.append(is_valid)

        invalid_count = sum(1 for v in validation_results if not v)

        # If too many citations are invalid, drop the chunks behind them
        # and regenerate. Note: validation results align with citations,
        # not chunks, so map back to chunks via the cited chunk id.
        if invalid_count > len(response.citations) * 0.3:
            invalid_chunk_ids = {
                citation.source_chunk_id
                for citation, valid in zip(response.citations, validation_results)
                if not valid
            }
            valid_chunks = [
                chunk for chunk in context.chunks
                if chunk.chunk_id not in invalid_chunk_ids
            ]
            corrected_context = RetrievalContext(
                chunks=valid_chunks,
                strategy_used=context.strategy_used,
                total_retrieved=len(valid_chunks)
            )
            return await self._generate(
                query=query,
                context=corrected_context,
                analysis=QueryAnalysis(intent=QueryIntent.ANALYTICAL)
            )

        # Check confidence threshold
        if response.confidence < self.config.confidence_threshold:
            # Retrieve more context and regenerate
            expanded_context = await self._expand_context(query, context)
            return await self._generate(query, expanded_context, QueryAnalysis())

        return response
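The helpers `_deduplicate_chunks` and the context assembler's token-budget step are referenced above but not shown. A minimal sketch of both, assuming chunks carry `chunk_id`, `score`, and `token_count` fields (the `Chunk` shape here is illustrative, not the store's actual model):

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Chunk:
    chunk_id: str
    text: str
    score: float        # relevance score from retrieval/reranking
    token_count: int

def deduplicate_chunks(chunks: List[Chunk]) -> List[Chunk]:
    """Keep the highest-scoring copy of each chunk id, ordered by score."""
    best = {}
    for c in chunks:
        prev = best.get(c.chunk_id)
        if prev is None or c.score > prev.score:
            best[c.chunk_id] = c
    return sorted(best.values(), key=lambda c: c.score, reverse=True)

def fit_token_budget(chunks: List[Chunk], max_tokens: int) -> List[Chunk]:
    """Greedily pack the most relevant chunks into the context window."""
    selected, used = [], 0
    for c in sorted(chunks, key=lambda c: c.score, reverse=True):
        if used + c.token_count <= max_tokens:
            selected.append(c)
            used += c.token_count
    return selected
```

Greedy packing is the simplest policy consistent with `max_context_tokens = 8000`; a production assembler might also interleave chunks by source document.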
# Citation enforcement
CITATION_SYSTEM_PROMPT = """
You are a research assistant that ALWAYS cites sources.
RULES:
1. Every factual claim MUST have a citation in [N] format
2. Citations reference the provided context chunks by number
3. Do not make claims without supporting evidence in context
4. If you cannot find evidence for something, say "No evidence found"
5. Prefer direct quotes when possible
Example:
"The company reported revenue of $10M in Q3 [1], representing a 15% increase [2]."
"""
Citation Format
@dataclass
class Citation:
    """Structured citation"""
    citation_id: int
    claim_text: str
    source_chunk_id: UUID
    source_text: str               # Exact text from source
    confidence: float
    span_in_response: Tuple[int, int]


@dataclass
class RAGResponse:
    """Response with citations"""
    answer: str
    citations: List[Citation]
    confidence: float
    context_used: RetrievalContext
    tokens_used: int

    def to_markdown(self) -> str:
        """Format response with citation footnotes"""
        text = self.answer
        footnotes = []
        for c in self.citations:
            footnotes.append(f"[{c.citation_id}] {c.source_text[:200]}...")
        return f"{text}\n\n---\n**Sources:**\n" + "\n".join(footnotes)

    def verify_claims(self) -> List[ClaimVerification]:
        """Verify each claim has valid citation"""
        verifications = []
        for citation in self.citations:
            # Check if source text actually supports claim
            similarity = self._semantic_similarity(
                citation.claim_text,
                citation.source_text
            )
            verifications.append(ClaimVerification(
                citation=citation,
                supported=similarity > 0.7,
                similarity_score=similarity
            ))
        return verifications
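`_semantic_similarity` in `verify_claims` is left abstract; in practice it would be cosine similarity over embeddings. A cheap lexical proxy illustrates the contract (a sketch for testing the verification path, not the production scorer):

```python
def lexical_overlap(claim: str, source: str) -> float:
    """Stand-in for semantic similarity: Jaccard overlap of word sets.

    Returns a score in [0, 1]; 0.0 when either side is empty. An
    embedding-based scorer would plug into the same > 0.7 threshold.
    """
    a = set(claim.lower().split())
    b = set(source.lower().split())
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)
```

A lexical proxy is deliberately conservative: it cannot confirm paraphrased support, so it under-reports `supported` rather than over-reporting it.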
Commands
rag_commands:
  - command: "@corpus:query"
    description: RAG query against indexed corpus
    handler: RAGQueryEngine.query
    parameters:
      - query: string (required)
      - corpus_ids: List[string] (optional, default: all accessible)
      - strategy: enum[auto, simple, standard, multi_hop, graph] (default: auto)
      - top_k: int (default: 10)
      - require_citations: bool (default: true)
      - access_reason: string (required)
    returns:
      - answer: string
      - citations: List[Citation]
      - confidence: float
      - sources: List[SourceReference]

  - command: "@corpus:ask"
    description: Simplified query interface
    handler: RAGQueryEngine.simple_query
    parameters:
      - question: string (required)
    returns:
      - answer: string
      - sources: List[string]
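`RAGQueryEngine.simple_query`, the `@corpus:ask` handler, is referenced but not defined above. One plausible shape is a thin wrapper that applies defaults and flattens structured citations down to source identifiers (the empty-`corpus_ids`-means-all convention and the `access_reason` string are assumptions):

```python
import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class SimpleAnswer:
    answer: str
    sources: List[str]

async def simple_query(engine, question: str) -> SimpleAnswer:
    """Sketch of the @corpus:ask handler: full query() with defaults."""
    response = await engine.query(
        query=question,
        corpus_ids=[],  # assumed convention: empty = all accessible corpora
        user_context=None,
        access_reason="interactive @corpus:ask",
    )
    # Flatten structured citations to unique, stable source identifiers
    sources = sorted({str(c.source_chunk_id) for c in response.citations})
    return SimpleAnswer(answer=response.answer, sources=sources)
```

Keeping the wrapper this thin means auditing and citation enforcement still run on every `@corpus:ask` call, since it goes through the same `query` path.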
Consequences
Positive
- Adaptive strategy: Right approach for each query type
- Mandatory citations: Every claim grounded in source
- Self-correction: Automatic quality improvement
- Multi-corpus: Search across analysis runs
- Sub-second: Fast retrieval for interactive use
Negative
- Complexity: Multiple retrieval strategies to maintain
- Reranker latency: Cross-encoder adds 100-200ms
- Citation parsing: LLM may not always follow format
Performance Targets
| Metric | Target | Stretch |
|---|---|---|
| P50 latency | <1s | <500ms |
| P99 latency | <3s | <2s |
| Citation accuracy | >90% | >95% |
| Retrieval relevance | >80% | >90% |
| Hallucination rate | <5% | <2% |
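The latency targets above are percentile SLOs, which can be checked against recorded per-query latencies with a nearest-rank percentile. A minimal monitoring sketch (not tied to any particular metrics backend):

```python
import math
from typing import List

def percentile(samples: List[float], p: float) -> float:
    """Nearest-rank percentile of latency samples, p in [0, 100]."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]
```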
References
- ADR-027: Corpus Processing Subsystem Architecture
- ADR-030: Hierarchical Knowledge Store
- RAG Best Practices 2025
- Self-RAG Paper
- GraphRAG - Microsoft
Approval
| Role | Name | Date | Decision |
|---|---|---|---|
| CTO | Hal Casteel | | |
| ML Lead | | | |