#!/usr/bin/env python3
"""Context Health Analyzer - CODITECT Adapter

Wraps the degradation detection utilities from
Agent-Skills-for-Context-Engineering for CODITECT framework integration.

Usage:
    python3 scripts/context-engineering/context_health_analyzer.py --analyze "context text"
    python3 scripts/context-engineering/context_health_analyzer.py --file path/to/context.txt
    python3 scripts/context-engineering/context_health_analyzer.py --session SESSION_ID

Source: external/Agent-Skills-for-Context-Engineering/skills/context-degradation/scripts/
"""

import sys
import json
import argparse
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional

# Add the external module's scripts directory to sys.path so that
# degradation_detector is importable when the submodule is checked out.
EXTERNAL_PATH = Path(__file__).parent.parent.parent / "external" / "Agent-Skills-for-Context-Engineering"
sys.path.insert(0, str(EXTERNAL_PATH / "skills" / "context-degradation" / "scripts"))

try:
    from degradation_detector import (
        ContextHealthAnalyzer,
        PoisoningDetector,
        measure_attention_distribution,
        detect_lost_in_middle,
        analyze_context_structure,
    )
    EXTERNAL_AVAILABLE = True
except ImportError:
    # Degrade gracefully: the class below falls back to simple heuristics.
    EXTERNAL_AVAILABLE = False
    print("Warning: External module not available. Using fallback implementation.",
          file=sys.stderr)
class CoditechContextHealthAnalyzer:
    """CODITECT-integrated context health analyzer.

    Delegates to the external ``degradation_detector`` module when it is
    available (``EXTERNAL_AVAILABLE``) and falls back to a lightweight
    token-count heuristic otherwise.
    """

    def __init__(self, context_limit: int = 100000):
        """Initialize the analyzer.

        Args:
            context_limit: Token budget used to compute utilization metrics.
        """
        self.context_limit = context_limit
        # Reports are written under <repo-root>/context-storage/health-reports.
        self.results_dir = Path(__file__).parent.parent.parent / "context-storage" / "health-reports"
        self.results_dir.mkdir(parents=True, exist_ok=True)
        if EXTERNAL_AVAILABLE:
            self._analyzer = ContextHealthAnalyzer(context_limit)
            self._poisoning_detector = PoisoningDetector()
        else:
            self._analyzer = None
            self._poisoning_detector = None

    def analyze(self, context: str, critical_positions: Optional[List[int]] = None,
                session_id: Optional[str] = None) -> Dict:
        """
        Perform comprehensive context health analysis.

        Args:
            context: The context text to analyze
            critical_positions: Positions of critical information
            session_id: Optional session ID for report naming

        Returns:
            Health analysis results with score, metrics, and recommendations
        """
        if EXTERNAL_AVAILABLE:
            result = self._analyzer.analyze(context, critical_positions)
        else:
            result = self._fallback_analyze(context, critical_positions)
        # Enhance with CODITECT metadata
        result["analysis_time"] = datetime.now(timezone.utc).isoformat()
        result["context_limit"] = self.context_limit
        result["source"] = "coditect-context-health-analyzer"
        # Save report if session_id provided
        if session_id:
            result["session_id"] = session_id
            self._save_report(result, session_id)
        return result

    def detect_degradation_patterns(self, context: str) -> Dict:
        """
        Detect specific degradation patterns in context.

        Returns patterns found with severity and recommendations, plus an
        ``overall_risk`` key summarizing the combined severity.
        """
        patterns = {
            "lost_in_middle": self._check_lost_in_middle(context),
            "poisoning": self._check_poisoning(context),
            "distraction": self._check_distraction(context),
            "confusion": self._check_confusion(context),
            "clash": self._check_clash(context)
        }
        # Calculate overall degradation risk from per-pattern severities.
        severe_count = sum(1 for p in patterns.values() if p.get("severity") == "high")
        warning_count = sum(1 for p in patterns.values() if p.get("severity") == "medium")
        patterns["overall_risk"] = (
            "critical" if severe_count > 2 else
            "high" if severe_count > 0 else
            "medium" if warning_count > 1 else
            "low"
        )
        return patterns

    def _check_lost_in_middle(self, context: str) -> Dict:
        """Check for lost-in-middle degradation (critical info buried mid-context)."""
        structure = analyze_context_structure(context) if EXTERNAL_AVAILABLE else {}
        middle_ratio = structure.get("middle_content_ratio", 0)
        return {
            "detected": middle_ratio > 0.5,
            "severity": "high" if middle_ratio > 0.7 else "medium" if middle_ratio > 0.5 else "low",
            "middle_content_ratio": middle_ratio,
            "recommendation": "Move critical content to beginning/end of context"
        }

    def _check_poisoning(self, context: str) -> Dict:
        """Check for context poisoning indicators via the external detector."""
        if self._poisoning_detector:
            result = self._poisoning_detector.detect_poisoning(context)
            return {
                "detected": result["poisoning_risk"],
                "severity": result["overall_risk"],
                "indicators": result.get("indicators", []),
                "recommendation": "Review and remove erroneous information"
            }
        # No detector available: report a clean (low-severity) result.
        return {"detected": False, "severity": "low"}

    def _check_distraction(self, context: str) -> Dict:
        """Check for distracting irrelevant content."""
        # Simple heuristic: count off-topic markers; flag if more than two.
        distraction_markers = ["unrelated", "tangent", "aside", "btw", "by the way"]
        found = sum(1 for marker in distraction_markers if marker in context.lower())
        return {
            "detected": found > 2,
            "severity": "medium" if found > 2 else "low",
            "marker_count": found,
            "recommendation": "Filter irrelevant content before injection"
        }

    def _check_confusion(self, context: str) -> Dict:
        """Check for contradictory information."""
        # Substring heuristic over antonym pairs. NOTE(review): "should not"
        # contains "should" (and "must not" contains "must"), so those pairs
        # trigger whenever the negated form alone appears — a known looseness
        # of this heuristic, kept for behavioral compatibility.
        contradiction_markers = [
            ("yes", "no"), ("true", "false"), ("always", "never"),
            ("should", "should not"), ("must", "must not")
        ]
        conflicts = []
        for pos, neg in contradiction_markers:
            if pos in context.lower() and neg in context.lower():
                conflicts.append(f"{pos}/{neg}")
        return {
            "detected": len(conflicts) > 2,
            "severity": "high" if len(conflicts) > 3 else "medium" if len(conflicts) > 1 else "low",
            "conflicts": conflicts[:5],  # cap reported conflicts at five
            "recommendation": "Resolve contradictory statements in context"
        }

    def _check_clash(self, context: str) -> Dict:
        """Check for semantic conflicts between context elements."""
        # Simple check for common version/status clash patterns.
        clash_patterns = [
            ("old version", "new version"),
            ("deprecated", "recommended"),
            ("v1", "v2"),
        ]
        clashes = []
        for old, new in clash_patterns:
            if old in context.lower() and new in context.lower():
                clashes.append(f"{old} vs {new}")
        return {
            "detected": len(clashes) > 0,
            "severity": "medium" if len(clashes) > 0 else "low",
            "clashes": clashes,
            "recommendation": "Ensure version consistency in context"
        }

    def _fallback_analyze(self, context: str, critical_positions: Optional[List[int]] = None) -> Dict:
        """Fallback analysis when external module unavailable.

        Uses whitespace token count vs. the context limit as a rough health
        proxy; ``critical_positions`` is accepted for interface parity but
        unused here.
        """
        tokens = context.split()
        token_count = len(tokens)
        utilization = token_count / self.context_limit
        return {
            "health_score": max(0.0, 1.0 - utilization * 0.5),
            "status": "healthy" if utilization < 0.7 else "warning" if utilization < 0.85 else "degraded",
            "metrics": {
                "token_count": token_count,
                "utilization": utilization
            },
            "recommendations": [
                "Install external module for full analysis capabilities"
            ]
        }

    def _save_report(self, result: Dict, session_id: str):
        """Save health report to ``<results_dir>/<session_id>.json``."""
        report_path = self.results_dir / f"{session_id}.json"
        with open(report_path, 'w') as f:
            json.dump(result, f, indent=2)
        print(f"Report saved: {report_path}", file=sys.stderr)
def main():
    """CLI entry point: gather context from args/file/stdin, analyze, report.

    Exits with status 1 (after printing help) when no context source is
    provided and stdin is a TTY.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Context Health Analyzer",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 context_health_analyzer.py --analyze "Your context text here"
  python3 context_health_analyzer.py --file path/to/context.txt
  python3 context_health_analyzer.py --session SESSION_ID --file context.txt
"""
    )
    parser.add_argument("--analyze", "-a", help="Context text to analyze")
    parser.add_argument("--file", "-f", help="File containing context to analyze")
    parser.add_argument("--session", "-s", help="Session ID for report naming")
    parser.add_argument("--limit", "-l", type=int, default=100000, help="Context token limit")
    parser.add_argument("--json", "-j", action="store_true", help="Output JSON format")
    parser.add_argument("--patterns", "-p", action="store_true", help="Show degradation patterns only")
    args = parser.parse_args()

    # Get context: CLI text takes precedence, then file, then piped stdin.
    context = None
    if args.analyze:
        context = args.analyze
    elif args.file:
        with open(args.file, 'r') as f:
            context = f.read()
    else:
        # Read from stdin only when input is piped (not an interactive TTY).
        if not sys.stdin.isatty():
            context = sys.stdin.read()
        else:
            parser.print_help()
            sys.exit(1)

    # Analyze
    analyzer = CoditechContextHealthAnalyzer(context_limit=args.limit)
    if args.patterns:
        result = analyzer.detect_degradation_patterns(context)
    else:
        result = analyzer.analyze(context, session_id=args.session)

    # Output: raw JSON or a human-readable summary.
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(f"\n{'='*60}")
        print("CODITECT Context Health Analysis")
        print(f"{'='*60}")
        if "health_score" in result:
            print(f"\nHealth Score: {result['health_score']:.2f}")
            print(f"Status: {result['status']}")
        if "metrics" in result:
            print("\nMetrics:")
            for key, value in result["metrics"].items():
                print(f"  {key}: {value}")
        if "overall_risk" in result:
            print(f"\nOverall Degradation Risk: {result['overall_risk']}")
        if "recommendations" in result:
            print("\nRecommendations:")
            for rec in result["recommendations"]:
                print(f"  - {rec}")
        print(f"\n{'='*60}\n")
# Script entry guard: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()