# scripts/context_graph/aggregator.py
#!/usr/bin/env python3 """
title: "Aggregator - Time-based aggregations, topic clustering, trend analysis" component_type: script version: "1.0.0" audience: contributor status: active summary: "Aggregation functions for /cxq: --by-day, --by-week, topic clustering, --trending" keywords: ['aggregation', 'time-series', 'clustering', 'trending', 'analytics', 'cxq'] tokens: ~400 created: 2026-02-04 updated: 2026-02-04 script_name: "aggregator.py" language: python track: J task: J.4.6
Aggregator Module (J.4.6)
Provides aggregation capabilities for /cxq search results:
- J.4.6.1: Time-based aggregations (--by-day, --by-week, --by-month)
- J.4.6.2: Topic clustering (keyword extraction and grouping)
- J.4.6.3: Trend analysis (--trending for rising topics)
Usage: from scripts.context_graph.aggregator import ( aggregate_by_time, cluster_by_topic, analyze_trends, TimeAggregation, TopicCluster, TrendResult, )
# Time aggregation
daily = aggregate_by_time(messages, period='day')
weekly = aggregate_by_time(messages, period='week')
# Topic clustering
clusters = cluster_by_topic(messages, min_cluster_size=3)
# Trend analysis
trends = analyze_trends(messages, window_days=7)
ADR References: - ADR-149: Query Language Evolution Strategy - ADR-118: Four-Tier Database Architecture """
import json
import math
import re
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
# Common stop words to exclude from topic extraction.
# Includes both everyday English function words and "technical" filler words
# that appear in nearly every engineering message and carry no topic signal.
STOP_WORDS = {
    'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
    'of', 'with', 'by', 'from', 'as', 'is', 'was', 'are', 'were', 'been',
    'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
    'could', 'should', 'may', 'might', 'must', 'shall', 'can', 'need',
    'it', 'its', 'this', 'that', 'these', 'those', 'i', 'you', 'he',
    'she', 'we', 'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your',
    'his', 'her', 'our', 'their', 'what', 'which', 'who', 'whom', 'when',
    'where', 'why', 'how', 'all', 'each', 'every', 'both', 'few', 'more',
    'most', 'other', 'some', 'such', 'no', 'not', 'only', 'same', 'so',
    'than', 'too', 'very', 'just', 'also', 'now', 'here', 'there', 'then',
    'if', 'else', 'while', 'because', 'although', 'though', 'unless',
    'since', 'until', 'before', 'after', 'above', 'below', 'between',
    'into', 'through', 'during', 'against', 'about', 'without', 'within',
    # Technical stop words
    'use', 'using', 'used', 'file', 'files', 'code', 'run', 'running',
    'get', 'set', 'add', 'added', 'update', 'updated', 'create',
    'created', 'make', 'made', 'new', 'old', 'first', 'last', 'next',
    'previous',
}

# Valid time periods for aggregation (accepted by aggregate_by_time).
VALID_PERIODS = ['hour', 'day', 'week', 'month', 'year']
@dataclass
class TimeAggregation:
    """Result of time-based aggregation.

    Produced by :func:`aggregate_by_time`; serializable via :meth:`to_dict`.
    """
    period: str  # 'day', 'week', 'month', etc.
    buckets: List[Dict[str, Any]] = field(default_factory=list)
    total_count: int = 0
    date_range: Tuple[Optional[str], Optional[str]] = (None, None)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict suitable for JSON output."""
        start, end = self.date_range
        return {
            'period': self.period,
            'buckets': self.buckets,
            'total_count': self.total_count,
            'date_range': {'start': start, 'end': end},
        }
@dataclass
class TopicCluster:
    """A cluster of messages grouped around one topic keyword."""
    topic: str  # Primary topic keyword
    keywords: List[str] = field(default_factory=list)  # Related keywords
    count: int = 0
    messages: List[Dict[str, Any]] = field(default_factory=list)
    score: float = 0.0  # Relevance score

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON output; messages are reduced to their ids.

        Falls back from 'id' to 'rowid' to the list position when a
        message carries no identifier.
        """
        ids = []
        for position, msg in enumerate(self.messages):
            ids.append(msg.get('id', msg.get('rowid', position)))
        return {
            'topic': self.topic,
            'keywords': self.keywords,
            'count': self.count,
            'message_ids': ids,
            'score': round(self.score, 3),
        }
@dataclass
class TrendResult:
    """Result of trend analysis for a single keyword."""
    keyword: str
    current_count: int  # Count in recent window
    previous_count: int  # Count in previous window
    growth_rate: float  # Percentage growth
    trend: str  # 'rising', 'falling', 'stable'

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON output (growth rate rounded to 2 decimals)."""
        payload = {
            'keyword': self.keyword,
            'current_count': self.current_count,
            'previous_count': self.previous_count,
            'growth_rate': round(self.growth_rate, 2),
            'trend': self.trend,
        }
        return payload
def _parse_timestamp(ts: Any) -> Optional[datetime]: """Parse timestamp from various formats.""" if ts is None: return None if isinstance(ts, datetime): return ts if isinstance(ts, (int, float)): # Unix timestamp try: return datetime.fromtimestamp(ts) except (ValueError, OSError): return None if isinstance(ts, str): # Try various ISO formats for fmt in [ '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d', ]: try: return datetime.strptime(ts, fmt) except ValueError: continue return None
def _get_bucket_key(dt: datetime, period: str) -> str: """Get the bucket key for a datetime based on period.""" if period == 'hour': return dt.strftime('%Y-%m-%d %H:00') elif period == 'day': return dt.strftime('%Y-%m-%d') elif period == 'week': # ISO week format return dt.strftime('%Y-W%W') elif period == 'month': return dt.strftime('%Y-%m') elif period == 'year': return dt.strftime('%Y') else: return dt.strftime('%Y-%m-%d')
def aggregate_by_time(
    messages: List[Dict[str, Any]],
    period: str = 'day',
    timestamp_field: str = 'timestamp',
    include_messages: bool = False,
) -> TimeAggregation:
    """Aggregate messages into chronological time buckets.

    J.4.6.1: Time-based aggregations (--by-day, --by-week)

    Args:
        messages: Message dictionaries; the timestamp is read from
            ``timestamp_field`` with fallbacks to 'created_at' then 'date'.
        period: One of VALID_PERIODS ('hour', 'day', 'week', 'month', 'year').
        timestamp_field: Primary field holding the timestamp.
        include_messages: When True, each bucket carries its message list.

    Returns:
        TimeAggregation with buckets sorted chronologically. Note that
        total_count is len(messages): it includes messages whose
        timestamps could not be parsed, even though those land in no bucket.

    Raises:
        ValueError: If period is not in VALID_PERIODS.
    """
    if period not in VALID_PERIODS:
        raise ValueError(f"Invalid period '{period}'. Must be one of: {VALID_PERIODS}")

    counts: Dict[str, int] = defaultdict(int)
    members: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    earliest: Optional[datetime] = None
    latest: Optional[datetime] = None

    for msg in messages:
        raw = msg.get(timestamp_field) or msg.get('created_at') or msg.get('date')
        when = _parse_timestamp(raw)
        if when is None:
            continue  # unparseable timestamps contribute to no bucket
        if earliest is None or when < earliest:
            earliest = when
        if latest is None or when > latest:
            latest = when
        bucket_key = _get_bucket_key(when, period)
        counts[bucket_key] += 1
        if include_messages:
            members[bucket_key].append(msg)

    # Bucket keys are zero-padded date strings, so sorting them
    # lexicographically yields chronological order.
    ordered_keys = sorted(counts)
    if include_messages:
        buckets = [
            {'key': k, 'count': counts[k], 'messages': members[k]}
            for k in ordered_keys
        ]
    else:
        buckets = [{'key': k, 'count': counts[k]} for k in ordered_keys]

    return TimeAggregation(
        period=period,
        buckets=buckets,
        total_count=len(messages),
        date_range=(
            earliest.isoformat() if earliest else None,
            latest.isoformat() if latest else None,
        ),
    )
def _extract_keywords(text: str, min_length: int = 3) -> List[str]:
    """Extract candidate topic keywords from free text.

    Lowercases, tokenizes on letter-initial runs (allowing digits,
    underscores and hyphens inside a token), then drops stop words and
    tokens shorter than min_length. Duplicates are preserved.
    """
    if not text:
        return []
    # Tokens must start with a letter; '_' and '-' are kept so that
    # identifiers like snake_case and flag-names survive intact.
    tokens = re.findall(r'[a-zA-Z][a-zA-Z0-9_-]*', text.lower())
    return [t for t in tokens if len(t) >= min_length and t not in STOP_WORDS]
def cluster_by_topic(
    messages: List[Dict[str, Any]],
    content_field: str = 'content',
    min_cluster_size: int = 2,
    max_clusters: int = 20,
    min_keyword_freq: int = 2,
) -> List[TopicCluster]:
    """Cluster messages by topic using keyword extraction.

    J.4.6.2: Topic clustering

    Greedy assignment: keywords are visited in descending document
    frequency, and each message belongs to at most one cluster (the
    first keyword that claims it).

    Args:
        messages: Message dictionaries; text is read from ``content_field``
            with fallbacks to 'message' then 'text'.
        content_field: Field name containing the text content.
        min_cluster_size: Minimum messages per cluster.
        max_clusters: Maximum number of clusters to return.
        min_keyword_freq: Minimum keyword document frequency to be a topic.

    Returns:
        List of TopicCluster objects sorted by count (descending). Each
        cluster keeps at most 10 sample messages.
    """
    def _text_of(msg: Dict[str, Any]) -> str:
        # Shared fallback chain for locating a message's text.
        return msg.get(content_field) or msg.get('message') or msg.get('text') or ''

    # Document frequency per keyword (each message counted once per keyword).
    doc_freq: Counter = Counter()
    by_keyword: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for msg in messages:
        for kw in set(_extract_keywords(_text_of(msg))):
            doc_freq[kw] += 1
            by_keyword[kw].append(msg)

    clusters: List[TopicCluster] = []
    claimed: set = set()  # id()s of messages already assigned to a cluster
    total = len(messages)

    for keyword, freq in doc_freq.most_common():
        # most_common is descending, so the first too-rare keyword ends it.
        if freq < min_keyword_freq or len(clusters) >= max_clusters:
            break

        members = [m for m in by_keyword[keyword] if id(m) not in claimed]
        if len(members) < min_cluster_size:
            continue

        # Related keywords: frequent co-occurrences within this cluster
        # (raw token counts, not per-message document frequency).
        cooccurrence: Counter = Counter()
        for m in members:
            for kw in _extract_keywords(_text_of(m)):
                if kw != keyword:
                    cooccurrence[kw] += 1
        related = [kw for kw, n in cooccurrence.most_common(5) if n >= 2]

        # TF-IDF-inspired relevance score.
        tf = freq / total if messages else 0
        idf = math.log(total / (freq + 1)) + 1 if messages else 1

        clusters.append(TopicCluster(
            topic=keyword,
            keywords=related,
            count=len(members),
            messages=members[:10],  # keep at most 10 sample messages
            score=tf * idf * len(members),
        ))
        claimed.update(id(m) for m in members)

    clusters.sort(key=lambda c: c.count, reverse=True)
    return clusters
def analyze_trends(
    messages: List[Dict[str, Any]],
    window_days: int = 7,
    timestamp_field: str = 'timestamp',
    content_field: str = 'content',
    min_current_count: int = 3,
    growth_threshold: float = 50.0,
) -> List[TrendResult]:
    """Analyze trending topics by comparing two adjacent time windows.

    J.4.6.3: Trend analysis (--trending)

    Windows are anchored at datetime.now(): the "current" window is the
    last window_days days, the "previous" window the window_days days
    before that. Messages older than both windows are ignored.

    Args:
        messages: Message dictionaries (timestamp + text fields as in
            aggregate_by_time / cluster_by_topic).
        window_days: Days in each comparison window.
        timestamp_field: Field name containing the timestamp.
        content_field: Field name containing the text content.
        min_current_count: Minimum count in current window to be considered.
        growth_threshold: Minimum growth percentage to be "rising".

    Returns:
        List of TrendResult sorted by growth rate (descending).
    """
    now = datetime.now()
    current_start = now - timedelta(days=window_days)
    previous_start = current_start - timedelta(days=window_days)

    recent: List[Dict[str, Any]] = []
    prior: List[Dict[str, Any]] = []
    for msg in messages:
        raw = msg.get(timestamp_field) or msg.get('created_at') or msg.get('date')
        when = _parse_timestamp(raw)
        if when is None:
            continue
        if when >= current_start:
            recent.append(msg)
        elif when >= previous_start:
            prior.append(msg)

    def _keyword_doc_counts(batch: List[Dict[str, Any]]) -> Counter:
        # Count each keyword once per message (document frequency).
        tally: Counter = Counter()
        for m in batch:
            text = m.get(content_field) or m.get('message') or m.get('text') or ''
            tally.update(set(_extract_keywords(text)))
        return tally

    current_counts = _keyword_doc_counts(recent)
    previous_counts = _keyword_doc_counts(prior)

    results: List[TrendResult] = []
    for keyword in set(current_counts.keys()) | set(previous_counts.keys()):
        cur = current_counts.get(keyword, 0)
        prev = previous_counts.get(keyword, 0)
        if cur < min_current_count:
            continue
        # Growth rate; a brand-new keyword is capped at +100%.
        if prev == 0:
            growth = 100.0 if cur > 0 else 0.0
        else:
            growth = ((cur - prev) / prev) * 100.0
        if growth >= growth_threshold:
            direction = 'rising'
        elif growth <= -growth_threshold:
            direction = 'falling'
        else:
            direction = 'stable'
        results.append(TrendResult(
            keyword=keyword,
            current_count=cur,
            previous_count=prev,
            growth_rate=growth,
            trend=direction,
        ))

    # Rising trends first.
    results.sort(key=lambda t: t.growth_rate, reverse=True)
    return results
def format_time_aggregation(agg: TimeAggregation, format: str = 'text') -> str:
    """Render a TimeAggregation as a text histogram or JSON string."""
    if format == 'json':
        return json.dumps(agg.to_dict(), indent=2)

    out = [
        f"Time Aggregation by {agg.period.upper()}",
        '=' * 40,
        f"Total: {agg.total_count} messages",
    ]
    start, end = agg.date_range
    if start and end:
        # date_range values are ISO strings; keep only the date part.
        out.append(f"Range: {start[:10]} to {end[:10]}")
    out.extend(["", f"{'Period':<20} {'Count':>8} {'Bar':<30}", '-' * 60])

    peak = max((b['count'] for b in agg.buckets), default=1)
    for b in agg.buckets:
        # Scale bars to at most 25 '#' characters.
        width = int((b['count'] / peak) * 25)
        out.append(f"{b['key']:<20} {b['count']:>8} {'#' * width}")
    return '\n'.join(out)
def format_topic_clusters(clusters: List[TopicCluster], format: str = 'text') -> str:
    """Render topic clusters as a numbered text list or JSON string."""
    if format == 'json':
        return json.dumps([c.to_dict() for c in clusters], indent=2)

    out = [
        "Topic Clusters",
        '=' * 40,
        f"Found {len(clusters)} topic clusters",
        "",
    ]
    for rank, cluster in enumerate(clusters, 1):
        out.append(f"{rank}. {cluster.topic.upper()} ({cluster.count} messages)")
        if cluster.keywords:
            out.append(f" Related: {', '.join(cluster.keywords)}")
        out.append("")
    return '\n'.join(out)
def format_trends(trends: List[TrendResult], format: str = 'text') -> str:
    """Render trend analysis as rising/falling text sections or JSON."""
    if format == 'json':
        return json.dumps([t.to_dict() for t in trends], indent=2)

    rising = [t for t in trends if t.trend == 'rising']
    falling = [t for t in trends if t.trend == 'falling']

    out = ["Trend Analysis", '=' * 40]
    if rising:
        out += ["", "RISING TOPICS:"]
        for t in rising[:10]:
            sign = "+" if t.growth_rate >= 0 else ""
            out.append(f" {t.keyword}: {t.current_count} ({sign}{t.growth_rate:.0f}%)")
    if falling:
        out += ["", "FALLING TOPICS:"]
        for t in falling[:10]:
            out.append(f" {t.keyword}: {t.current_count} ({t.growth_rate:.0f}%)")
    if not rising and not falling:
        out += ["", "No significant trends detected."]
    return '\n'.join(out)
def get_aggregation_help() -> str:
    """Return the user-facing help text for /cxq aggregation options.

    Covers time-based aggregation flags, topic clustering, trend
    analysis, and output formats.
    """
    # NOTE(review): exact whitespace/layout of this help string was lost in
    # extraction and has been reconstructed — verify against rendered output.
    return """
AGGREGATION OPTIONS (J.4.6)

Time-Based Aggregations (--by-day, --by-week, --by-month):
  Group search results by time period to see activity patterns.

  Examples:
    /cxq "error" --by-day            # Daily error counts
    /cxq --recent 500 --by-week      # Weekly message volume
    /cxq --decisions --by-month      # Monthly decision frequency

  Periods:
    --by-hour    Group by hour (YYYY-MM-DD HH:00)
    --by-day     Group by day (YYYY-MM-DD)
    --by-week    Group by ISO week (YYYY-WNN)
    --by-month   Group by month (YYYY-MM)
    --by-year    Group by year (YYYY)

Topic Clustering (--topics):
  Automatically cluster messages by extracted keywords.

  Examples:
    /cxq --recent 200 --topics       # Find topics in recent messages
    /cxq "deploy" --topics           # Cluster deployment-related messages

  Options:
    --min-cluster N    Minimum messages per cluster (default: 2)
    --max-clusters N   Maximum clusters to return (default: 20)

Trend Analysis (--trending):
  Identify rising and falling topics over time.

  Examples:
    /cxq --recent 500 --trending     # Find trending topics
    /cxq --trending --window 14      # 14-day comparison window

  Options:
    --window N            Days in comparison window (default: 7)
    --growth-threshold    Minimum % change to be "rising" (default: 50)

Output Formats:
  --format text    Human-readable output (default)
  --format json    JSON output for programmatic use
"""
# Import for convenience
import json
# Public API of this module. Fix: the export list must be the dunder
# `__all__` — a bare `all = [...]` shadows the builtin all() and has no
# effect on `from ... import *`.
__all__ = [
    'TimeAggregation',
    'TopicCluster',
    'TrendResult',
    'aggregate_by_time',
    'cluster_by_topic',
    'analyze_trends',
    'format_time_aggregation',
    'format_topic_clusters',
    'format_trends',
    'get_aggregation_help',
    'VALID_PERIODS',
]