#!/usr/bin/env python3 """ Compression Evaluator - CODITECT Adapter
Evaluates context compression quality using probe-based testing from Agent-Skills-for-Context-Engineering.
Usage:
    python3 scripts/context-engineering/compression_evaluator.py --original orig.txt --compressed comp.txt
    python3 scripts/context-engineering/compression_evaluator.py --probes probes.json --context context.txt

Source: external/Agent-Skills-for-Context-Engineering/skills/context-compression/scripts/
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
# Add the vendored external module to the import path
EXTERNAL_PATH = Path(__file__).parent.parent.parent / "external" / "Agent-Skills-for-Context-Engineering"
sys.path.insert(0, str(EXTERNAL_PATH / "skills" / "context-compression" / "scripts"))
try:
    from compression_evaluator import (
        ProbeEvaluator,
        CompressionQualityMetrics,
        generate_probes,
        evaluate_compression,
    )
    EXTERNAL_AVAILABLE = True
except ImportError:
    EXTERNAL_AVAILABLE = False
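
# A --probes file is a JSON list of probe dicts matching the shape produced by
# _generate_probes below. An illustrative (hypothetical) entry:
#   {
#     "dimension": "key_facts",
#     "question": "Is this information present: 'The deploy step requires...'?",
#     "expected_content": "The deploy step requires",
#     "type": "presence"
#   }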
class CoditectCompressionEvaluator:
    """CODITECT-integrated compression quality evaluator."""
def __init__(self):
# Probe-based evaluation dimensions from skill
self.evaluation_dimensions = [
"key_facts", # Are key facts preserved?
"relationships", # Are entity relationships intact?
"instructions", # Are instructions/commands preserved?
"constraints", # Are constraints/requirements present?
"context", # Is relevant context maintained?
"coherence" # Is the summary coherent and readable?
]
# Quality thresholds
self.thresholds = {
"excellent": 0.95,
"good": 0.85,
"acceptable": 0.75,
"poor": 0.60
}
    def evaluate(self, original: str, compressed: str,
                 probes: Optional[List[Dict]] = None) -> Dict:
"""
Evaluate compression quality.
Args:
original: Original context text
compressed: Compressed/summarized text
probes: Optional list of probe questions
Returns:
Evaluation results with quality score and dimension breakdown
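
        Example (illustrative sketch; variable names are hypothetical):
            evaluator = CoditectCompressionEvaluator()
            report = evaluator.evaluate(original_text, summary_text)
            report["quality_rating"]  # e.g. "good" for scores >= 0.85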
"""
# Generate probes if not provided
if not probes:
probes = self._generate_probes(original)
# Evaluate each dimension
dimension_scores = {}
probe_results = []
for probe in probes:
result = self._evaluate_probe(original, compressed, probe)
probe_results.append(result)
dimension = probe.get("dimension", "general")
if dimension not in dimension_scores:
dimension_scores[dimension] = []
dimension_scores[dimension].append(result["score"])
# Calculate dimension averages
dimension_averages = {
dim: sum(scores) / len(scores)
for dim, scores in dimension_scores.items()
}
# Calculate overall score
overall_score = sum(dimension_averages.values()) / len(dimension_averages) if dimension_averages else 0
        # Estimate tokens (rough ~4 chars/token heuristic) and compression ratio
original_tokens = len(original) // 4
compressed_tokens = len(compressed) // 4
compression_ratio = 1 - (compressed_tokens / original_tokens) if original_tokens > 0 else 0
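        # e.g. an 8000-char original and a 2000-char summary estimate to
        # 2000 and 500 tokens, so compression_ratio = 1 - 500/2000 = 0.75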
return {
"overall_score": overall_score,
"quality_rating": self._get_quality_rating(overall_score),
"compression_ratio": compression_ratio,
"tokens_per_task": self._calculate_tokens_per_task(original_tokens, compressed_tokens, overall_score),
"dimension_scores": dimension_averages,
"probe_count": len(probes),
"probe_results": probe_results,
"recommendations": self._generate_recommendations(dimension_averages, compression_ratio),
"timestamp": datetime.now(timezone.utc).isoformat()
}
    def generate_probes(self, context: str, dimension: Optional[str] = None) -> List[Dict]:
"""
Generate probe questions for context evaluation.
Args:
context: The context to generate probes for
dimension: Optional specific dimension to focus on
Returns:
List of probe dictionaries
"""
return self._generate_probes(context, dimension)
def evaluate_anchored_summary(self, document: str, summary: str,
anchor_phrases: List[str]) -> Dict:
"""
Evaluate anchored iterative summarization.
Checks that key anchor phrases are preserved in summary.
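
        Example (illustrative; anchor phrases are hypothetical):
            result = evaluator.evaluate_anchored_summary(
                doc_text, summary_text, ["rate limit", "retry budget"]
            )
            result["preservation_rate"]  # 1.0 when every anchor survives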
"""
preserved = []
lost = []
for anchor in anchor_phrases:
if anchor.lower() in summary.lower():
preserved.append(anchor)
else:
lost.append(anchor)
preservation_rate = len(preserved) / len(anchor_phrases) if anchor_phrases else 1.0
return {
"preservation_rate": preservation_rate,
"preserved_anchors": preserved,
"lost_anchors": lost,
"quality": "excellent" if preservation_rate > 0.95 else
"good" if preservation_rate > 0.85 else
"acceptable" if preservation_rate > 0.75 else "poor",
"recommendations": [
f"Lost anchor: '{a}' - consider preserving in next iteration"
for a in lost[:5]
]
}
    def _generate_probes(self, context: str, dimension: Optional[str] = None) -> List[Dict]:
"""Generate probe questions based on context content."""
probes = []
        # Extract candidate probe targets: sentences long enough to be substantive
        sentences = [s.strip() for s in context.split('.') if len(s.strip()) > 20]
# Key facts probes
if dimension in (None, "key_facts"):
            for sentence in sentences[:5]:
# Generate simple factual probe
probes.append({
"dimension": "key_facts",
"question": f"Is this information present: '{sentence[:100]}...'?",
"expected_content": sentence[:100],
"type": "presence"
})
# Relationships probes
if dimension in (None, "relationships"):
            # Look for capitalized entity mentions
entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', context)
unique_entities = list(set(entities))[:5]
for entity in unique_entities:
probes.append({
"dimension": "relationships",
"question": f"Is '{entity}' mentioned with context?",
"expected_content": entity,
"type": "entity_context"
})
# Instructions probes
if dimension in (None, "instructions"):
instruction_markers = ["must", "should", "always", "never", "required"]
for marker in instruction_markers:
if marker in context.lower():
# Find the instruction
for sentence in sentences:
if marker in sentence.lower():
probes.append({
"dimension": "instructions",
"question": f"Is this instruction preserved: '{sentence[:80]}'?",
"expected_content": sentence[:80],
"type": "instruction"
})
break
# Coherence probes
if dimension in (None, "coherence"):
probes.append({
"dimension": "coherence",
"question": "Is the summary grammatically coherent?",
"expected_content": None,
"type": "coherence_check"
})
return probes
def _evaluate_probe(self, original: str, compressed: str, probe: Dict) -> Dict:
"""Evaluate a single probe against compressed content."""
probe_type = probe.get("type", "presence")
expected = probe.get("expected_content", "")
dimension = probe.get("dimension", "general")
if probe_type == "presence":
# Check if expected content is present
if expected and expected.lower() in compressed.lower():
score = 1.0
status = "preserved"
elif expected and any(word in compressed.lower() for word in expected.lower().split()[:3]):
score = 0.7
status = "partial"
else:
score = 0.0
status = "lost"
elif probe_type == "entity_context":
# Check if entity is mentioned with meaningful context
if expected and expected in compressed:
# Check for context around entity
idx = compressed.find(expected)
context_window = compressed[max(0, idx-50):idx+len(expected)+50]
if len(context_window) > len(expected) + 20:
score = 1.0
status = "preserved_with_context"
else:
score = 0.5
status = "mentioned_without_context"
else:
score = 0.0
status = "missing"
elif probe_type == "instruction":
# Check if instruction is preserved
if expected and expected.lower() in compressed.lower():
score = 1.0
status = "preserved"
elif expected:
# Check for key action words
action_words = [w for w in expected.lower().split() if len(w) > 4][:3]
                if action_words and all(w in compressed.lower() for w in action_words):
score = 0.8
status = "paraphrased"
else:
score = 0.3
status = "weakly_preserved"
else:
score = 0.0
status = "lost"
elif probe_type == "coherence_check":
# Simple coherence check
# Check sentence structure
sentences = [s.strip() for s in compressed.split('.') if s.strip()]
if len(sentences) > 0:
avg_length = sum(len(s) for s in sentences) / len(sentences)
if 20 < avg_length < 200:
score = 1.0
status = "coherent"
elif 10 < avg_length < 300:
score = 0.7
status = "acceptable"
else:
score = 0.4
status = "issues"
else:
score = 0.5
status = "unable_to_evaluate"
else:
score = 0.5
status = "unknown_probe_type"
return {
"dimension": dimension,
"question": probe.get("question", ""),
"score": score,
"status": status,
"type": probe_type
}
def _get_quality_rating(self, score: float) -> str:
"""Convert score to quality rating."""
if score >= self.thresholds["excellent"]:
return "excellent"
elif score >= self.thresholds["good"]:
return "good"
elif score >= self.thresholds["acceptable"]:
return "acceptable"
elif score >= self.thresholds["poor"]:
return "poor"
else:
return "unacceptable"
def _calculate_tokens_per_task(self, original_tokens: int, compressed_tokens: int,
quality: float) -> Dict:
"""
Calculate tokens-per-task efficiency metric.
Lower is better, but must maintain quality.
"""
        if quality <= 0:
            return {
                "compressed_tokens": compressed_tokens,
                "efficiency": 0,
                "improvement": 0,
                "rating": "quality_too_low",
            }
# Efficiency = quality / tokens
efficiency = quality / compressed_tokens if compressed_tokens > 0 else 0
# Compare to original
original_efficiency = quality / original_tokens if original_tokens > 0 else 0
improvement = (efficiency - original_efficiency) / original_efficiency if original_efficiency > 0 else 0
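        # Worked example (illustrative): quality 0.9, 2000 -> 500 tokens:
        #   efficiency          = 0.9 / 500  = 0.0018
        #   original_efficiency = 0.9 / 2000 = 0.00045
        #   improvement = (0.0018 - 0.00045) / 0.00045 = 3.0 -> "improved"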
return {
"compressed_tokens": compressed_tokens,
"efficiency": efficiency,
"improvement": improvement,
"rating": "improved" if improvement > 0.1 else "neutral" if improvement > -0.1 else "degraded"
}
def _generate_recommendations(self, dimension_scores: Dict, compression_ratio: float) -> List[str]:
"""Generate recommendations based on evaluation results."""
recommendations = []
# Check weak dimensions
for dim, score in dimension_scores.items():
if score < 0.75:
if dim == "key_facts":
recommendations.append("Preserve more key factual information in summaries")
elif dim == "relationships":
recommendations.append("Maintain entity relationships when compressing")
elif dim == "instructions":
recommendations.append("Instructions are being lost - preserve action items")
elif dim == "coherence":
recommendations.append("Summary coherence needs improvement")
# Check compression ratio
if compression_ratio < 0.3:
recommendations.append("Compression is minimal - consider more aggressive summarization")
elif compression_ratio > 0.8:
recommendations.append("Very high compression - verify quality is maintained")
if not recommendations:
recommendations.append("Compression quality is good - no specific recommendations")
return recommendations
def main():
    parser = argparse.ArgumentParser(
        description="CODITECT Compression Evaluator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 compression_evaluator.py --original orig.txt --compressed comp.txt
  python3 compression_evaluator.py --probes probes.json --context context.txt
  python3 compression_evaluator.py --generate-probes --context context.txt
"""
    )
parser.add_argument("--original", "-o", help="Original context file")
parser.add_argument("--compressed", "-c", help="Compressed context file")
parser.add_argument("--context", help="Context file for probe generation")
parser.add_argument("--probes", "-p", help="JSON file with probe questions")
parser.add_argument("--generate-probes", "-g", action="store_true", help="Generate probes only")
parser.add_argument("--dimension", "-d", help="Specific dimension to focus on")
parser.add_argument("--json", "-j", action="store_true", help="Output JSON format")
args = parser.parse_args()
    evaluator = CoditectCompressionEvaluator()
# Generate probes mode
if args.generate_probes and args.context:
with open(args.context, 'r') as f:
context = f.read()
probes = evaluator.generate_probes(context, args.dimension)
print(json.dumps(probes, indent=2))
sys.exit(0)
# Evaluation mode
if args.original and args.compressed:
with open(args.original, 'r') as f:
original = f.read()
with open(args.compressed, 'r') as f:
compressed = f.read()
probes = None
if args.probes:
with open(args.probes, 'r') as f:
probes = json.load(f)
result = evaluator.evaluate(original, compressed, probes)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"\n{'='*60}")
print("CODITECT Compression Evaluation")
print(f"{'='*60}")
print(f"\nOverall Score: {result['overall_score']:.2f}")
print(f"Quality Rating: {result['quality_rating']}")
print(f"Compression Ratio: {result['compression_ratio']*100:.1f}%")
print(f"\nDimension Scores:")
for dim, score in result['dimension_scores'].items():
print(f" {dim}: {score:.2f}")
print(f"\nTokens-per-Task:")
tpt = result['tokens_per_task']
print(f" Compressed Tokens: {tpt['compressed_tokens']}")
print(f" Efficiency Rating: {tpt['rating']}")
print(f"\nRecommendations:")
for rec in result['recommendations']:
print(f" - {rec}")
print(f"\n{'='*60}\n")
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
    main()