
ADR-031: RAG Query Engine

Status

PROPOSED

Date

2026-01-15

Context

After corpus processing and hierarchical storage, users need to interactively query the knowledge base. Retrieval-Augmented Generation (RAG) enables:

  1. Semantic search: Find relevant content by meaning, not just keywords
  2. Grounded generation: Generate responses with source citations
  3. Reduced hallucination: Anchor responses in actual corpus content
  4. Efficient token usage: Retrieve only relevant chunks per query

Requirements

| Requirement | Priority | Rationale |
|---|---|---|
| Sub-second retrieval | P0 | Interactive UX |
| Mandatory citations | P0 | Compliance, trust |
| Multi-corpus search | P1 | Cross-analysis queries |
| Adaptive retrieval depth | P1 | Query complexity varies |
| Self-correction on low confidence | P2 | Quality assurance |

RAG Landscape (2025)

| Variant | Description | Use Case |
|---|---|---|
| Traditional RAG | Retrieve → Generate | Simple factual queries |
| Long RAG | Retrieve entire sections | Context-heavy analysis |
| Self-RAG | Model decides when to retrieve | Complex reasoning |
| Corrective RAG | Validate and correct retrieved context | High-accuracy needs |
| GraphRAG | Knowledge graph + vector retrieval | Entity relationships |
| Adaptive RAG | Dynamic strategy based on query | Mixed query types |

Decision

Implement an Adaptive RAG Query Engine that selects retrieval strategy based on query characteristics.

Architecture

```
┌─────────────────────────────────────────────────────────┐
│                    RAG QUERY ENGINE                     │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│ QUERY ANALYZER                                          │
│   - Intent classification                               │
│   - Complexity estimation                               │
│   - Strategy selection                                  │
└─────────────────────────────────────────────────────────┘
                             │
          ┌──────────────────┼──────────────────┐
          ▼                  ▼                  ▼
  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐
  │    SIMPLE     │  │   STANDARD    │  │    COMPLEX    │
  │   RETRIEVER   │  │   RETRIEVER   │  │   RETRIEVER   │
  │   (top-k=3)   │  │  (top-k=10)   │  │  (multi-hop)  │
  └───────┬───────┘  └───────┬───────┘  └───────┬───────┘
          │                  │                  │
          └──────────────────┼──────────────────┘
                             ▼
┌─────────────────────────────────────────────────────────┐
│ CONTEXT ASSEMBLER                                       │
│   - Deduplicate chunks                                  │
│   - Order by relevance                                  │
│   - Fit within token budget                             │
└─────────────────────────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────┐
│ RESPONSE GENERATOR                                      │
│   - Generate with mandatory citations                   │
│   - Confidence scoring                                  │
│   - Hallucination detection                             │
└─────────────────────────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────┐
│ RESPONSE VALIDATOR                                      │
│   - Citation verification                               │
│   - Self-correction if needed                           │
│   - Confidence thresholding                             │
└─────────────────────────────────────────────────────────┘
```
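The QUERY ANALYZER stage is not specified further in this ADR. As a minimal sketch of what intent classification could look like, the toy analyzer below uses hypothetical keyword rules and a crude length-based complexity score; a production analyzer would likely use an LLM or a trained classifier instead.

```python
from dataclasses import dataclass
from enum import Enum


class QueryIntent(Enum):
    FACTUAL = "factual"
    ANALYTICAL = "analytical"
    COMPARATIVE = "comparative"
    EXPLORATORY = "exploratory"
    PROCEDURAL = "procedural"


@dataclass
class QueryAnalysis:
    intent: QueryIntent
    estimated_complexity: float  # 0.0 (trivial) .. 1.0 (very complex)


class HeuristicQueryAnalyzer:
    """Toy analyzer: keyword rules stand in for a real classifier."""

    _RULES = [
        (("compare", "versus", " vs "), QueryIntent.COMPARATIVE),
        (("how do", "how to", "steps"), QueryIntent.PROCEDURAL),
        (("why", "explain", "analyze"), QueryIntent.ANALYTICAL),
        (("overview", "tell me about"), QueryIntent.EXPLORATORY),
    ]

    def analyze(self, query: str) -> QueryAnalysis:
        q = query.lower()
        for keywords, intent in self._RULES:
            if any(k in q for k in keywords):
                # Longer queries get a crudely higher complexity score
                return QueryAnalysis(intent, min(len(q.split()) / 30, 1.0))
        # Default: treat as a single-fact lookup
        return QueryAnalysis(QueryIntent.FACTUAL, 0.1)
```

With this sketch, "Compare vendor A versus vendor B" maps to COMPARATIVE and hence the graph-enhanced strategy, while a short lookup question falls through to FACTUAL and the simple retriever.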

Core Implementation

```python
from dataclasses import dataclass
from enum import Enum


class QueryIntent(Enum):
    """Classification of query intent"""
    FACTUAL = "factual"            # Single fact lookup
    ANALYTICAL = "analytical"      # Multi-fact synthesis
    COMPARATIVE = "comparative"    # Compare across sources
    EXPLORATORY = "exploratory"    # Open-ended exploration
    PROCEDURAL = "procedural"      # How-to questions


class RetrievalStrategy(Enum):
    """Retrieval strategy selection"""
    SIMPLE = "simple"              # Top-k vector search
    STANDARD = "standard"          # Top-k with reranking
    MULTI_HOP = "multi_hop"        # Iterative retrieval
    GRAPH_ENHANCED = "graph"       # Vector + knowledge graph
    EXHAUSTIVE = "exhaustive"      # Full corpus scan


@dataclass
class RAGConfig:
    """Configuration for RAG engine"""

    # Retrieval settings
    default_top_k: int = 10
    max_top_k: int = 50
    similarity_threshold: float = 0.7

    # Reranking
    enable_reranking: bool = True
    reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"

    # Generation
    require_citations: bool = True
    min_citations_per_claim: int = 1
    max_context_tokens: int = 8000

    # Self-correction
    enable_self_correction: bool = True
    confidence_threshold: float = 0.8
    max_correction_rounds: int = 2

    # Compliance
    audit_all_queries: bool = True
```

```python
from typing import List
from uuid import UUID


class RAGQueryEngine:
    """Adaptive RAG query engine"""

    def __init__(
        self,
        knowledge_store: HierarchicalKnowledgeStore,
        vector_index: VectorIndex,
        llm: LLM,
        config: RAGConfig
    ):
        self.store = knowledge_store
        self.vector_index = vector_index
        self.llm = llm
        self.config = config
        self.query_analyzer = QueryAnalyzer()
        self.reranker = CrossEncoderReranker(config.reranker_model)

    async def query(
        self,
        query: str,
        corpus_ids: List[UUID],
        user_context: UserContext,
        access_reason: str
    ) -> RAGResponse:
        """Execute RAG query with adaptive strategy"""

        # Step 1: Analyze query
        analysis = await self.query_analyzer.analyze(query)
        strategy = self._select_strategy(analysis)

        # Step 2: Retrieve context
        context = await self._retrieve(
            query=query,
            corpus_ids=corpus_ids,
            strategy=strategy,
            analysis=analysis
        )

        # Step 3: Generate response
        response = await self._generate(
            query=query,
            context=context,
            analysis=analysis
        )

        # Step 4: Validate and correct
        if self.config.enable_self_correction:
            response = await self._validate_and_correct(
                query=query,
                response=response,
                context=context
            )

        # Step 5: Audit
        await self._audit_query(
            query=query,
            response=response,
            user_context=user_context,
            access_reason=access_reason
        )

        return response

    def _select_strategy(self, analysis: QueryAnalysis) -> RetrievalStrategy:
        """Select retrieval strategy based on query analysis"""

        if analysis.intent == QueryIntent.FACTUAL:
            return RetrievalStrategy.SIMPLE

        elif analysis.intent == QueryIntent.ANALYTICAL:
            if analysis.estimated_complexity < 0.5:
                return RetrievalStrategy.STANDARD
            else:
                return RetrievalStrategy.MULTI_HOP

        elif analysis.intent == QueryIntent.COMPARATIVE:
            return RetrievalStrategy.GRAPH_ENHANCED

        elif analysis.intent == QueryIntent.EXPLORATORY:
            return RetrievalStrategy.EXHAUSTIVE

        else:
            return RetrievalStrategy.STANDARD

    async def _retrieve(
        self,
        query: str,
        corpus_ids: List[UUID],
        strategy: RetrievalStrategy,
        analysis: QueryAnalysis
    ) -> RetrievalContext:
        """Execute retrieval with selected strategy"""

        if strategy == RetrievalStrategy.SIMPLE:
            return await self._simple_retrieve(query, corpus_ids)

        elif strategy == RetrievalStrategy.STANDARD:
            return await self._standard_retrieve(query, corpus_ids)

        elif strategy == RetrievalStrategy.MULTI_HOP:
            return await self._multi_hop_retrieve(query, corpus_ids, analysis)

        elif strategy == RetrievalStrategy.GRAPH_ENHANCED:
            return await self._graph_retrieve(query, corpus_ids)

        else:  # EXHAUSTIVE
            return await self._exhaustive_retrieve(query, corpus_ids)

    async def _simple_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID]
    ) -> RetrievalContext:
        """Simple top-k vector retrieval"""

        # Embed query
        query_embedding = await self._embed(query)

        # Search
        results = await self.vector_index.search(
            query_embedding,
            corpus_filter=corpus_ids,
            top_k=self.config.default_top_k
        )

        # Load chunks
        chunks = await self._load_chunks(results)

        return RetrievalContext(
            chunks=chunks,
            strategy_used=RetrievalStrategy.SIMPLE,
            total_retrieved=len(chunks)
        )

    async def _standard_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID]
    ) -> RetrievalContext:
        """Standard retrieval with reranking"""

        # Initial retrieval (over-fetch)
        query_embedding = await self._embed(query)

        initial_results = await self.vector_index.search(
            query_embedding,
            corpus_filter=corpus_ids,
            top_k=self.config.max_top_k
        )

        # Load chunks
        chunks = await self._load_chunks(initial_results)

        # Rerank with cross-encoder
        if self.config.enable_reranking:
            reranked = await self.reranker.rerank(query, chunks)
            chunks = reranked[:self.config.default_top_k]

        return RetrievalContext(
            chunks=chunks,
            strategy_used=RetrievalStrategy.STANDARD,
            total_retrieved=len(chunks)
        )

    async def _multi_hop_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID],
        analysis: QueryAnalysis
    ) -> RetrievalContext:
        """Iterative multi-hop retrieval for complex queries"""

        all_chunks = []
        current_query = query

        for hop in range(3):  # Max 3 hops
            # Retrieve for current query
            hop_results = await self._standard_retrieve(current_query, corpus_ids)
            all_chunks.extend(hop_results.chunks)

            # Check if we have enough
            if self._sufficient_coverage(all_chunks, analysis):
                break

            # Generate follow-up query
            follow_up = await self._generate_follow_up(
                original_query=query,
                current_context=all_chunks,
                analysis=analysis
            )

            if follow_up is None:
                break

            current_query = follow_up

        # Deduplicate and rank
        unique_chunks = self._deduplicate_chunks(all_chunks)

        return RetrievalContext(
            chunks=unique_chunks,
            strategy_used=RetrievalStrategy.MULTI_HOP,
            total_retrieved=len(unique_chunks),
            hops_executed=hop + 1
        )

    async def _graph_retrieve(
        self,
        query: str,
        corpus_ids: List[UUID]
    ) -> RetrievalContext:
        """Graph-enhanced retrieval for entity relationships"""

        # Extract entities from query
        query_entities = await self._extract_entities(query)

        # Vector retrieval
        vector_chunks = await self._standard_retrieve(query, corpus_ids)

        # Graph traversal for related entities
        graph_chunks = []
        for entity in query_entities:
            related = await self.store.get_chunks_by_entity(
                entity=entity,
                corpus_ids=corpus_ids,
                max_hops=2
            )
            graph_chunks.extend(related)

        # Merge and deduplicate
        all_chunks = self._merge_results(
            vector_chunks.chunks,
            graph_chunks,
            vector_weight=0.6,
            graph_weight=0.4
        )

        return RetrievalContext(
            chunks=all_chunks,
            strategy_used=RetrievalStrategy.GRAPH_ENHANCED,
            total_retrieved=len(all_chunks),
            entities_traversed=query_entities
        )

    async def _generate(
        self,
        query: str,
        context: RetrievalContext,
        analysis: QueryAnalysis
    ) -> RAGResponse:
        """Generate response with mandatory citations"""

        # Build prompt
        prompt = self._build_generation_prompt(query, context, analysis)

        # Generate
        response = await self.llm.generate(
            prompt=prompt,
            system=CITATION_SYSTEM_PROMPT,
            max_tokens=2000
        )

        # Parse citations
        parsed = self._parse_citations(response.content, context.chunks)

        return RAGResponse(
            answer=parsed.text,
            citations=parsed.citations,
            confidence=self._calculate_confidence(parsed),
            context_used=context,
            tokens_used=response.usage.total_tokens
        )

    async def _validate_and_correct(
        self,
        query: str,
        response: RAGResponse,
        context: RetrievalContext
    ) -> RAGResponse:
        """Validate citations and self-correct if needed"""

        # Validate each citation
        validation_results = []
        for citation in response.citations:
            is_valid = await self._validate_citation(citation, context)
            validation_results.append(is_valid)

        invalid_count = sum(1 for v in validation_results if not v)

        # If too many invalid, drop the chunks backing the invalid
        # citations and regenerate against the remaining context
        if invalid_count > len(response.citations) * 0.3:
            invalid_chunk_ids = {
                citation.source_chunk_id
                for citation, valid in zip(response.citations, validation_results)
                if not valid
            }
            # Assumes chunks expose a chunk_id matching Citation.source_chunk_id
            valid_chunks = [
                chunk for chunk in context.chunks
                if chunk.chunk_id not in invalid_chunk_ids
            ]

            corrected_context = RetrievalContext(
                chunks=valid_chunks,
                strategy_used=context.strategy_used,
                total_retrieved=len(valid_chunks)
            )

            return await self._generate(
                query=query,
                context=corrected_context,
                analysis=QueryAnalysis(intent=QueryIntent.ANALYTICAL)
            )

        # Check confidence threshold
        if response.confidence < self.config.confidence_threshold:
            # Retrieve more context and regenerate
            expanded_context = await self._expand_context(query, context)
            return await self._generate(query, expanded_context, QueryAnalysis())

        return response
```
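The engine calls `_deduplicate_chunks` and `_merge_results` without defining them. One plausible sketch, assuming chunks carry a `chunk_id` and a retriever-assigned relevance `score` (both names are assumptions, not part of this ADR), keeps the best-scoring copy of each chunk and applies the 0.6/0.4 weighting from `_graph_retrieve`:

```python
from dataclasses import dataclass
from typing import Dict, List
from uuid import UUID


@dataclass
class Chunk:
    chunk_id: UUID
    text: str
    score: float  # relevance assigned by the retriever (assumed field)


def deduplicate_chunks(chunks: List[Chunk]) -> List[Chunk]:
    """Keep the highest-scoring copy of each chunk, ordered by score."""
    best: Dict[UUID, Chunk] = {}
    for chunk in chunks:
        current = best.get(chunk.chunk_id)
        if current is None or chunk.score > current.score:
            best[chunk.chunk_id] = chunk
    return sorted(best.values(), key=lambda c: c.score, reverse=True)


def merge_results(
    vector_chunks: List[Chunk],
    graph_chunks: List[Chunk],
    vector_weight: float = 0.6,
    graph_weight: float = 0.4,
) -> List[Chunk]:
    """Weight each source's scores, then dedupe keeping the best copy."""
    weighted = [
        Chunk(c.chunk_id, c.text, c.score * vector_weight) for c in vector_chunks
    ] + [
        Chunk(c.chunk_id, c.text, c.score * graph_weight) for c in graph_chunks
    ]
    return deduplicate_chunks(weighted)
```

A chunk retrieved by both paths survives once, with whichever weighted score is higher; this is one of several reasonable merge policies (reciprocal rank fusion is another common choice).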


```python
# Citation enforcement
CITATION_SYSTEM_PROMPT = """
You are a research assistant that ALWAYS cites sources.

RULES:
1. Every factual claim MUST have a citation in [N] format
2. Citations reference the provided context chunks by number
3. Do not make claims without supporting evidence in context
4. If you cannot find evidence for something, say "No evidence found"
5. Prefer direct quotes when possible

Example:
"The company reported revenue of $10M in Q3 [1], representing a 15% increase [2]."
"""
```
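The `_parse_citations` step is referenced but not shown. A minimal sketch of the marker-extraction part, using a regex over the `[N]` format the prompt mandates (the function name and return shape are illustrative, not the ADR's API), could be:

```python
import re
from typing import List, Tuple


def parse_citation_markers(text: str, num_chunks: int) -> List[Tuple[int, int, int]]:
    """Return (citation_id, start, end) for each valid [N] marker in text.

    Markers whose N does not map to a provided context chunk (1-based)
    are skipped rather than raising, since the LLM may emit stray refs.
    """
    markers = []
    for match in re.finditer(r"\[(\d+)\]", text):
        citation_id = int(match.group(1))
        if 1 <= citation_id <= num_chunks:
            markers.append((citation_id, match.start(), match.end()))
    return markers
```

The spans returned here would feed `Citation.span_in_response`; a full parser would also attribute the surrounding sentence to each marker as the `claim_text`.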

Citation Format

```python
from dataclasses import dataclass
from typing import List, Tuple
from uuid import UUID


@dataclass
class Citation:
    """Structured citation"""
    citation_id: int
    claim_text: str
    source_chunk_id: UUID
    source_text: str               # Exact text from source
    confidence: float
    span_in_response: Tuple[int, int]


@dataclass
class RAGResponse:
    """Response with citations"""
    answer: str
    citations: List[Citation]
    confidence: float
    context_used: RetrievalContext
    tokens_used: int

    def to_markdown(self) -> str:
        """Format response with citation footnotes"""
        text = self.answer

        footnotes = []
        for c in self.citations:
            footnotes.append(f"[{c.citation_id}] {c.source_text[:200]}...")

        return f"{text}\n\n---\n**Sources:**\n" + "\n".join(footnotes)

    def verify_claims(self) -> List[ClaimVerification]:
        """Verify each claim has valid citation"""
        verifications = []
        for citation in self.citations:
            # Check if source text actually supports claim
            similarity = self._semantic_similarity(
                citation.claim_text,
                citation.source_text
            )
            verifications.append(ClaimVerification(
                citation=citation,
                supported=similarity > 0.7,
                similarity_score=similarity
            ))
        return verifications
```
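`_semantic_similarity` is left undefined in `verify_claims`. In production it would likely embed both texts and take cosine similarity; as a self-contained stand-in for illustration only, a token-overlap (Jaccard) score shows the shape of the contract (a float in [0, 1] compared against the 0.7 support threshold):

```python
def jaccard_similarity(claim: str, source: str) -> float:
    """Token-overlap stand-in for _semantic_similarity.

    Only measures shared vocabulary; a real implementation would use
    embedding cosine similarity, which this toy version approximates
    poorly for paraphrases.
    """
    claim_tokens = set(claim.lower().split())
    source_tokens = set(source.lower().split())
    if not claim_tokens or not source_tokens:
        return 0.0
    overlap = claim_tokens & source_tokens
    return len(overlap) / len(claim_tokens | source_tokens)
```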

Commands

```yaml
rag_commands:
  - command: "@corpus:query"
    description: RAG query against indexed corpus
    handler: RAGQueryEngine.query
    parameters:
      - query: string (required)
      - corpus_ids: List[string] (optional, default: all accessible)
      - strategy: enum[auto, simple, standard, multi_hop, graph] (default: auto)
      - top_k: int (default: 10)
      - require_citations: bool (default: true)
      - access_reason: string (required)
    returns:
      - answer: string
      - citations: List[Citation]
      - confidence: float
      - sources: List[SourceReference]

  - command: "@corpus:ask"
    description: Simplified query interface
    handler: RAGQueryEngine.simple_query
    parameters:
      - question: string (required)
    returns:
      - answer: string
      - sources: List[string]
```
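At dispatch time the schema's defaults have to be applied before calling the handler. A hypothetical sketch (the function and constant names are illustrative, not part of the command framework) of merging user arguments over the `@corpus:query` defaults:

```python
# Defaults and required keys mirror the @corpus:query schema above
QUERY_DEFAULTS = {
    "strategy": "auto",
    "top_k": 10,
    "require_citations": True,
}

REQUIRED = ("query", "access_reason")


def prepare_query_params(user_args: dict) -> dict:
    """Merge user arguments over schema defaults, enforcing required keys."""
    missing = [key for key in REQUIRED if key not in user_args]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    return {**QUERY_DEFAULTS, **user_args}
```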

Consequences

Positive

  • Adaptive strategy: Right approach for each query type
  • Mandatory citations: Every claim grounded in source
  • Self-correction: Automatic quality improvement
  • Multi-corpus: Search across analysis runs
  • Sub-second: Fast retrieval for interactive use

Negative

  • Complexity: Multiple retrieval strategies to build and maintain
  • Reranker latency: The cross-encoder adds 100-200 ms per query
  • Citation parsing: The LLM may not always emit the [N] citation format

Performance Targets

| Metric | Target | Stretch |
|---|---|---|
| P50 latency | <1s | <500ms |
| P99 latency | <3s | <2s |
| Citation accuracy | >90% | >95% |
| Retrieval relevance | >80% | >90% |
| Hallucination rate | <5% | <2% |

References

Approval

| Role | Name | Date | Decision |
|---|---|---|---|
| CTO | Hal Casteel | | |
| ML Lead | | | |