#!/usr/bin/env python3
"""
ADR-149 Phase 1 - Field-Specific Search Parser (J.4.7.4)

Parses field:value specifications for field-specific search queries.

Syntax:
    field:value                  - Exact match
    field:value*                 - Prefix match (LIKE 'value%')
    field:*value                 - Suffix match (LIKE '%value')
    field:*value*                - Contains match (LIKE '%value%')
    field1:value1,field2:value2  - Multiple field constraints (AND)

Examples:
    /cxq --field "role:assistant"
    /cxq --field "role:assistant,content:error"
    /cxq --field "tool_name:Bash,success:true"
    /cxq --field "decision_type:architecture"

Created: 2026-02-03
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.4.7.4
"""

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

@dataclass
class FieldConstraint:
    """One `field <operator> value` restriction taken from a field query.

    Attributes:
        field: Name of the field the constraint applies to.
        value: Text to compare against (wildcards already removed for LIKE).
        operator: Comparison operator, either '=' or 'LIKE'.
        like_pattern: Pattern used with the LIKE operator; None otherwise.
    """

    field: str
    value: str
    operator: str  # '=', 'LIKE'
    like_pattern: Optional[str] = None  # For LIKE operator

@dataclass
class FieldQuery:
    """Parsed field query with multiple constraints.

    Attributes:
        constraints: Parsed constraints; they are combined with AND.
        raw: The field specification string the query was built from.
    """

    constraints: List["FieldConstraint"]
    raw: str

    def to_sql_conditions(self, table_alias: str = "") -> Tuple[str, List[Any]]:
        """
        Convert constraints to SQL WHERE conditions.

        Values are always passed as bound parameters (``?`` placeholders).
        Field names are placed in the SQL text itself, so each one is checked
        to be a plain (optionally dot-qualified) identifier first.

        Args:
            table_alias: Optional alias prepended, with a dot, to every field.

        Returns:
            Tuple of (condition_string, parameters_list). Both are empty when
            the query has no constraints.

        Raises:
            ValueError: If a field name is not a valid identifier.
        """
        if not self.constraints:
            return "", []

        prefix = f"{table_alias}." if table_alias else ""
        conditions: List[str] = []
        params: List[Any] = []

        for c in self.constraints:
            # Column names cannot be bound as parameters, so anything that is
            # not a bare identifier is rejected instead of being interpolated.
            if not re.fullmatch(
                r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*", c.field
            ):
                raise ValueError(f"Invalid field name: '{c.field}'")

            if c.operator == 'LIKE':
                conditions.append(f"{prefix}{c.field} LIKE ?")
                params.append(c.like_pattern)
            else:
                conditions.append(f"{prefix}{c.field} = ?")
                params.append(c.value)

        return " AND ".join(conditions), params

# Valid fields per query type (for validation)

# Keys are query types; each value lists the field names accepted for that type.
VALID_FIELDS = {
    'messages': [
        'role',
        'content',
        'session_id',
        'model_name',
        'timestamp',
        'token_count',
        'cost_usd',
        'message_type',
        'source_llm',
    ],
    'decisions': [
        'decision_type',
        'description',
        'context',
        'outcome',
        'status',
        'confidence',
        'session_id',
        'track',
        'task_id',
        'created_at',
    ],
    'errors': [
        'error_type',
        'error_message',
        'solution',
        'context',
        'file_path',
        'language',
        'success_rate',
        'session_id',
        'created_at',
    ],
    'tokens': [
        'model_name',
        'session_id',
        'input_tokens',
        'output_tokens',
        'total_tokens',
        'cost_input_usd',
        'cost_output_usd',
        'cost_total_usd',
        'timestamp',
    ],
    'tools': [
        'tool_name',
        'success',
        'duration_ms',
        'session_id',
        'error_message',
        'input_size',
        'output_size',
        'timestamp',
    ],
    'components': [
        'name',
        'type',
        'description',
        'status',
        'model',
        'track',
        'version',
        'path',
        'created_at',
        'updated_at',
    ],
    'skills': [
        'skill_type',
        'pattern_type',
        'context',
        'improvement',
        'before_behavior',
        'after_behavior',
        'session_id',
        'created_at',
    ],
}

def parse_field_query(field_spec: str) -> FieldQuery:
    """
    Parse a field specification string into a FieldQuery.

    Constraints are separated by commas; commas inside a quoted value do not
    separate constraints. Each constraint is split on its first colon, so the
    value itself may contain colons. One pair of matching quotes around the
    value is removed before wildcard handling.

    Args:
        field_spec: Field specification like "role:assistant,content:*error*"

    Returns:
        FieldQuery with parsed constraints (no constraints for an empty or
        whitespace-only specification)

    Raises:
        ValueError: If field specification is invalid
    """
    if not field_spec or not field_spec.strip():
        return FieldQuery(constraints=[], raw=field_spec)

    constraints = []

    # Split by comma, keeping commas that sit inside quoted values
    for part in _split_respecting_quotes(field_spec, ','):
        part = part.strip()
        if not part:
            continue

        # Split on first colon only (value might contain colons)
        field, colon, value = part.partition(':')
        if not colon:
            raise ValueError(f"Invalid field specification: '{part}'. Expected 'field:value' format.")

        field = field.strip()
        value = value.strip()

        if not field:
            raise ValueError(f"Empty field name in: '{part}'")

        # Remove one pair of matching quotes. The length check keeps a value
        # that is a single quote character from being treated as both the
        # opening and the closing quote (which would silently empty it).
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]

        # Determine operator based on wildcards
        constraints.append(_parse_value_pattern(field, value))

    return FieldQuery(constraints=constraints, raw=field_spec)

def _parse_value_pattern(field: str, value: str) -> FieldConstraint:
    """
    Build the constraint for one field, choosing exact match or LIKE.

    Wildcards (only a leading and/or trailing ``*`` is significant):
        *text   -> LIKE '%text'   (value ends with text)
        text*   -> LIKE 'text%'   (value starts with text)
        *text*  -> LIKE '%text%'  (value contains text)

    Anything else, including an empty value, is an exact ('=') match.
    """
    if not value:
        return FieldConstraint(field=field, value='', operator='=')

    leading = value.startswith('*')
    trailing = value.endswith('*')

    if not (leading or trailing):
        return FieldConstraint(field=field, value=value, operator='=')

    # Drop the asterisks, then put '%' on whichever side(s) had one.
    core = value.strip('*')
    pattern = ('%' if leading else '') + core + ('%' if trailing else '')

    return FieldConstraint(
        field=field,
        value=core,
        operator='LIKE',
        like_pattern=pattern,
    )

def _split_respecting_quotes(s: str, delimiter: str) -> List[str]: """Split string by delimiter, respecting quoted sections.""" result = [] current = [] in_quote = None

for char in s:
if char in ('"', "'") and in_quote is None:
in_quote = char
current.append(char)
elif char == in_quote:
in_quote = None
current.append(char)
elif char == delimiter and in_quote is None:
result.append(''.join(current))
current = []
else:
current.append(char)

if current:
result.append(''.join(current))

return result

def validate_fields(query: FieldQuery, query_type: str) -> List[str]:
    """
    Report which fields used in the query are not valid for the query type.

    Args:
        query: Parsed field query
        query_type: Type of query ('messages', 'decisions', etc.)

    Returns:
        Invalid field names in constraint order (empty if all are valid, or
        if the query type is unknown, in which case every field is allowed)
    """
    allowed = VALID_FIELDS.get(query_type, [])
    if not allowed:
        # Unknown query type, allow all fields
        return []

    return [c.field for c in query.constraints if c.field not in allowed]

def get_valid_fields(query_type: str) -> List[str]:
    """Get list of valid fields for a query type.

    Args:
        query_type: Type of query ('messages', 'decisions', etc.)

    Returns:
        A new list of the valid field names, empty for an unknown query type.
        A copy is returned so callers cannot alter VALID_FIELDS by mutating
        the result.
    """
    return list(VALID_FIELDS.get(query_type, []))

def format_field_help(query_type: Optional[str] = None) -> str:
    """
    Format help text showing valid fields.

    Args:
        query_type: Specific query type, or None for all. A value that is not
            a key of VALID_FIELDS also produces the listing for all types.

    Returns:
        Formatted help string (lines joined with newlines)
    """
    lines = [
        "Field-Specific Search (--field / -F)",
        "=" * 40,
        "",
        "Syntax: field:value[,field2:value2,...]",
        "",
        "Wildcards:",
        " *value - Ends with value",
        " value* - Starts with value",
        " *value* - Contains value",
        "",
    ]

    if query_type and query_type in VALID_FIELDS:
        lines.append(f"Valid fields for --{query_type}:")
        lines.extend(f" {name}" for name in sorted(VALID_FIELDS[query_type]))
    else:
        lines.append("Valid fields by query type:")
        for qtype, fields in sorted(VALID_FIELDS.items()):
            lines.append(f"\n --{qtype}:")
            lines.extend(f" {name}" for name in sorted(fields))

    lines.extend([
        "",
        "Examples:",
        ' /cxq --recent --field "role:assistant"',
        ' /cxq --decisions --field "decision_type:architecture"',
        ' /cxq --errors --field "error_type:*Error,language:python"',
        ' /cxq --tokens --field "model_name:*opus*"',
    ])

    return "\n".join(lines)

# Test cases for validation

# Runs only when the file is executed as a script: parses each sample
# specification and prints its constraints and the SQL generated from them.
if __name__ == "__main__":
    test_cases = [
        "role:assistant",
        "role:assistant,content:error",
        "tool_name:Bash,success:true",
        "model_name:opus",
        'field:"value with spaces"',
        "empty:",
    ]

    print("Field Parser Test Cases")
    print("=" * 50)

    for spec in test_cases:
        print(f"\nInput: {spec}")
        try:
            query = parse_field_query(spec)
            print(f" Constraints: {len(query.constraints)}")
            for c in query.constraints:
                # Keep the optional LIKE pattern on the same output line.
                print(f" {c.field} {c.operator} {c.value!r}", end="")
                if c.like_pattern:
                    print(f" (LIKE: {c.like_pattern!r})", end="")
                print()

            sql, params = query.to_sql_conditions()
            print(f" SQL: {sql}")
            print(f" Params: {params}")
        except ValueError as e:
            print(f" Error: {e}")