Skip to main content

scripts-where-parser

#!/usr/bin/env python3
"""

title: WHERE Expression Parser for /cxq
component_type: script
version: 1.0.0
status: active
summary: Simple expression parser for --where flag (ADR-149 Phase 1, J.4.7.1)
keywords: [cxq, query, where, parser, expression, filter]
track: J
task_id: J.4.7.1
created: 2026-02-03

WHERE Expression Parser - J.4.7.1 Implementation

Parses simple WHERE expressions and converts them to SQL WHERE clauses. Supports:

  • Comparison operators: =, !=, >, <, >=, <=
  • String values: 'quoted' or "quoted"
  • Numeric values: integers and floats
  • Boolean operators: AND, OR (case-insensitive)
  • Field names: alphanumeric + underscores

Example:
    expression = "confidence > 0.8 AND decision_type = 'api'"
    result = parse_where(expression)
    # result.sql = "confidence > ? AND decision_type = ?"
    # result.params = [0.8, 'api']

Usage in /cxq:
    /cxq --decisions --where "confidence > 0.8 AND decision_type = 'api'"
"""

import logging
import re
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple

# Module-level logger, named after this module so records can be filtered by source.
logger = logging.getLogger(__name__)

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class ParsedCondition:
    """A single parsed condition of the form ``field operator value``."""

    field: str     # column name exactly as it should appear in the SQL fragment
    operator: str  # operator as written by the user (e.g. '==', '<>', 'like')
    value: Any     # literal returned alongside the fragment for binding to '?'

    # Accepted operator spellings -> canonical SQL spelling. Keys are
    # upper-case; lookups upper-case the operator first so keyword operators
    # match regardless of the case the user typed. Deliberately left
    # un-annotated so the dataclass machinery does not treat it as a field.
    _OPERATOR_ALIASES = {
        '=': '=',
        '==': '=',
        '!=': '!=',
        '<>': '!=',
        '>': '>',
        '<': '<',
        '>=': '>=',
        '<=': '<=',
        'LIKE': 'LIKE',
        'IN': 'IN',
    }

    def to_sql(self) -> Tuple[str, Any]:
        """Convert to a SQL fragment and its bound parameter.

        Returns:
            ``(fragment, value)`` where ``fragment`` is
            ``"<field> <operator> ?"`` with the operator normalized through
            the alias table. Operators missing from the table are passed
            through unchanged.

        NOTE(review): the fragment always ends in a single ``?``, even for
        ``IN`` with a list value; callers needing ``IN (?, ?)`` must expand
        the placeholders themselves.
        """
        lookup_key = self.operator.strip().upper()
        sql_op = self._OPERATOR_ALIASES.get(lookup_key, self.operator)
        return f"{self.field} {sql_op} ?", self.value

@dataclass
class ParseResult:
    """Result of parsing a WHERE expression.

    Instances are truthy exactly when ``success`` is true, so callers can
    write ``if result:`` instead of checking the flag explicitly.
    """

    sql: str                     # generated WHERE clause using '?' placeholders
    params: List[Any]            # values for the placeholders, in order
    success: bool                # False when parsing failed
    error: Optional[str] = None  # failure message; None when not provided
    original: str = ""           # the expression text this result was built from

    def __bool__(self) -> bool:
        """Return ``success`` so the result can be used directly in conditions."""
        return self.success

# =============================================================================
# Tokenizer
# =============================================================================

# Token patterns

# Each entry is (token_type, regex[, re flags]). Order matters: patterns are
# tried top to bottom and the first match wins, so keywords are listed before
# the generic FIELD pattern and strings/numbers before operators.
TOKEN_PATTERNS = [
    ('STRING_DOUBLE', r'"([^"\\]|\\.)*"'),      # "double quoted", backslash escapes allowed
    ('STRING_SINGLE', r"'([^'\\]|\\.)*'"),      # 'single quoted', backslash escapes allowed
    ('NUMBER', r'-?\d+\.?\d*'),                 # integers and floats
    ('OPERATOR', r'>=|<=|!=|<>|==|=|>|<'),      # comparison operators (longest first)
    ('AND', r'\bAND\b', re.IGNORECASE),         # AND (case-insensitive)
    ('OR', r'\bOR\b', re.IGNORECASE),           # OR (case-insensitive)
    ('NOT', r'\bNOT\b', re.IGNORECASE),         # NOT (case-insensitive)
    ('LIKE', r'\bLIKE\b', re.IGNORECASE),       # LIKE (case-insensitive)
    ('IN', r'\bIN\b', re.IGNORECASE),           # IN (case-insensitive)
    ('NULL', r'\bNULL\b', re.IGNORECASE),       # NULL (case-insensitive)
    ('LPAREN', r'\('),                          # (
    ('RPAREN', r'\)'),                          # )
    ('COMMA', r','),                            # ,
    ('FIELD', r'[a-zA-Z_][a-zA-Z0-9_]*'),       # field names
    ('WHITESPACE', r'\s+'),                     # whitespace (ignored)
]


def tokenize(expression: str) -> List[Tuple[str, str]]:
    """
    Tokenize a WHERE expression.

    Args:
        expression: The WHERE expression string

    Returns:
        List of (token_type, token_value) tuples. Whitespace is dropped;
        string tokens keep their surrounding quotes and keyword tokens keep
        the case the user typed.

    Raises:
        ValueError: If no pattern in TOKEN_PATTERNS matches at some position.
    """
    # Compile once per call instead of once per pattern per scan position.
    compiled = [
        (token_type, re.compile(pattern, flags[0] if flags else 0))
        for token_type, pattern, *flags in TOKEN_PATTERNS
    ]

    tokens = []
    pos = 0
    end = len(expression)

    while pos < end:
        for token_type, regex in compiled:
            match = regex.match(expression, pos)
            if match:
                break
        else:
            # No pattern matched at this position.
            raise ValueError(f"Invalid character at position {pos}: '{expression[pos]}'")

        if token_type != 'WHITESPACE':  # Skip whitespace
            tokens.append((token_type, match.group()))
        pos = match.end()

    return tokens

# =============================================================================
# Parser
# =============================================================================

class WhereParser:
    """
    Simple recursive descent parser for WHERE expressions.

    Walks a list of ``(token_type, token_value)`` pairs and accumulates a
    parameterized SQL fragment (``?`` placeholders) together with the list of
    values to bind, in placeholder order.

    Grammar:
        expression ::= term (('AND' | 'OR') term)*
        term       ::= condition | '(' expression ')' | 'NOT' term
        condition  ::= field operator value
        operator   ::= '=' | '==' | '!=' | '<>' | '>' | '<' | '>=' | '<='
                     | 'LIKE' | 'IN' | 'NOT' 'IN'
        value      ::= string | number | 'NULL' | '(' value_list ')'
        value_list ::= value (',' value)*

    A parenthesized value list is only accepted with IN / NOT IN, and IN /
    NOT IN only accept a parenthesized list; any other combination raises
    ValueError at parse time.
    """

    def __init__(self, tokens: List[Tuple[str, str]]):
        """Store the token stream and reset the cursor and output buffers."""
        self.tokens = tokens
        self.pos = 0                     # index of the next unconsumed token
        self.sql_parts: List[str] = []   # SQL fragments; parse() joins them with spaces
        self.params: List[Any] = []      # values for the '?' placeholders, in order

    def current_token(self) -> Optional[Tuple[str, str]]:
        """Get current token or None if exhausted."""
        if self.pos < len(self.tokens):
            return self.tokens[self.pos]
        return None

    def consume(self, expected_type: Optional[str] = None) -> Tuple[str, str]:
        """Consume and return the current token.

        Args:
            expected_type: When given, the token type the current token must have.

        Raises:
            ValueError: If the stream is exhausted or the type does not match.
        """
        token = self.current_token()
        if token is None:
            raise ValueError("Unexpected end of expression")
        if expected_type and token[0] != expected_type:
            raise ValueError(f"Expected {expected_type}, got {token[0]}: {token[1]}")
        self.pos += 1
        return token

    def peek_type(self) -> Optional[str]:
        """Peek at the current token type without consuming it."""
        token = self.current_token()
        return token[0] if token else None

    def parse(self) -> Tuple[str, List[Any]]:
        """Parse the expression and return SQL + params.

        Does not check that every token was consumed; callers can compare
        ``current_token()`` against None afterwards to detect trailing input.
        """
        self.parse_expression()
        return ' '.join(self.sql_parts), self.params

    def parse_expression(self):
        """Parse: term (('AND' | 'OR') term)*"""
        self.parse_term()

        while self.peek_type() in ('AND', 'OR'):
            connective = self.consume()
            # Keywords are tokenized case-insensitively; emit them upper-case.
            self.sql_parts.append(connective[1].upper())
            self.parse_term()

    def parse_term(self):
        """Parse: condition | '(' expression ')' | 'NOT' term"""
        next_type = self.peek_type()
        if next_type == 'LPAREN':
            self.consume('LPAREN')
            self.sql_parts.append('(')
            self.parse_expression()
            self.consume('RPAREN')
            self.sql_parts.append(')')
        elif next_type == 'NOT':
            self.consume('NOT')
            self.sql_parts.append('NOT')
            self.parse_term()
        else:
            self.parse_condition()

    def parse_condition(self):
        """Parse: field operator value

        Appends one SQL fragment to ``sql_parts`` and the bound value(s) to
        ``params``.

        Raises:
            ValueError: On a missing/unknown operator, on IN / NOT IN without
                a parenthesized list, or on a list given to any other operator.
        """
        # Field name
        field = self.consume('FIELD')[1]

        # Operator
        op_type = self.peek_type()
        if op_type == 'OPERATOR':
            op = self.consume('OPERATOR')[1]
            # Normalize == to =
            if op == '==':
                op = '='
        elif op_type == 'LIKE':
            self.consume('LIKE')
            op = 'LIKE'
        elif op_type == 'IN':
            self.consume('IN')
            op = 'IN'
        elif op_type == 'NOT':
            # NOT IN case
            self.consume('NOT')
            if self.peek_type() != 'IN':
                raise ValueError(f"Expected IN after NOT, got {self.peek_type()}")
            self.consume('IN')
            op = 'NOT IN'
        else:
            raise ValueError(f"Expected operator, got {op_type}")

        # Value
        value = self.parse_value()
        value_is_list = isinstance(value, list)

        if op in ('IN', 'NOT IN'):
            # A scalar here would emit "field IN ?", which cannot be executed.
            if not value_is_list:
                raise ValueError(f"{op} requires a parenthesized list of values")
            placeholders = ', '.join('?' for _ in value)
            self.sql_parts.append(f"{field} {op} ({placeholders})")
            self.params.extend(value)
        else:
            # A list cannot be bound to a single placeholder.
            if value_is_list:
                raise ValueError(f"Operator {op} does not accept a list of values")
            # NOTE(review): a NULL value is bound as None to "field <op> ?";
            # in SQL a comparison with NULL is never true, so "x = NULL" will
            # not match NULL rows. IS NULL support would need a matching
            # change wherever the generated SQL is validated.
            self.sql_parts.append(f"{field} {op} ?")
            self.params.append(value)

    def parse_value(self) -> Any:
        """Parse a value (string, number, NULL, or list).

        Returns:
            ``str`` for quoted strings (quotes removed, ``\\'`` and ``\\"``
            unescaped), ``int``/``float`` for numbers, ``None`` for NULL, or
            a ``list`` of such values for a parenthesized list.

        Raises:
            ValueError: If the current token cannot start a value, or a list
                contains a nested list.
        """
        token_type = self.peek_type()

        if token_type in ('STRING_DOUBLE', 'STRING_SINGLE'):
            raw = self.consume()[1]
            # Remove quotes and unescape
            return raw[1:-1].replace("\\'", "'").replace('\\"', '"')

        if token_type == 'NUMBER':
            text = self.consume()[1]
            # Convert to appropriate type
            return float(text) if '.' in text else int(text)

        if token_type == 'NULL':
            self.consume('NULL')
            return None

        if token_type == 'LPAREN':
            # List for IN operator
            self.consume('LPAREN')
            values = [self._parse_list_item()]
            while self.peek_type() == 'COMMA':
                self.consume('COMMA')
                values.append(self._parse_list_item())
            self.consume('RPAREN')
            return values

        raise ValueError(f"Expected value, got {token_type}")

    def _parse_list_item(self) -> Any:
        """Parse one scalar element of a value list, rejecting nested lists."""
        if self.peek_type() == 'LPAREN':
            raise ValueError("Nested lists are not supported in value lists")
        return self.parse_value()

# =============================================================================
# Public API
# =============================================================================

def parse_where(expression: str) -> ParseResult:
    """
    Parse a WHERE expression and return SQL + parameters.

    Never raises for bad input: any error from tokenizing or parsing is
    logged and reported through ``ParseResult.success`` / ``.error``.

    Args:
        expression: The WHERE expression string

    Returns:
        ParseResult with sql, params, and success status. An empty or
        whitespace-only expression yields a successful result with empty
        ``sql`` and ``params``.

    Example:
        >>> result = parse_where("confidence > 0.8 AND decision_type = 'api'")
        >>> print(result.sql)
        confidence > ? AND decision_type = ?
        >>> print(result.params)
        [0.8, 'api']
    """
    # Nothing to parse: report success with an empty clause.
    if not expression or not expression.strip():
        return ParseResult(sql="", params=[], success=True, original=expression)

    try:
        # Tokenize
        tokens = tokenize(expression.strip())
        if not tokens:
            return ParseResult(sql="", params=[], success=True, original=expression)

        # Parse
        parser = WhereParser(tokens)
        sql, params = parser.parse()

        # Ensure all tokens consumed
        if parser.current_token() is not None:
            remaining = ' '.join(t[1] for t in parser.tokens[parser.pos:])
            raise ValueError(f"Unexpected tokens: {remaining}")

    except Exception as e:
        # API boundary: convert every failure into an unsuccessful result
        # rather than letting it propagate to the caller.
        logger.error("WHERE parse error: %s", e)
        return ParseResult(
            sql="",
            params=[],
            success=False,
            error=str(e),
            original=expression,
        )

    return ParseResult(sql=sql, params=params, success=True, original=expression)

# Words that may appear in a generated WHERE clause without being field names.
_SQL_KEYWORDS = frozenset({'AND', 'OR', 'NOT', 'LIKE', 'IN', 'IS', 'NULL'})


def validate_fields(sql: str, allowed_fields: List[str]) -> Tuple[bool, Optional[str]]:
    """
    Validate that all fields in the SQL are in the allowed list.

    Every identifier in ``sql`` that is not a SQL keyword (AND, OR, NOT,
    LIKE, IN, IS, NULL) is treated as a field name and compared
    case-insensitively against ``allowed_fields``. The clause is expected to
    be parameterized (values as ``?``), so it contains no literals that
    could be mistaken for identifiers.

    Args:
        sql: The SQL WHERE clause
        allowed_fields: List of allowed field names

    Returns:
        (is_valid, error_message) - ``(True, None)`` when every field is
        allowed, otherwise ``(False, message)`` naming the first unknown field.
    """
    # Scan whole identifiers rather than "word followed by an operator":
    # the latter reports AND/OR/NOT as a field whenever the next word starts
    # with an operator keyword (e.g. "AND invocations", "AND NOT x").
    identifiers = re.findall(r'(?<![A-Za-z0-9_])[A-Za-z_][A-Za-z0-9_]*', sql)

    allowed_set = {name.lower() for name in allowed_fields}
    for identifier in identifiers:
        if identifier.upper() in _SQL_KEYWORDS:
            continue
        if identifier.lower() not in allowed_set:
            return False, f"Unknown field: {identifier}. Allowed: {', '.join(sorted(allowed_fields))}"

    return True, None

# =============================================================================
# Field Mappings per Query Type
# =============================================================================

# Fields allowed for --decisions query
DECISION_FIELDS = [
    'id', 'session_id', 'decision_type', 'decision', 'rationale',
    'confidence', 'source', 'created_at', 'extraction_date',
]

# Fields allowed for --errors query
ERROR_FIELDS = [
    'id', 'session_id', 'error_type', 'error_message', 'solution',
    'confidence', 'context', 'created_at', 'extraction_date',
]

# Fields allowed for --learnings query
LEARNING_FIELDS = [
    'id', 'session_id', 'skill_name', 'pattern_type', 'learning',
    'confidence', 'context', 'created_at', 'extraction_date',
]

# Fields allowed for message queries
MESSAGE_FIELDS = [
    'id', 'session_id', 'llm_source', 'model', 'role', 'content',
    'timestamp', 'message_type', 'tool_name', 'tool_status',
]

# Fields allowed for --tokens query
TOKEN_FIELDS = [
    'id', 'session_id', 'model', 'tokens_in', 'tokens_out',
    'tokens_cache_read', 'tokens_cache_write',
    'cost_input_usd', 'cost_output_usd',
    'cost_cache_read_usd', 'cost_cache_write_usd',
    'cost_total_usd', 'timestamp',
]

# Fields allowed for --tools query
TOOL_FIELDS = [
    'id', 'session_id', 'tool_name', 'tool_category', 'invocations',
    'success_count', 'failure_count', 'avg_latency_ms', 'timestamp',
]

def get_allowed_fields(query_type: str) -> List[str]:
    """Get allowed fields for a query type.

    Args:
        query_type: One of 'decisions', 'errors', 'learnings', 'messages',
            'tokens' or 'tools'.

    Returns:
        The module-level field list for that query type (the same list
        object, not a copy), or an empty list for an unknown query type.
    """
    mapping = {
        'decisions': DECISION_FIELDS,
        'errors': ERROR_FIELDS,
        'learnings': LEARNING_FIELDS,
        'messages': MESSAGE_FIELDS,
        'tokens': TOKEN_FIELDS,
        'tools': TOOL_FIELDS,
    }
    return mapping.get(query_type, [])

# =============================================================================
# CLI for Testing
# =============================================================================

def main(argv: Optional[List[str]] = None) -> int:
    """CLI interface for testing the parser.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success (including ``--test`` and the
        help output shown when no expression is given), 1 when the
        expression fails to parse or fails ``--validate`` field validation.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='WHERE Expression Parser (J.4.7.1)'
    )
    parser.add_argument('expression', nargs='?', help='Expression to parse')
    parser.add_argument('--validate', '-v',
                        choices=['decisions', 'errors', 'learnings', 'messages', 'tokens', 'tools'],
                        help='Validate fields for query type')
    parser.add_argument('--test', '-t', action='store_true',
                        help='Run test cases')

    args = parser.parse_args(argv)

    if args.test:
        # Run the built-in sample expressions and print what each parses to.
        test_cases = [
            "confidence > 0.8",
            "decision_type = 'api'",
            "confidence > 0.8 AND decision_type = 'api'",
            "confidence >= 0.5 OR decision_type = 'architecture'",
            "model = 'claude-opus-4-5-20251101' AND cost_total_usd > 0.1",
            "error_type LIKE '%Type%'",
            "skill_name IN ('git-workflow', 'code-review')",
            "NOT confidence < 0.3",
            "(confidence > 0.8 AND decision_type = 'api') OR source = 'user'",
        ]

        print("=" * 60)
        print("WHERE PARSER TEST CASES")
        print("=" * 60)

        for expr in test_cases:
            result = parse_where(expr)
            print(f"\nInput: {expr}")
            if result.success:
                print(f"SQL: {result.sql}")
                print(f"Params: {result.params}")
            else:
                print(f"ERROR: {result.error}")

        return 0

    if not args.expression:
        parser.print_help()
        return 0

    result = parse_where(args.expression)
    if not result.success:
        print(f"ERROR: {result.error}")
        return 1

    print(f"SQL: {result.sql}")
    print(f"Params: {result.params}")

    if args.validate:
        allowed = get_allowed_fields(args.validate)
        valid, error = validate_fields(result.sql, allowed)
        if not valid:
            print(f"✗ Invalid: {error}")
            # A rejected field is a failure; reflect it in the exit status.
            return 1
        print(f"✓ Valid for --{args.validate}")

    return 0

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    import sys

    sys.exit(main() or 0)