#!/usr/bin/env python3
# scripts-group-sort-processor
"""
title: GROUP BY and ORDER BY Processor for /cxq
component_type: script
version: 1.0.0
status: active
summary: Processes --group-by and --sort flags for /cxq (ADR-149 Phase 1, J.4.7.2)
keywords: [cxq, query, group-by, sort, order, aggregation]
track: J
task_id: J.4.7.2
created: 2026-02-03

GROUP BY and ORDER BY Processor - J.4.7.2 Implementation

Parses and validates --group-by and --sort flags for /cxq command.

Features:
- GROUP BY: Comma-separated field names with time functions
- ORDER BY: Field names with optional direction (asc/desc)
- Field validation per query type
- Time-based grouping (day, week, month)

Example:
    --group-by model_name,week      # Group by model and week
    --sort cost_total_usd_desc      # Sort by cost descending
    --sort created_at_asc           # Sort by date ascending

Usage in /cxq:
    /cxq --tokens --group-by model_name --sort cost_total_usd_desc
    /cxq --tools --group-by tool_name --sort invocation_count_desc
"""

import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

# Module-level logger. Original had `getLogger(name)` (undefined name —
# the `__name__` dunder was evidently stripped by a formatting pass).
logger = logging.getLogger(__name__)
# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class GroupByResult:
    """Result of parsing a --group-by flag.

    Carries the SQL fragment plus parse status; truthiness mirrors `success`
    so callers can write `if result:`.
    """
    sql: str                  # SQL GROUP BY clause (without "GROUP BY" keyword)
    select_fields: List[str]  # Fields to add to SELECT
    success: bool             # True when the expression parsed and validated
    error: Optional[str] = None  # Human-readable error when success is False
    original: str = ""           # Raw expression as supplied by the user

    def __bool__(self) -> bool:
        # A GroupByResult is truthy exactly when parsing succeeded.
        return self.success
@dataclass
class SortResult:
    """Result of parsing a --sort flag.

    Truthiness mirrors `success` so callers can write `if result:`.
    """
    sql: str                     # SQL ORDER BY clause (without "ORDER BY" keyword)
    success: bool                # True when the expression parsed and validated
    error: Optional[str] = None  # Human-readable error when success is False
    original: str = ""           # Raw expression as supplied by the user

    def __bool__(self) -> bool:
        # A SortResult is truthy exactly when parsing succeeded.
        return self.success
# =============================================================================
# Time Functions for Grouping
# =============================================================================

# SQLite strftime templates for date bucketing; `{field}` is substituted with
# the column name at parse time (column names are whitelist-validated first).
TIME_FUNCTIONS = {
    'day': "strftime('%Y-%m-%d', {field})",
    'week': "strftime('%Y-W%W', {field})",
    'month': "strftime('%Y-%m', {field})",
    'year': "strftime('%Y', {field})",
    'hour': "strftime('%Y-%m-%d %H:00', {field})",
}

# Fields that can be used with time functions
TIME_FIELDS = ['created_at', 'timestamp', 'extraction_date', 'updated_at']
# =============================================================================
# Field Definitions per Query Type
# =============================================================================
# These whitelists are the safety boundary: only fields listed here may appear
# in generated GROUP BY / ORDER BY SQL.

# Token economics fields (sessions.db)
TOKEN_FIELDS = [
    'id', 'session_id', 'model_name', 'model_tier',
    'token_input', 'token_output', 'token_cache_read', 'token_cache_write',
    'cost_input_usd', 'cost_output_usd', 'cost_total_usd',
    'task_id', 'agent_name', 'operation_type', 'created_at'
]

# Tool analytics fields (sessions.db)
TOOL_FIELDS = [
    'id', 'session_id', 'tool_name', 'tool_category',
    'invocation_count', 'success_count', 'failure_count',
    'avg_latency_ms', 'max_latency_ms', 'min_latency_ms',
    'task_id', 'operation_context', 'created_at'
]

# Decisions fields (org.db)
DECISION_FIELDS = [
    'id', 'session_id', 'decision_type', 'decision', 'rationale',
    'confidence', 'source', 'created_at', 'extraction_date'
]

# Error solutions fields (org.db)
ERROR_FIELDS = [
    'id', 'session_id', 'error_type', 'error_message', 'solution',
    'confidence', 'context', 'created_at', 'extraction_date'
]

# Skill learnings fields (org.db)
LEARNING_FIELDS = [
    'id', 'session_id', 'skill_name', 'pattern_type', 'learning',
    'confidence', 'context', 'created_at', 'extraction_date'
]

# Message fields (sessions.db)
MESSAGE_FIELDS = [
    'id', 'session_id', 'llm_source', 'model', 'role', 'content',
    'timestamp', 'message_type', 'tool_name', 'tool_status'
]

# Activity associations fields (sessions.db)
ACTIVITY_FIELDS = [
    'id', 'session_id', 'component_type', 'component_name',
    'activity_type', 'confidence', 'evidence', 'created_at'
]
# =============================================================================
# Aggregation Functions
# =============================================================================

# Standard SQL aggregation templates; `{field}` is substituted with a
# whitelist-validated column name.
AGGREGATION_FUNCTIONS = {
    'count': 'COUNT({field})',
    'sum': 'SUM({field})',
    'avg': 'AVG({field})',
    'min': 'MIN({field})',
    'max': 'MAX({field})',
    'total': 'SUM({field})',  # alias for sum
}

# Fields that commonly need aggregation when grouping
NUMERIC_FIELDS = [
    'token_input', 'token_output', 'token_cache_read', 'token_cache_write',
    'cost_input_usd', 'cost_output_usd', 'cost_total_usd',
    'invocation_count', 'success_count', 'failure_count',
    'avg_latency_ms', 'max_latency_ms', 'min_latency_ms', 'confidence'
]
# =============================================================================
# Parsers
# =============================================================================
def parse_group_by(
    expression: str,
    query_type: str = 'tokens',
    date_field: str = 'created_at'
) -> GroupByResult:
    """
    Parse a --group-by expression and return SQL GROUP BY components.

    Every field name is validated against the query type's whitelist before
    it is placed into SQL, so the returned fragments are injection-safe.

    Args:
        expression: Comma-separated field names with optional time functions
                    Examples: "model_name", "model_name,week", "session_id,day"
        query_type: Type of query ('tokens', 'tools', 'decisions', etc.)
        date_field: Default date field for bare time functions (e.g. "week")

    Returns:
        GroupByResult with sql, select_fields, and success status

    Example:
        >>> result = parse_group_by("model_name,week", "tokens")
        >>> print(result.sql)
        model_name, strftime('%Y-W%W', created_at)
        >>> print(result.select_fields)
        ['model_name', "strftime('%Y-W%W', created_at) AS week"]
    """
    # An empty/blank expression is valid: it simply means "no grouping".
    if not expression or not expression.strip():
        return GroupByResult(
            sql="",
            select_fields=[],
            success=True,
            original=expression
        )

    try:
        allowed_fields = get_allowed_fields(query_type)
        parts = [p.strip() for p in expression.split(',')]
        group_parts = []
        select_parts = []

        for part in parts:
            if not part:
                # Tolerate stray commas ("a,,b").
                continue

            # Case 1: bare time function (day, week, month, year, hour) —
            # applies to the default date field and is aliased by its name.
            if part.lower() in TIME_FUNCTIONS:
                time_func = TIME_FUNCTIONS[part.lower()]
                sql_expr = time_func.format(field=date_field)
                group_parts.append(sql_expr)
                select_parts.append(f"{sql_expr} AS {part.lower()}")

            # Case 2: field.time_func format (e.g. created_at.week) —
            # both the field and the time function must validate.
            elif '.' in part:
                field, time_key = part.split('.', 1)
                if field not in allowed_fields:
                    return GroupByResult(
                        sql="",
                        select_fields=[],
                        success=False,
                        error=f"Unknown field: {field}. Allowed: {', '.join(sorted(allowed_fields))}",
                        original=expression
                    )
                if time_key.lower() not in TIME_FUNCTIONS:
                    return GroupByResult(
                        sql="",
                        select_fields=[],
                        success=False,
                        error=f"Unknown time function: {time_key}. Allowed: {', '.join(TIME_FUNCTIONS.keys())}",
                        original=expression
                    )
                time_func = TIME_FUNCTIONS[time_key.lower()]
                sql_expr = time_func.format(field=field)
                group_parts.append(sql_expr)
                select_parts.append(f"{sql_expr} AS {field}_{time_key.lower()}")

            # Case 3: plain whitelisted field.
            elif part in allowed_fields:
                group_parts.append(part)
                select_parts.append(part)

            # Anything else is rejected with the list of valid fields.
            else:
                return GroupByResult(
                    sql="",
                    select_fields=[],
                    success=False,
                    error=f"Unknown field: {part}. Allowed: {', '.join(sorted(allowed_fields))}",
                    original=expression
                )

        return GroupByResult(
            sql=", ".join(group_parts),
            select_fields=select_parts,
            success=True,
            original=expression
        )

    except Exception as e:
        # Defensive: parsing should not crash the CLI; surface as a result.
        logger.error(f"GROUP BY parse error: {e}")
        return GroupByResult(
            sql="",
            select_fields=[],
            success=False,
            error=str(e),
            original=expression
        )
def parse_sort(
    expression: str,
    query_type: str = 'tokens'
) -> SortResult:
    """
    Parse a --sort expression and return SQL ORDER BY clause.

    Field names are validated against the query type's whitelist before being
    placed into SQL, so the returned fragment is injection-safe.

    Args:
        expression: Comma-separated field names with optional direction
                    Direction suffix: _asc, _desc, or none (defaults to DESC)
                    Examples: "cost_total_usd_desc", "created_at_asc", "model_name"
        query_type: Type of query ('tokens', 'tools', 'decisions', etc.)

    Returns:
        SortResult with sql and success status

    Example:
        >>> result = parse_sort("cost_total_usd_desc,created_at_asc", "tokens")
        >>> print(result.sql)
        cost_total_usd DESC, created_at ASC
    """
    # An empty/blank expression is valid: it simply means "no ordering".
    if not expression or not expression.strip():
        return SortResult(
            sql="",
            success=True,
            original=expression
        )

    try:
        allowed_fields = get_allowed_fields(query_type)
        parts = [p.strip() for p in expression.split(',')]
        sort_parts = []

        for part in parts:
            if not part:
                # Tolerate stray commas ("a,,b").
                continue

            # Strip the optional direction suffix; bare fields default to DESC
            # (most query results are "biggest/newest first").
            direction = "DESC"  # default
            field = part
            if part.lower().endswith('_asc'):
                direction = "ASC"
                field = part[:-4]  # remove _asc
            elif part.lower().endswith('_desc'):
                direction = "DESC"
                field = part[:-5]  # remove _desc

            # Reject anything not on the whitelist for this query type.
            if field not in allowed_fields:
                return SortResult(
                    sql="",
                    success=False,
                    error=f"Unknown field: {field}. Allowed: {', '.join(sorted(allowed_fields))}",
                    original=expression
                )

            sort_parts.append(f"{field} {direction}")

        return SortResult(
            sql=", ".join(sort_parts),
            success=True,
            original=expression
        )

    except Exception as e:
        # Defensive: parsing should not crash the CLI; surface as a result.
        logger.error(f"SORT parse error: {e}")
        return SortResult(
            sql="",
            success=False,
            error=str(e),
            original=expression
        )
def get_allowed_fields(query_type: str) -> List[str]:
    """Get allowed fields for a query type.

    Unknown query types yield an empty list, which makes every field fail
    validation downstream (fail-closed).
    """
    mapping = {
        'tokens': TOKEN_FIELDS,
        'token_economics': TOKEN_FIELDS,
        'tools': TOOL_FIELDS,
        'tool_analytics': TOOL_FIELDS,
        'decisions': DECISION_FIELDS,
        'errors': ERROR_FIELDS,
        'learnings': LEARNING_FIELDS,
        'messages': MESSAGE_FIELDS,
        'activities': ACTIVITY_FIELDS,
    }
    return mapping.get(query_type, [])
def build_aggregation_select(
    group_fields: List[str],
    query_type: str = 'tokens'
) -> List[str]:
    """
    Build SELECT fields with aggregations for grouped queries.

    When grouping, numeric fields should be aggregated; this appends a canned
    set of aggregate expressions appropriate to the query type. Query types
    without a canned set (e.g. 'messages') get only the group fields back.

    Args:
        group_fields: Fields being grouped by (copied, never mutated)
        query_type: Type of query

    Returns:
        List of SELECT field expressions

    Example:
        >>> fields = build_aggregation_select(['model_name'], 'tokens')
        >>> print(fields)
        ['model_name', 'SUM(token_input) AS total_input', ...]
    """
    # Copy so the caller's list is never mutated.
    select_fields = list(group_fields)

    # Add standard aggregations based on query type.
    if query_type in ('tokens', 'token_economics'):
        select_fields.extend([
            'COUNT(*) AS record_count',
            'SUM(token_input) AS total_input',
            'SUM(token_output) AS total_output',
            'SUM(cost_total_usd) AS total_cost',
            'AVG(cost_total_usd) AS avg_cost',
        ])
    elif query_type in ('tools', 'tool_analytics'):
        select_fields.extend([
            'COUNT(*) AS record_count',
            'SUM(invocation_count) AS total_invocations',
            'SUM(success_count) AS total_successes',
            'SUM(failure_count) AS total_failures',
            'AVG(avg_latency_ms) AS avg_latency',
        ])
    elif query_type == 'decisions':
        select_fields.extend([
            'COUNT(*) AS decision_count',
            'AVG(confidence) AS avg_confidence',
        ])
    elif query_type == 'errors':
        select_fields.extend([
            'COUNT(*) AS error_count',
            'AVG(confidence) AS avg_confidence',
        ])
    elif query_type == 'learnings':
        select_fields.extend([
            'COUNT(*) AS learning_count',
            'AVG(confidence) AS avg_confidence',
        ])
    elif query_type == 'activities':
        select_fields.extend([
            'COUNT(*) AS activity_count',
            'AVG(confidence) AS avg_confidence',
        ])

    return select_fields
# =============================================================================
# CLI for Testing
# =============================================================================
def main():
    """CLI interface for testing the processor.

    Returns 1 on a parse error, otherwise 0/None (both exit as success via
    the `main() or 0` entry guard).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='GROUP BY and ORDER BY Processor (J.4.7.2)'
    )
    parser.add_argument('--group-by', '-g', metavar='EXPR',
                        help='GROUP BY expression to parse')
    parser.add_argument('--sort', '-s', metavar='EXPR',
                        help='SORT expression to parse')
    parser.add_argument('--type', '-t', default='tokens',
                        choices=['tokens', 'tools', 'decisions', 'errors', 'learnings', 'messages', 'activities'],
                        help='Query type (default: tokens)')
    parser.add_argument('--test', action='store_true',
                        help='Run test cases')
    args = parser.parse_args()

    if args.test:
        # Run built-in demonstration cases and exit.
        print("=" * 60)
        print("GROUP BY / SORT PROCESSOR TEST CASES")
        print("=" * 60)

        group_tests = [
            ("model_name", "tokens"),
            ("model_name,week", "tokens"),
            ("session_id,day", "tokens"),
            ("created_at.month", "tokens"),
            ("tool_name", "tools"),
            ("decision_type,confidence", "decisions"),
        ]
        print("\n--- GROUP BY Tests ---")
        for expr, qtype in group_tests:
            result = parse_group_by(expr, qtype)
            print(f"\nInput: --group-by \"{expr}\" (type={qtype})")
            if result.success:
                print(f"SQL: {result.sql}")
                print(f"SELECT: {result.select_fields}")
            else:
                print(f"ERROR: {result.error}")

        sort_tests = [
            ("cost_total_usd_desc", "tokens"),
            ("created_at_asc", "tokens"),
            ("model_name,cost_total_usd_desc", "tokens"),
            ("invocation_count_desc,tool_name_asc", "tools"),
            ("confidence_desc", "decisions"),
        ]
        print("\n--- SORT Tests ---")
        for expr, qtype in sort_tests:
            result = parse_sort(expr, qtype)
            print(f"\nInput: --sort \"{expr}\" (type={qtype})")
            if result.success:
                print(f"SQL: {result.sql}")
            else:
                print(f"ERROR: {result.error}")
        return

    if args.group_by:
        result = parse_group_by(args.group_by, args.type)
        if result.success:
            print(f"GROUP BY: {result.sql}")
            print(f"SELECT: {result.select_fields}")
            agg_fields = build_aggregation_select(result.select_fields, args.type)
            print(f"With Agg: {agg_fields}")
        else:
            print(f"ERROR: {result.error}")
            return 1

    if args.sort:
        result = parse_sort(args.sort, args.type)
        if result.success:
            print(f"ORDER BY: {result.sql}")
        else:
            print(f"ERROR: {result.error}")
            return 1

    if not args.group_by and not args.sort and not args.test:
        parser.print_help()

    return 0
if __name__ == "__main__":
    # Original read `if name == "main"` — the dunder underscores were stripped,
    # so the guard could never fire. `main() or 0` maps a None return to exit 0.
    import sys
    sys.exit(main() or 0)