#!/usr/bin/env python3
"""
title: Result Ranker for /cxq Search Results
component_type: script
version: 1.0.0
status: active
summary: Relevance scoring, recency weighting, and source boosting for search results (ADR-149, J.4.5)
keywords: [cxq, query, ranking, relevance, recency, scoring, bm25]
track: J
task_id: J.4.5.1
created: 2026-02-04

Result Ranker - J.4.5 Implementation

Implements multi-factor ranking for /cxq search results:

- J.4.5.1: Relevance scoring (FTS5 bm25 + term frequency)
- J.4.5.2: Recency weighting (exponential decay)
- J.4.5.3: Source type boosting (decisions > skill_learnings > messages)

Ranking Formula:
    final_score = (relevance * R_WEIGHT) + (recency * T_WEIGHT) + (source_boost * S_WEIGHT)

Where:
- relevance: Normalized BM25 score (0.0 - 1.0)
- recency: Exponential decay based on age (1.0 = now, 0.0 = old)
- source_boost: Type-based multiplier (1.5 for decisions, 1.3 for learnings, 1.0 for messages)

Usage:
    from scripts.context_graph.result_ranker import ResultRanker, rank_results

    # Basic usage
    ranked = rank_results(results, query="authentication error")

    # With custom weights
    ranker = ResultRanker(relevance_weight=0.5, recency_weight=0.3, source_weight=0.2)
    ranked = ranker.rank(results, query="authentication")

    # Get scoring details
    scored = ranker.score_results(results, query="auth")
    for item in scored:
        print(f"{item['score']:.3f} - {item['content'][:50]}")
"""

# Standard-library imports, one per line, grouped per PEP 8.
import logging
import math
import re
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

# Module-level logger named after the module path (`__name__`), per the
# stdlib logging convention; the original `getLogger(name)` referenced an
# undefined variable.
logger = logging.getLogger(__name__)

# =============================================================================
# Configuration
# =============================================================================

@dataclass
class RankingConfig:
    """Configuration for result ranking weights and parameters.

    The three factor weights must sum to 1.0 (see :meth:`validate`); they
    control the relative contribution of relevance, recency, and
    source-type boosting to the final score.
    """

    # Weight distribution (must sum to 1.0)
    relevance_weight: float = 0.5   # J.4.5.1: relevance scoring
    recency_weight: float = 0.3     # J.4.5.2: recency weighting
    source_weight: float = 0.2      # J.4.5.3: source type boosting

    # Recency decay parameters
    recency_half_life_days: float = 7.0    # half-life for recency decay
    recency_max_age_days: float = 365.0    # results older than this score 0.0

    # Source type boost multipliers (J.4.5.3)
    source_boosts: Dict[str, float] = field(default_factory=lambda: {
        'decision': 1.5,        # highest: architectural decisions
        'skill_learning': 1.4,  # high: learned patterns
        'error_solution': 1.3,  # high: problem solutions
        'pattern': 1.2,         # medium: code patterns
        'component': 1.1,       # medium: component references
        'message': 1.0,         # base: regular messages
        'unknown': 0.9,         # below base: unknown types
    })

    # BM25 parameters
    bm25_k1: float = 1.2    # term frequency saturation
    bm25_b: float = 0.75    # length normalization

    def validate(self) -> bool:
        """Return True when the three weights sum to 1.0 (tolerance 1e-3)."""
        total = self.relevance_weight + self.recency_weight + self.source_weight
        return abs(total - 1.0) < 0.001

# Default configuration

# Shared module-level default; the scoring helpers below fall back to this
# instance when no explicit RankingConfig is supplied.
DEFAULT_CONFIG = RankingConfig()

# =============================================================================
# Scoring Functions
# =============================================================================

def compute_term_frequency(text: str, terms: List[str], k1: float = 1.2) -> float:
    """Compute a saturated term-frequency score for *text*.

    Matching is case-insensitive substring counting; surrounding double
    quotes on a term are stripped first.

    Args:
        text: Text to analyze.
        terms: Search terms to count.
        k1: BM25-style saturation constant (default matches
            ``RankingConfig.bm25_k1``).

    Returns:
        Normalized term frequency in [0.0, 1.0].
    """
    if not text or not terms:
        return 0.0

    text_lower = text.lower()
    # Bail out for documents with no word characters at all.
    if not re.findall(r'\w+', text_lower):
        return 0.0

    # Total substring occurrences of every term across the document.
    total_matches = sum(
        text_lower.count(term.lower().strip('"')) for term in terms
    )
    if total_matches <= 0:
        return 0.0

    # BM25-style saturation: tf / (tf + k1) approaches 1.0 asymptotically,
    # so repeated terms give diminishing returns.
    return min(total_matches / (total_matches + k1), 1.0)

def compute_bm25_score(text: str, terms: List[str], avg_doc_length: float = 100.0,
                       k1: float = 1.2, b: float = 0.75) -> float:
    """Compute a BM25-like relevance score.

    Args:
        text: Document text.
        terms: Query terms (double quotes stripped; case-insensitive).
        avg_doc_length: Average document length (in words) for length
            normalization.
        k1: Term-frequency saturation constant (default matches
            ``RankingConfig.bm25_k1``).
        b: Length-normalization constant (default matches
            ``RankingConfig.bm25_b``).

    Returns:
        BM25 score normalized to [0.0, 1.0].
    """
    if not text or not terms:
        return 0.0

    text_lower = text.lower()
    doc_length = len(text_lower.split())
    if doc_length == 0:
        return 0.0

    # Length normalization factor: >1 penalizes long docs, <1 favors short.
    length_norm = 1 - b + b * (doc_length / avg_doc_length)

    total_score = 0.0
    for term in terms:
        tf = text_lower.count(term.lower().strip('"'))
        if tf > 0:
            # Classic BM25 per-term contribution.
            total_score += tf * (k1 + 1) / (tf + k1 * length_norm)

    # Normalize to 0-1 assuming a reasonable maximum of num_terms * 2.
    max_expected = len(terms) * 2
    return min(total_score / max_expected, 1.0) if max_expected > 0 else 0.0

def compute_recency_score(timestamp: Optional[str], config: "RankingConfig" = None) -> float:
    """Compute a recency score using exponential half-life decay.

    Args:
        timestamp: ISO-format timestamp string, a ``datetime``, or None.
        config: Ranking configuration; defaults to ``DEFAULT_CONFIG``.

    Returns:
        Score in [0.0, 1.0]: 1.0 means "now", halving every
        ``recency_half_life_days`` and clamped to 0.0 beyond
        ``recency_max_age_days``. Missing or unparseable timestamps get a
        neutral 0.5.
    """
    if not timestamp:
        return 0.5  # unknown age gets a neutral middle score

    if config is None:
        config = DEFAULT_CONFIG

    try:
        if isinstance(timestamp, datetime):
            dt = timestamp
        else:
            ts = str(timestamp)
            if 'T' in ts:
                # fromisoformat on older Pythons rejects a trailing 'Z'.
                ts = ts.replace('Z', '+00:00')
            dt = datetime.fromisoformat(ts)

        # Drop tzinfo so subtraction against naive utcnow() is legal.
        # NOTE(review): assumes aware timestamps are already UTC — confirm.
        if dt.tzinfo is not None:
            dt = dt.replace(tzinfo=None)

        # datetime.utcnow() is deprecated in 3.12, but the whole module
        # works with naive UTC datetimes, so it is kept for consistency.
        age_days = (datetime.utcnow() - dt).total_seconds() / 86400.0

        # Anything beyond the maximum age contributes nothing.
        if age_days > config.recency_max_age_days:
            return 0.0

        # Exponential decay: score = 2 ** (-age / half_life).
        decay = math.pow(2, -age_days / config.recency_half_life_days)
        return min(max(decay, 0.0), 1.0)

    except (ValueError, TypeError) as e:
        logger.debug(f"Could not parse timestamp '{timestamp}': {e}")
        return 0.5  # unparseable age also gets the neutral score

# Canonical singular names for common plural/table-style source type
# variants; hoisted to module level so the dict is built once, not per call.
_SOURCE_TYPE_ALIASES = {
    'decisions': 'decision',
    'skill_learnings': 'skill_learning',
    'error_solutions': 'error_solution',
    'patterns': 'pattern',
    'components': 'component',
    'messages': 'message',
}


def compute_source_boost(source_type: Optional[str], config: "RankingConfig" = None) -> float:
    """Look up the boost multiplier for a result's source type.

    Args:
        source_type: Type of source (decision, message, skill_learning,
            etc.); plural forms such as 'decisions' are normalized first.
        config: Ranking configuration; defaults to ``DEFAULT_CONFIG``.

    Returns:
        Boost multiplier (typically 0.9 - 1.5). None or unrecognized types
        fall back to the 'unknown' boost (0.9 if unconfigured).
    """
    if config is None:
        config = DEFAULT_CONFIG

    unknown_boost = config.source_boosts.get('unknown', 0.9)
    if not source_type:
        return unknown_boost

    # Case-insensitive lookup with plural -> singular normalization.
    normalized = source_type.lower().strip()
    normalized = _SOURCE_TYPE_ALIASES.get(normalized, normalized)

    return config.source_boosts.get(normalized, unknown_boost)

# =============================================================================
# Result Ranker Class
# =============================================================================

@dataclass
class ScoredResult:
    """A search result together with its scoring breakdown."""

    result: Dict[str, Any]    # the original result dictionary
    final_score: float        # weighted combination of the three factors
    relevance_score: float    # BM25-style relevance component (0-1)
    recency_score: float      # exponential-decay recency component (0-1)
    source_boost: float       # raw source-type multiplier (e.g. 1.5)

    def to_dict(self) -> Dict[str, Any]:
        """Return the result dict with underscore-prefixed score fields merged in."""
        return {
            **self.result,
            '_score': self.final_score,
            '_relevance': self.relevance_score,
            '_recency': self.recency_score,
            '_source_boost': self.source_boost,
        }

class ResultRanker:
    """Multi-factor result ranker for /cxq search results.

    Combines relevance (BM25-style), recency (exponential decay), and
    source-type boosting into a single weighted score.

    Usage:
        ranker = ResultRanker()
        ranked = ranker.rank(results, query="authentication")

        # With custom config
        config = RankingConfig(relevance_weight=0.6, recency_weight=0.2,
                               source_weight=0.2)
        ranker = ResultRanker(config)
    """

    def __init__(self, config: "RankingConfig" = None,
                 relevance_weight: float = None,
                 recency_weight: float = None,
                 source_weight: float = None):
        """
        Initialize ranker with configuration.

        Args:
            config: Full ranking configuration; defaults to a fresh
                RankingConfig. NOTE(review): weight overrides and
                normalization mutate the passed-in config object — callers
                sharing a config instance should be aware.
            relevance_weight: Override relevance weight.
            recency_weight: Override recency weight.
            source_weight: Override source weight.
        """
        if config is None:
            config = RankingConfig()

        # Apply individual weight overrides.
        if relevance_weight is not None:
            config.relevance_weight = relevance_weight
        if recency_weight is not None:
            config.recency_weight = recency_weight
        if source_weight is not None:
            config.source_weight = source_weight

        # Auto-normalize so downstream scoring can assume weights sum to 1.
        if not config.validate():
            logger.warning("Ranking weights do not sum to 1.0, normalizing")
            total = (config.relevance_weight + config.recency_weight
                     + config.source_weight)
            config.relevance_weight /= total
            config.recency_weight /= total
            config.source_weight /= total

        self.config = config

    def extract_terms(self, query: str) -> List[str]:
        """Extract search terms from *query*.

        Quoted phrases are kept whole and listed first; boolean operators
        (AND/OR/NOT) are dropped; single-character words are ignored.

        Args:
            query: Search query string.

        Returns:
            List of search terms.
        """
        if not query:
            return []

        terms: List[str] = []

        # Quoted phrases, in order of appearance.
        terms.extend(re.findall(r'"([^"]+)"', query))

        # Strip the phrases and boolean operators before tokenizing.
        remaining = re.sub(r'"[^"]*"', '', query)
        remaining = re.sub(r'\b(AND|OR|NOT)\b', ' ', remaining,
                           flags=re.IGNORECASE)

        # Remaining words; wildcard characters stay attached to their word.
        words = re.findall(r'\b[\w*]+\b', remaining)
        terms.extend(w for w in words if len(w) > 1)

        return terms

    def get_content_field(self, result: Dict) -> str:
        """Return the first populated content-like field, else str(result).

        Args:
            result: Result dictionary.

        Returns:
            Content string.
        """
        # `key` rather than `field` to avoid shadowing dataclasses.field.
        for key in ('content', 'highlighted', 'description', 'summary',
                    'decision', 'solution', 'pattern', 'text'):
            if result.get(key):
                return str(result[key])

        # Fallback: the whole dict's string representation.
        return str(result)

    def get_timestamp_field(self, result: Dict) -> Optional[str]:
        """Return the first populated timestamp-like field, else None.

        Args:
            result: Result dictionary.

        Returns:
            Timestamp string or None.
        """
        for key in ('timestamp', 'created_at', 'updated_at', 'date', 'time'):
            if result.get(key):
                return str(result[key])
        return None

    def get_source_type(self, result: Dict) -> str:
        """Determine the source type of a result.

        Prefers an explicit type tag; otherwise infers the type from which
        characteristic fields are present.

        Args:
            result: Result dictionary.

        Returns:
            Source type string ('unknown' when nothing matches).
        """
        # Explicit type tags win (presence check, matching original logic).
        for key in ('type', 'source_type', '_source'):
            if key in result:
                return str(result[key])

        # Infer from characteristic field combinations.
        if 'decision' in result or 'rationale' in result:
            return 'decision'
        if 'pattern' in result or 'skill_name' in result:
            return 'skill_learning'
        if 'error' in result and 'solution' in result:
            return 'error_solution'
        if 'role' in result and 'content' in result:
            return 'message'

        return 'unknown'

    def score_result(self, result: Dict, terms: List[str],
                     avg_doc_length: float = 100.0) -> "ScoredResult":
        """Score a single result.

        Args:
            result: Result dictionary.
            terms: Search terms.
            avg_doc_length: Average document length for BM25 normalization.

        Returns:
            ScoredResult with the full component breakdown.
        """
        content = self.get_content_field(result)
        timestamp = self.get_timestamp_field(result)
        source_type = self.get_source_type(result)

        # Component scores.
        relevance = compute_bm25_score(content, terms, avg_doc_length)
        recency = compute_recency_score(timestamp, self.config)
        source_boost = compute_source_boost(source_type, self.config)

        # Rescale the raw boost into [0, 1] so it can join the weighted sum
        # on equal footing with the other two components.
        boosts = self.config.source_boosts.values()
        max_boost, min_boost = max(boosts), min(boosts)
        boost_range = max_boost - min_boost
        source_normalized = ((source_boost - min_boost) / boost_range
                             if boost_range > 0 else 0.5)

        final_score = (
            relevance * self.config.relevance_weight +
            recency * self.config.recency_weight +
            source_normalized * self.config.source_weight
        )

        return ScoredResult(
            result=result,
            final_score=final_score,
            relevance_score=relevance,
            recency_score=recency,
            source_boost=source_boost,
        )

    def score_results(self, results: List[Dict], query: str) -> List["ScoredResult"]:
        """Score every result against *query*.

        Args:
            results: List of result dictionaries.
            query: Original search query.

        Returns:
            One ScoredResult per input, in input order (unsorted).
        """
        if not results:
            return []

        terms = self.extract_terms(query)

        # Corpus-average document length feeds BM25 length normalization.
        total_length = sum(len(self.get_content_field(r).split())
                           for r in results)
        avg_doc_length = total_length / len(results)

        return [self.score_result(r, terms, avg_doc_length) for r in results]

    def rank(self, results: List[Dict], query: str,
             limit: Optional[int] = None,
             include_scores: bool = False) -> List[Dict]:
        """Rank results by combined score (highest first).

        Args:
            results: List of result dictionaries.
            query: Original search query.
            limit: Maximum results to return (None or 0 means all).
            include_scores: If True, merge _score/_relevance/_recency/
                _source_boost fields into each returned dict.

        Returns:
            Ranked list of result dictionaries.
        """
        if not results:
            return []

        ordered = sorted(self.score_results(results, query),
                         key=lambda s: s.final_score, reverse=True)

        if limit:
            ordered = ordered[:limit]

        if include_scores:
            return [s.to_dict() for s in ordered]
        return [s.result for s in ordered]

# =============================================================================
# Convenience Functions
# =============================================================================

def rank_results(results: List[Dict], query: str,
                 relevance_weight: float = 0.5,
                 recency_weight: float = 0.3,
                 source_weight: float = 0.2,
                 limit: Optional[int] = None,
                 include_scores: bool = False) -> List[Dict]:
    """Rank search results with multi-factor scoring.

    This is the primary convenience function for ranking results.

    Args:
        results: List of result dictionaries.
        query: Original search query.
        relevance_weight: Weight for relevance scoring (0.0 - 1.0).
        recency_weight: Weight for recency scoring (0.0 - 1.0).
        source_weight: Weight for source type boosting (0.0 - 1.0).
        limit: Maximum results to return.
        include_scores: If True, include _score, _relevance, _recency fields.

    Returns:
        Ranked list of results.

    Examples:
        >>> results = search_messages("authentication error", limit=50)
        >>> ranked = rank_results(results, "authentication error")

        >>> # Prioritize recent results
        >>> ranked = rank_results(results, query, recency_weight=0.5)

        >>> # Include scoring breakdown
        >>> ranked = rank_results(results, query, include_scores=True)
        >>> print(ranked[0]['_score'], ranked[0]['_relevance'])
    """
    ranker = ResultRanker(
        relevance_weight=relevance_weight,
        recency_weight=recency_weight,
        source_weight=source_weight,
    )
    return ranker.rank(results, query, limit=limit, include_scores=include_scores)

def get_ranking_help() -> str:
    """Return help text describing the ranking options."""
    return """
Result Ranking (J.4.5)

Ranking combines three factors to score search results:

FACTORS:
  Relevance (50%)   BM25-style term frequency scoring
  Recency   (30%)   Exponential decay - recent results boosted
  Source    (20%)   Type-based boost - decisions ranked higher

SOURCE TYPE BOOSTS:
  decisions        1.5x  (highest priority)
  skill_learnings  1.4x
  error_solutions  1.3x
  patterns         1.2x
  components       1.1x
  messages         1.0x  (baseline)

OPTIONS:
  --rank                 Enable ranking (default for search)
  --no-rank              Disable ranking (use raw order)
  --rank-weights R,T,S   Custom weights (relevance,recency,source)
  --show-scores          Include scoring breakdown in output

EXAMPLES:
  /cxq "error" --rank                      # Default ranking
  /cxq "auth" --rank-weights 0.7,0.2,0.1   # Prioritize relevance
  /cxq "deploy" --show-scores              # Show score breakdown

NOTES:
  - Weights must sum to 1.0 (auto-normalized if not)
  - Recency uses 7-day half-life (recent = higher)
  - Source boosting helps surface decisions/learnings
"""

# =============================================================================
# Main (for testing)
# =============================================================================

if __name__ == '__main__':
    import sys

    # Demo fixture: one result per source type, spread across ages, so the
    # ranking output exercises all three scoring factors.
    test_results = [
        {
            'content': 'Fixed authentication error in OAuth flow',
            'timestamp': datetime.utcnow().isoformat(),
            'type': 'message',
            'role': 'assistant',
        },
        {
            'decision': 'Use JWT tokens for API authentication',
            'rationale': 'Industry standard, stateless, easy to verify',
            'created_at': (datetime.utcnow() - timedelta(days=3)).isoformat(),
            'type': 'decision',
        },
        {
            'content': 'Authentication module needs refactoring for better error handling',
            'timestamp': (datetime.utcnow() - timedelta(days=30)).isoformat(),
            'type': 'message',
            'role': 'user',
        },
        {
            'error': 'AuthenticationError: Invalid token',
            'solution': 'Check token expiration and refresh if needed',
            'created_at': (datetime.utcnow() - timedelta(days=7)).isoformat(),
            'type': 'error_solution',
        },
        {
            'pattern': 'Token refresh pattern for OAuth',
            'skill_name': 'oauth-refresh',
            'created_at': (datetime.utcnow() - timedelta(days=14)).isoformat(),
            'type': 'skill_learning',
        },
    ]

    query = "authentication error"

    print("Result Ranking Test")
    print("=" * 60)
    print(f"Query: {query}")
    print(f"Results: {len(test_results)}")
    print()

    # Rank with the scoring breakdown included.
    ranker = ResultRanker()
    ranked = ranker.rank(test_results, query, include_scores=True)

    print("Ranked Results:")
    print("-" * 60)
    for i, result in enumerate(ranked, 1):
        content = result.get('content') or result.get('decision') or result.get('error', '')
        source = result.get('type', 'unknown')
        score = result.get('_score', 0)
        relevance = result.get('_relevance', 0)
        recency = result.get('_recency', 0)
        boost = result.get('_source_boost', 1.0)

        print(f"{i}. [{source}] Score: {score:.3f}")
        print(f"   Relevance: {relevance:.3f}, Recency: {recency:.3f}, Boost: {boost:.1f}x")
        print(f"   {content[:60]}...")
        print()

    # Show help
    if '--help' in sys.argv:
        print(get_ranking_help())