#!/usr/bin/env python3
"""
title: GROUP BY and ORDER BY Processor for /cxq
component_type: script
version: 1.0.0
status: active
summary: Processes --group-by and --sort flags for /cxq (ADR-149 Phase 1, J.4.7.2)
keywords: [cxq, query, group-by, sort, order, aggregation]
track: J
task_id: J.4.7.2
created: 2026-02-03

GROUP BY and ORDER BY Processor - J.4.7.2 Implementation

Parses and validates --group-by and --sort flags for /cxq command.

Features:
- GROUP BY: Comma-separated field names with time functions
- ORDER BY: Field names with optional direction (asc/desc)
- Field validation per query type
- Time-based grouping (day, week, month)

Example:
    --group-by model_name,week    # Group by model and week
    --sort cost_total_usd_desc    # Sort by cost descending
    --sort created_at_asc         # Sort by date ascending

Usage in /cxq:
    /cxq --tokens --group-by model_name --sort cost_total_usd_desc
    /cxq --tools --group-by tool_name --sort invocation_count_desc
"""

import logging
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

# Module-level logger; __name__ (not the undefined `name`) is the
# standard logging convention so records carry this module's dotted path.
logger = logging.getLogger(__name__)

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class GroupByResult:
    """Result of parsing a --group-by flag.

    Truthiness mirrors ``success`` so callers can write ``if result:``.
    """
    sql: str                    # SQL GROUP BY clause (without "GROUP BY" keyword)
    select_fields: List[str]    # Fields to add to SELECT
    success: bool               # True when the expression parsed cleanly
    error: Optional[str] = None # Human-readable parse error, if any
    original: str = ""          # The raw expression as supplied by the user

    def __bool__(self) -> bool:
        return self.success

@dataclass
class SortResult:
    """Result of parsing a --sort flag.

    Truthiness mirrors ``success`` so callers can write ``if result:``.
    """
    sql: str                    # SQL ORDER BY clause (without "ORDER BY" keyword)
    success: bool               # True when the expression parsed cleanly
    error: Optional[str] = None # Human-readable parse error, if any
    original: str = ""          # The raw expression as supplied by the user

    def __bool__(self) -> bool:
        return self.success

# =============================================================================
# Time Functions for Grouping
# =============================================================================

# SQLite strftime() templates for date bucketing; "{field}" is replaced with
# the actual column name when the GROUP BY expression is parsed.
TIME_FUNCTIONS = {
    'day': "strftime('%Y-%m-%d', {field})",
    'week': "strftime('%Y-W%W', {field})",
    'month': "strftime('%Y-%m', {field})",
    'year': "strftime('%Y', {field})",
    'hour': "strftime('%Y-%m-%d %H:00', {field})",
}

# Fields that can be used with time functions
TIME_FIELDS = ['created_at', 'timestamp', 'extraction_date', 'updated_at']

# =============================================================================
# Field Definitions per Query Type
# =============================================================================

# Token economics fields (sessions.db)
TOKEN_FIELDS = [
    'id', 'session_id', 'model_name', 'model_tier',
    'token_input', 'token_output', 'token_cache_read', 'token_cache_write',
    'cost_input_usd', 'cost_output_usd', 'cost_total_usd',
    'task_id', 'agent_name', 'operation_type', 'created_at',
]

# Tool analytics fields (sessions.db)
TOOL_FIELDS = [
    'id', 'session_id', 'tool_name', 'tool_category',
    'invocation_count', 'success_count', 'failure_count',
    'avg_latency_ms', 'max_latency_ms', 'min_latency_ms',
    'task_id', 'operation_context', 'created_at',
]

# Decisions fields (org.db)
DECISION_FIELDS = [
    'id', 'session_id', 'decision_type', 'decision', 'rationale',
    'confidence', 'source', 'created_at', 'extraction_date',
]

# Error solutions fields (org.db)
ERROR_FIELDS = [
    'id', 'session_id', 'error_type', 'error_message', 'solution',
    'confidence', 'context', 'created_at', 'extraction_date',
]

# Skill learnings fields (org.db)
LEARNING_FIELDS = [
    'id', 'session_id', 'skill_name', 'pattern_type', 'learning',
    'confidence', 'context', 'created_at', 'extraction_date',
]

# Message fields (sessions.db)
MESSAGE_FIELDS = [
    'id', 'session_id', 'llm_source', 'model', 'role', 'content',
    'timestamp', 'message_type', 'tool_name', 'tool_status',
]

# Activity associations fields (sessions.db)
ACTIVITY_FIELDS = [
    'id', 'session_id', 'component_type', 'component_name',
    'activity_type', 'confidence', 'evidence', 'created_at',
]

# =============================================================================
# Aggregation Functions
# =============================================================================

# Standard SQL aggregation functions; "{field}" is substituted with the
# column being aggregated.
AGGREGATION_FUNCTIONS = {
    'count': 'COUNT({field})',
    'sum': 'SUM({field})',
    'avg': 'AVG({field})',
    'min': 'MIN({field})',
    'max': 'MAX({field})',
    'total': 'SUM({field})',  # alias for sum
}

# Fields that commonly need aggregation when grouping
NUMERIC_FIELDS = [
    'token_input', 'token_output', 'token_cache_read', 'token_cache_write',
    'cost_input_usd', 'cost_output_usd', 'cost_total_usd',
    'invocation_count', 'success_count', 'failure_count',
    'avg_latency_ms', 'max_latency_ms', 'min_latency_ms', 'confidence',
]

# =============================================================================
# Parsers
# =============================================================================

def parse_group_by(
    expression: str,
    query_type: str = 'tokens',
    date_field: str = 'created_at'
) -> GroupByResult:
    """
    Parse a --group-by expression and return SQL GROUP BY components.

    Args:
        expression: Comma-separated field names with optional time functions
            Examples: "model_name", "model_name,week", "session_id,day"
        query_type: Type of query ('tokens', 'tools', 'decisions', etc.)
        date_field: Default date field for time functions

    Returns:
        GroupByResult with sql, select_fields, and success status

    Example:
        >>> result = parse_group_by("model_name,week", "tokens")
        >>> print(result.sql)
        model_name, strftime('%Y-W%W', created_at)
        >>> print(result.select_fields)
        ['model_name', "strftime('%Y-W%W', created_at) AS week"]
    """
    # An empty/blank expression is not an error: it simply means "no grouping".
    if not expression or not expression.strip():
        return GroupByResult(
            sql="",
            select_fields=[],
            success=True,
            original=expression
        )

    try:
        allowed_fields = get_allowed_fields(query_type)
        parts = [p.strip() for p in expression.split(',')]
        group_parts = []
        select_parts = []

        for part in parts:
            if not part:
                continue

            # Bare time function (day, week, month, year, hour) applied to
            # the default date_field.
            if part.lower() in TIME_FUNCTIONS:
                time_func = TIME_FUNCTIONS[part.lower()]
                sql_expr = time_func.format(field=date_field)
                group_parts.append(sql_expr)
                select_parts.append(f"{sql_expr} AS {part.lower()}")
            # field.time_func format (e.g., created_at.week).
            # NOTE(review): the field is only checked against allowed_fields,
            # not TIME_FIELDS, so non-date columns are accepted here.
            elif '.' in part:
                field, time_key = part.split('.', 1)
                if field not in allowed_fields:
                    return GroupByResult(
                        sql="",
                        select_fields=[],
                        success=False,
                        error=f"Unknown field: {field}. Allowed: {', '.join(sorted(allowed_fields))}",
                        original=expression
                    )
                if time_key.lower() not in TIME_FUNCTIONS:
                    return GroupByResult(
                        sql="",
                        select_fields=[],
                        success=False,
                        error=f"Unknown time function: {time_key}. Allowed: {', '.join(TIME_FUNCTIONS.keys())}",
                        original=expression
                    )
                time_func = TIME_FUNCTIONS[time_key.lower()]
                sql_expr = time_func.format(field=field)
                group_parts.append(sql_expr)
                select_parts.append(f"{sql_expr} AS {field}_{time_key.lower()}")
            # Regular field (case-sensitive whitelist match — this is also
            # the SQL-injection guard: only whitelisted names reach the SQL).
            elif part in allowed_fields:
                group_parts.append(part)
                select_parts.append(part)
            else:
                return GroupByResult(
                    sql="",
                    select_fields=[],
                    success=False,
                    error=f"Unknown field: {part}. Allowed: {', '.join(sorted(allowed_fields))}",
                    original=expression
                )

        return GroupByResult(
            sql=", ".join(group_parts),
            select_fields=select_parts,
            success=True,
            original=expression
        )

    except Exception as e:
        # Defensive catch-all: a parse failure must never crash /cxq.
        logger.error(f"GROUP BY parse error: {e}")
        return GroupByResult(
            sql="",
            select_fields=[],
            success=False,
            error=str(e),
            original=expression
        )

def parse_sort(
    expression: str,
    query_type: str = 'tokens'
) -> SortResult:
    """
    Parse a --sort expression and return SQL ORDER BY clause.

    Args:
        expression: Comma-separated field names with optional direction
            Direction suffix: _asc, _desc, or none (defaults to DESC)
            Examples: "cost_total_usd_desc", "created_at_asc", "model_name"
        query_type: Type of query ('tokens', 'tools', 'decisions', etc.)

    Returns:
        SortResult with sql and success status

    Example:
        >>> result = parse_sort("cost_total_usd_desc,created_at_asc", "tokens")
        >>> print(result.sql)
        cost_total_usd DESC, created_at ASC
    """
    # An empty/blank expression is not an error: it simply means "no sort".
    if not expression or not expression.strip():
        return SortResult(
            sql="",
            success=True,
            original=expression
        )

    try:
        allowed_fields = get_allowed_fields(query_type)
        parts = [p.strip() for p in expression.split(',')]
        sort_parts = []

        for part in parts:
            if not part:
                continue

            # Strip an optional _asc/_desc suffix; bare fields sort DESC,
            # which suits the common "biggest first" analytics queries.
            direction = "DESC"  # default
            field = part

            if part.lower().endswith('_asc'):
                direction = "ASC"
                field = part[:-4]  # remove _asc
            elif part.lower().endswith('_desc'):
                direction = "DESC"
                field = part[:-5]  # remove _desc

            # Whitelist validation — only known column names reach the SQL.
            if field not in allowed_fields:
                return SortResult(
                    sql="",
                    success=False,
                    error=f"Unknown field: {field}. Allowed: {', '.join(sorted(allowed_fields))}",
                    original=expression
                )

            sort_parts.append(f"{field} {direction}")

        return SortResult(
            sql=", ".join(sort_parts),
            success=True,
            original=expression
        )

    except Exception as e:
        # Defensive catch-all: a parse failure must never crash /cxq.
        logger.error(f"SORT parse error: {e}")
        return SortResult(
            sql="",
            success=False,
            error=str(e),
            original=expression
        )

def get_allowed_fields(query_type: str) -> List[str]:
    """Return the groupable/sortable fields for a query type.

    Args:
        query_type: One of the /cxq query types (or its long alias,
            e.g. 'tokens'/'token_economics').

    Returns:
        The field whitelist for that type; an empty list for unknown
        types, which makes every field fail validation in the parsers.
    """
    mapping = {
        'tokens': TOKEN_FIELDS,
        'token_economics': TOKEN_FIELDS,
        'tools': TOOL_FIELDS,
        'tool_analytics': TOOL_FIELDS,
        'decisions': DECISION_FIELDS,
        'errors': ERROR_FIELDS,
        'learnings': LEARNING_FIELDS,
        'messages': MESSAGE_FIELDS,
        'activities': ACTIVITY_FIELDS,
    }
    return mapping.get(query_type, [])

def build_aggregation_select(
    group_fields: List[str],
    query_type: str = 'tokens'
) -> List[str]:
    """
    Build SELECT fields with aggregations for grouped queries.

    When grouping, numeric fields should be aggregated.

    Args:
        group_fields: Fields being grouped by
        query_type: Type of query

    Returns:
        List of SELECT field expressions (group fields first, then the
        standard aggregations for that query type; query types without
        a defined aggregation set get only the group fields back)

    Example:
        >>> fields = build_aggregation_select(['model_name'], 'tokens')
        >>> print(fields)
        ['model_name', 'COUNT(*) AS record_count', ...]
    """
    # Copy so the caller's list is never mutated.
    select_fields = list(group_fields)

    # Per-type standard aggregations, keyed by query type (long aliases
    # included where they exist).
    aggregations: Dict[str, List[str]] = {
        'tokens': [
            'COUNT(*) AS record_count',
            'SUM(token_input) AS total_input',
            'SUM(token_output) AS total_output',
            'SUM(cost_total_usd) AS total_cost',
            'AVG(cost_total_usd) AS avg_cost',
        ],
        'tools': [
            'COUNT(*) AS record_count',
            'SUM(invocation_count) AS total_invocations',
            'SUM(success_count) AS total_successes',
            'SUM(failure_count) AS total_failures',
            'AVG(avg_latency_ms) AS avg_latency',
        ],
        'decisions': [
            'COUNT(*) AS decision_count',
            'AVG(confidence) AS avg_confidence',
        ],
        'errors': [
            'COUNT(*) AS error_count',
            'AVG(confidence) AS avg_confidence',
        ],
        'learnings': [
            'COUNT(*) AS learning_count',
            'AVG(confidence) AS avg_confidence',
        ],
        'activities': [
            'COUNT(*) AS activity_count',
            'AVG(confidence) AS avg_confidence',
        ],
    }
    # Long aliases share the short type's aggregation set.
    aggregations['token_economics'] = aggregations['tokens']
    aggregations['tool_analytics'] = aggregations['tools']

    select_fields.extend(aggregations.get(query_type, []))
    return select_fields

# =============================================================================
# CLI for Testing
# =============================================================================

def main():
    """CLI interface for testing the processor.

    Returns:
        0 on success (or None from the --test path), 1 on a parse error —
        suitable for sys.exit().
    """
    import argparse  # local import: only needed when run as a script

    parser = argparse.ArgumentParser(
        description='GROUP BY and ORDER BY Processor (J.4.7.2)'
    )
    parser.add_argument('--group-by', '-g', metavar='EXPR',
                        help='GROUP BY expression to parse')
    parser.add_argument('--sort', '-s', metavar='EXPR',
                        help='SORT expression to parse')
    parser.add_argument('--type', '-t', default='tokens',
                        choices=['tokens', 'tools', 'decisions', 'errors',
                                 'learnings', 'messages', 'activities'],
                        help='Query type (default: tokens)')
    parser.add_argument('--test', action='store_true',
                        help='Run test cases')

    args = parser.parse_args()

    if args.test:
        # Run canned test cases for both parsers and print the results.
        print("=" * 60)
        print("GROUP BY / SORT PROCESSOR TEST CASES")
        print("=" * 60)

        group_tests = [
            ("model_name", "tokens"),
            ("model_name,week", "tokens"),
            ("session_id,day", "tokens"),
            ("created_at.month", "tokens"),
            ("tool_name", "tools"),
            ("decision_type,confidence", "decisions"),
        ]

        print("\n--- GROUP BY Tests ---")
        for expr, qtype in group_tests:
            result = parse_group_by(expr, qtype)
            print(f"\nInput: --group-by \"{expr}\" (type={qtype})")
            if result.success:
                print(f"SQL: {result.sql}")
                print(f"SELECT: {result.select_fields}")
            else:
                print(f"ERROR: {result.error}")

        sort_tests = [
            ("cost_total_usd_desc", "tokens"),
            ("created_at_asc", "tokens"),
            ("model_name,cost_total_usd_desc", "tokens"),
            ("invocation_count_desc,tool_name_asc", "tools"),
            ("confidence_desc", "decisions"),
        ]

        print("\n--- SORT Tests ---")
        for expr, qtype in sort_tests:
            result = parse_sort(expr, qtype)
            print(f"\nInput: --sort \"{expr}\" (type={qtype})")
            if result.success:
                print(f"SQL: {result.sql}")
            else:
                print(f"ERROR: {result.error}")

        return

    if args.group_by:
        result = parse_group_by(args.group_by, args.type)
        if result.success:
            print(f"GROUP BY: {result.sql}")
            print(f"SELECT: {result.select_fields}")
            agg_fields = build_aggregation_select(result.select_fields, args.type)
            print(f"With Agg: {agg_fields}")
        else:
            print(f"ERROR: {result.error}")
            return 1

    if args.sort:
        result = parse_sort(args.sort, args.type)
        if result.success:
            print(f"ORDER BY: {result.sql}")
        else:
            print(f"ERROR: {result.error}")
            return 1

    if not args.group_by and not args.sort and not args.test:
        parser.print_help()

    return 0

if __name__ == "__main__":
    # Restore the mangled dunder guard; main() may return None (test mode),
    # hence the `or 0` to keep the exit status clean.
    import sys
    sys.exit(main() or 0)