#!/usr/bin/env python3 """ MoE Document Classification CLI v3.0
Command-line interface for classifying documents using the Mixture of Experts classification system with 13 Type Experts.
Version History:
- v2.1: 13 Type Experts, autonomous mode, expert mode
- v3.0: Enhanced frontmatter, threshold, type override, respect-directory
Usage: # Classify a single file python classify.py path/to/file.md
# Classify a directory
python classify.py path/to/docs/ --recursive
# Batch classify with output
python classify.py docs/ -r --output results.json --format json
# Update frontmatter with classifications
python classify.py docs/ -r --update-frontmatter
# Dry run (show what would happen)
python classify.py docs/ -r --dry-run
# V3: ENHANCE FRONTMATTER MODE
# Add explicit type fields to boost low-confidence files
python classify.py docs/ -r --enhance-frontmatter
# V3: With custom threshold (default 95%)
python classify.py docs/ -r --enhance-frontmatter --threshold 90
# V3: Dry run to preview enhancements
python classify.py docs/ -r --enhance-frontmatter --dry-run
# V3: TYPE OVERRIDE MODE
# Force specific type on all files in a directory
python classify.py docs/workflows/ -r --enhance-frontmatter --type-override workflow
# V3: Correct misclassified WF-* files
python classify.py docs/workflows/WF-*.md --enhance-frontmatter --type-override workflow
# V3: RESPECT DIRECTORY MODE
# Use directory path as classification hint
python classify.py docs/ -r --enhance-frontmatter --respect-directory
# V3: Dry run to see where directory hints would be applied
python classify.py docs/ -r --enhance-frontmatter --respect-directory --dry-run --verbose
# V3: SUGGEST ENHANCEMENTS MODE
# Analyze files and suggest specific content additions
python classify.py docs/ -r --suggest-enhancements
# V3: Verbose to see content previews
python classify.py docs/ -r --suggest-enhancements --verbose
# V3: JSON OUTPUT MODE (CI/CD integration)
# Get structured JSON output for any mode
python classify.py docs/ -r --enhance-frontmatter --dry-run --json
python classify.py docs/ -r --suggest-enhancements --json
python classify.py docs/ -r --json # Standard classification
"""
import argparse import json import csv import sys import time from datetime import datetime, timezone from pathlib import Path from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, asdict import logging
# Add module path so sibling packages (core/, type_experts/) resolve when run as a script.
sys.path.insert(0, str(Path(__file__).parent))

# Set up logging FIRST: the optional track_registry import below logs a warning
# on failure, so `logger` must already exist at that point (defining it after
# the try/except raised NameError whenever track_registry was missing).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

from core.models import Document, ClassificationResult, ApprovalType
from core.orchestrator import create_default_orchestrator, MoEOrchestrator
from core.enhanced_orchestrator import get_enhanced_orchestrator, EnhancedOrchestratorConfig

# Import track registry for bi-lateral TRACK ↔ SKILL mapping (optional feature).
try:
    from track_registry import update_track_mappings, TrackRegistry
    TRACK_REGISTRY_AVAILABLE = True
except ImportError:
    TRACK_REGISTRY_AVAILABLE = False
    logger.warning("Track registry not available - track mapping disabled")
def check_embeddings_available() -> bool:
    """Check if sentence-transformers is available for embeddings."""
    try:
        # Probe the optional dependency; the imported name itself is unused.
        from sentence_transformers import SentenceTransformer  # noqa: F401
    except ImportError:
        return False
    else:
        return True
def create_orchestrator(use_embeddings: bool = False):
    """
    Create the appropriate orchestrator based on configuration.

    Args:
        use_embeddings: If True, uses EnhancedMoEOrchestrator with semantic embeddings.
                        If False, uses standard MoEOrchestrator (faster).

    Returns:
        Configured orchestrator instance

    Note:
        Embeddings require sentence-transformers which must be installed in the venv:
        source .venv/bin/activate && pip install sentence-transformers
    """
    # Fast path: no embeddings requested → plain orchestrator.
    if not use_embeddings:
        logger.info("Initializing standard MoE classifier...")
        return create_default_orchestrator()

    # Embeddings requested: verify the optional dependency before committing.
    if not check_embeddings_available():
        banner = "=" * 70
        print("\n" + banner)
        print("⚠️ EMBEDDINGS UNAVAILABLE - sentence-transformers not installed")
        print(banner)
        print("To enable semantic embeddings, activate the virtual environment:")
        print()
        print(" source .venv/bin/activate")
        print(" pip install sentence-transformers")
        print()
        print("Then re-run with --use-embeddings")
        print(banner)
        logger.warning("Falling back to standard classification (no embeddings)...")
        return create_default_orchestrator()

    config = EnhancedOrchestratorConfig(
        use_embeddings=True,
        use_learning=True,
        use_memory=True,
        use_adaptive_thresholds=True,
        use_calibration=True,
    )
    logger.info("Initializing enhanced orchestrator with embeddings...")
    orchestrator = get_enhanced_orchestrator(config)

    # Verify embeddings are actually working; the service may initialize but
    # still be unusable (e.g. model download failed) → log and fall through.
    service = orchestrator.embedding_service
    if service and service.is_available():
        stats = orchestrator.embedding_service.get_stats()
        logger.info(f"Embeddings active: model={stats.get('model', 'unknown')}, "
                    f"device={stats.get('device', 'unknown')}")
    else:
        logger.warning("Embeddings service initialized but not available - using fallback mode")
    return orchestrator
@dataclass
class BatchResult:
    """Result of batch classification."""
    total_files: int     # number of files discovered for processing
    processed: int       # files actually attempted
    successful: int      # files classified without error
    failed: int          # files that raised during classification
    auto_approved: int   # results with ApprovalType.AUTO_APPROVED
    judge_approved: int  # results with ApprovalType.JUDGE_APPROVED
    escalated: int       # results with ApprovalType.ESCALATED
    total_time_ms: int   # wall-clock time for the whole batch
    results: List[Dict]  # per-file result dicts (serializable)
class ClassificationCLI:
    """Command-line interface for MoE classification.

    Owns the whole CLI pipeline: file collection, batch classification via an
    orchestrator, progress display, summary reporting, result export
    (json/csv/markdown summary), optional frontmatter updates, and optional
    TRACK ↔ SKILL registry refresh.
    """

    # Only these file types are considered classifiable documents.
    SUPPORTED_EXTENSIONS = {'.md', '.markdown', '.yaml', '.yml'}

    def __init__(self, args: argparse.Namespace):
        """Store parsed args and detect project layout.

        Args:
            args: Namespace produced by create_parser().parse_args().
        """
        self.args = args
        self.orchestrator: Optional[MoEOrchestrator] = None
        self.results: List[ClassificationResult] = []
        self.skills_dir: Optional[Path] = None
        # Determine project root and skills directory
        self._detect_project_paths()

    def _detect_project_paths(self) -> None:
        """Detect project root and skills directory.

        Walks up from the target path looking for a `skills/` directory;
        falls back to the current working directory when none is found.
        """
        # Start from current directory or path argument
        start_path = Path(self.args.path) if hasattr(self.args, 'path') else Path.cwd()
        # Walk up to find project root (directory containing skills/ folder)
        current = start_path if start_path.is_dir() else start_path.parent
        for _ in range(5): # Check up to 5 parent directories
            skills_candidate = current / "skills"
            if skills_candidate.exists() and skills_candidate.is_dir():
                self.skills_dir = skills_candidate
                self.project_root = current
                logger.debug(f"Detected project root: {current}")
                logger.debug(f"Detected skills dir: {skills_candidate}")
                return
            if current.parent == current: # Reached filesystem root
                break
            current = current.parent
        # Fallback to current directory
        self.project_root = Path.cwd()
        self.skills_dir = self.project_root / "skills"

    def run(self) -> int:
        """Execute the classification based on CLI arguments.

        Returns:
            Process exit code: 0 on success, 1 on failure, 130 on Ctrl-C.
        """
        try:
            # Initialize orchestrator (with or without embeddings)
            use_embeddings = getattr(self.args, 'use_embeddings', False)
            self.orchestrator = create_orchestrator(use_embeddings=use_embeddings)
            # Collect files to process
            files = self._collect_files()
            if not files:
                logger.warning("No files found to classify")
                return 0
            logger.info(f"Found {len(files)} files to classify")
            # Process files
            if self.args.dry_run:
                return self._dry_run(files)
            else:
                return self._process_files(files)
        except KeyboardInterrupt:
            logger.info("\nClassification interrupted by user")
            return 130  # conventional exit code for SIGINT
        except Exception as e:
            logger.error(f"Classification failed: {e}")
            if self.args.verbose:
                import traceback
                traceback.print_exc()
            return 1

    def _collect_files(self) -> List[Path]:
        """Collect files to classify based on arguments.

        Honors --recursive, --exclude (substring match on full path) and
        --limit; results are sorted for deterministic ordering.

        Raises:
            FileNotFoundError: if args.path is neither a file nor a directory.
        """
        path = Path(self.args.path)
        files = []
        if path.is_file():
            if path.suffix.lower() in self.SUPPORTED_EXTENSIONS:
                files.append(path)
            else:
                logger.warning(f"Unsupported file type: {path.suffix}")
        elif path.is_dir():
            pattern = '**/*' if self.args.recursive else '*'
            for ext in self.SUPPORTED_EXTENSIONS:
                files.extend(path.glob(f"{pattern}{ext}"))
        else:
            raise FileNotFoundError(f"Path not found: {path}")
        # Apply filters
        if self.args.exclude:
            exclude_patterns = self.args.exclude.split(',')
            # NOTE: patterns are plain substrings of the path, not globs.
            files = [
                f for f in files
                if not any(p.strip() in str(f) for p in exclude_patterns)
            ]
        # Sort for consistent ordering
        files.sort()
        # Apply limit
        if self.args.limit and self.args.limit > 0:
            files = files[:self.args.limit]
        return files

    def _dry_run(self, files: List[Path]) -> int:
        """Show what would be classified without processing."""
        print(f"\nDry Run: Would classify {len(files)} files\n")
        for i, f in enumerate(files[:20], 1): # Show first 20
            print(f" {i:3}. {f}")
        if len(files) > 20:
            print(f" ... and {len(files) - 20} more files")
        print(f"\nTotal: {len(files)} files")
        return 0

    def _process_files(self, files: List[Path]) -> int:
        """Process and classify files.

        Returns 0 when every file succeeded, 1 if any file errored.
        """
        start_time = time.time()
        results = []
        errors = []
        # Progress tracking
        total = len(files)
        width = 50  # NOTE(review): unused here — _show_progress hardcodes 50.
        for i, file_path in enumerate(files, 1):
            try:
                # Load document
                doc = Document.from_path(file_path)
                # Classify
                result = self.orchestrator.classify(doc)
                results.append(result)
                # Update frontmatter if requested
                if self.args.update_frontmatter and result.result.classification:
                    self._update_frontmatter(file_path, result)
                # Show progress
                if not self.args.quiet:
                    self._show_progress(i, total, file_path, result)
            except Exception as e:
                # Collect per-file failures; keep going with the rest.
                errors.append((file_path, str(e)))
                if self.args.verbose:
                    logger.error(f"Error processing {file_path}: {e}")
        # Show final results
        elapsed = time.time() - start_time
        self._show_summary(results, errors, elapsed)
        # Save output if requested
        if self.args.output:
            self._save_output(results, errors)
        # Update bi-lateral TRACK ↔ SKILL mappings (default behavior)
        if TRACK_REGISTRY_AVAILABLE and not getattr(self.args, 'skip_track_mappings', False):
            self._update_track_mappings()
        return 0 if not errors else 1

    def _update_track_mappings(self) -> None:
        """Update bi-lateral TRACK ↔ SKILL mappings.

        Best-effort: any failure is logged as a warning and never aborts
        the classification run.
        """
        try:
            logger.info("Updating bi-lateral TRACK ↔ SKILL mappings...")
            # Use detected project root
            project_root = getattr(self, 'project_root', Path.cwd())
            registry = TrackRegistry(project_root=project_root)
            # Discover all track files
            registry.discover_all_track_files()
            # Scan skills and build mappings
            registry.scan_skills()
            registry.build_track_mappings()
            # Update track files (including empty tracks)
            success, failed, empty = registry.update_all_tracks(dry_run=self.args.dry_run, include_empty=True)
            # Save index (unless dry run)
            if not self.args.dry_run:
                index_path = registry.save_track_skills_index(dry_run=False)
                logger.info(f"Track-skills index saved: {index_path}")
            # Show summary
            # NOTE(review): reaches into TrackRegistry private attributes
            # (_all_track_files etc.) — consider a public stats accessor.
            total_tracks = len(registry._all_track_files)
            mapped_tracks = len(registry._track_to_skills)
            total_skills = len(registry._skill_to_track)
            print(f"\n📊 Track Mappings Updated:")
            print(f" Total tracks: {total_tracks}")
            print(f" Tracks with skills: {mapped_tracks}")
            print(f" Empty tracks: {empty}")
            print(f" Total skills: {total_skills}")
            print(f" Track files updated: {success + empty}")
            if failed > 0:
                print(f" Failed: {failed}")
            # Validate consistency
            errors = registry.validate_consistency()
            if errors:
                print(f" ⚠️ Validation issues: {len(errors)}")
                for error in errors[:3]: # Show first 3
                    print(f" - {error}")
            else:
                print(f" ✅ Mappings consistent")
        except Exception as e:
            logger.warning(f"Track mapping update failed: {e}")
            if self.args.verbose:
                import traceback
                traceback.print_exc()

    def _show_progress(
        self,
        current: int,
        total: int,
        file_path: Path,
        result: ClassificationResult
    ) -> None:
        """Show progress bar and current file (single carriage-returned line)."""
        pct = current / total
        filled = int(50 * pct)
        bar = '█' * filled + '░' * (50 - filled)
        status = result.result.approval_type.value[:4]
        classification = result.result.classification or 'unknown'
        confidence = result.result.confidence
        # Truncate filename for display
        fname = file_path.name
        if len(fname) > 30:
            fname = fname[:27] + '...'
        print(f"\r[{bar}] {current}/{total} | {fname:30} → {classification:10} ({confidence:.0%}) [{status}]", end='')
        if current == total:
            print() # New line at end

    def _show_summary(
        self,
        results: List[ClassificationResult],
        errors: List[Tuple[Path, str]],
        elapsed: float
    ) -> None:
        """Show classification summary.

        Args:
            results: Successful classification results.
            errors: (path, message) pairs for failed files.
            elapsed: Wall-clock seconds for the batch.
        """
        print("\n" + "="*70)
        print("Classification Summary")
        print("="*70)
        # Count by status
        auto = sum(1 for r in results if r.result.approval_type == ApprovalType.AUTO_APPROVED)
        judge = sum(1 for r in results if r.result.approval_type == ApprovalType.JUDGE_APPROVED)
        escalated = sum(1 for r in results if r.result.approval_type == ApprovalType.ESCALATED)
        # NOTE(review): assumes elapsed > 0; practically true for any real run.
        print(f"\nProcessed: {len(results)} files in {elapsed:.1f}s ({len(results)/elapsed:.1f} files/sec)")
        print(f"Errors: {len(errors)}")
        print(f"\nApproval Status:")
        # Conditional expressions guard the divisions when results is empty.
        print(f" Auto-approved: {auto:5} ({auto/len(results)*100:.1f}%)" if results else "")
        print(f" Judge-approved: {judge:5} ({judge/len(results)*100:.1f}%)" if results else "")
        print(f" Escalated: {escalated:5} ({escalated/len(results)*100:.1f}%)" if results else "")
        # Count by classification
        print(f"\nClassifications:")
        classifications = {}
        for r in results:
            cls = r.result.classification or 'unknown'
            classifications[cls] = classifications.get(cls, 0) + 1
        for cls, count in sorted(classifications.items(), key=lambda x: -x[1]):
            print(f" {cls:15}: {count:5} ({count/len(results)*100:.1f}%)")
        # Show escalated files if any
        if escalated > 0 and self.args.verbose:
            print(f"\nEscalated Files (need manual review):")
            for r in results:
                if r.result.approval_type == ApprovalType.ESCALATED:
                    print(f" - {r.document_path}")
                    print(f" Reason: {r.result.escalation_reason}")
        # Show errors if any
        if errors:
            print(f"\nErrors:")
            for path, error in errors[:10]:
                print(f" - {path}: {error}")
            if len(errors) > 10:
                print(f" ... and {len(errors) - 10} more errors")
        print()

    def _save_output(
        self,
        results: List[ClassificationResult],
        errors: List[Tuple[Path, str]]
    ) -> None:
        """Save results to output file, dispatching on --format or extension."""
        output_path = Path(self.args.output)
        # Fall back to the output file's extension when --format is omitted.
        format_type = self.args.format or output_path.suffix[1:]
        logger.info(f"Saving results to {output_path} ({format_type} format)")
        if format_type == 'json':
            self._save_json(output_path, results, errors)
        elif format_type == 'csv':
            self._save_csv(output_path, results)
        elif format_type == 'summary':
            self._save_summary(output_path, results, errors)
        else:
            # Default to JSON
            self._save_json(output_path, results, errors)

    def _save_json(
        self,
        path: Path,
        results: List[ClassificationResult],
        errors: List[Tuple[Path, str]]
    ) -> None:
        """Save results as JSON (timestamps, stats, per-file results, errors)."""
        output = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'total': len(results),
            'errors': len(errors),
            'stats': self.orchestrator.get_stats() if self.orchestrator else {},
            'results': [r.to_dict() for r in results],
            'error_files': [{'path': str(p), 'error': e} for p, e in errors]
        }
        with open(path, 'w') as f:
            # default=str stringifies anything non-serializable (e.g. Paths).
            json.dump(output, f, indent=2, default=str)

    def _save_csv(self, path: Path, results: List[ClassificationResult]) -> None:
        """Save results as CSV (one row per classified file)."""
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                'path', 'classification', 'confidence', 'agreement',
                'approval_type', 'escalation_reason', 'processing_time_ms'
            ])
            for r in results:
                writer.writerow([
                    r.document_path,
                    r.result.classification or '',
                    f"{r.result.confidence:.3f}",
                    f"{r.result.agreement_ratio:.3f}",
                    r.result.approval_type.value,
                    r.result.escalation_reason or '',
                    r.processing_time_ms
                ])

    def _save_summary(
        self,
        path: Path,
        results: List[ClassificationResult],
        errors: List[Tuple[Path, str]]
    ) -> None:
        """Save summary report as markdown."""
        with open(path, 'w') as f:
            f.write("# MoE Classification Report\n\n")
            f.write(f"**Generated:** {datetime.now(timezone.utc).isoformat()}\n\n")
            # Stats
            f.write("## Summary\n\n")
            f.write(f"- **Total Files:** {len(results)}\n")
            f.write(f"- **Errors:** {len(errors)}\n\n")
            # By approval type
            f.write("## Approval Status\n\n")
            f.write("| Status | Count | Percentage |\n")
            f.write("|--------|-------|------------|\n")
            for status in ApprovalType:
                count = sum(1 for r in results if r.result.approval_type == status)
                pct = count / len(results) * 100 if results else 0
                f.write(f"| {status.value} | {count} | {pct:.1f}% |\n")
            # By classification
            f.write("\n## Classifications\n\n")
            f.write("| Type | Count | Percentage |\n")
            f.write("|------|-------|------------|\n")
            classifications = {}
            for r in results:
                cls = r.result.classification or 'unknown'
                classifications[cls] = classifications.get(cls, 0) + 1
            for cls, count in sorted(classifications.items(), key=lambda x: -x[1]):
                pct = count / len(results) * 100 if results else 0
                f.write(f"| {cls} | {count} | {pct:.1f}% |\n")
            # Escalated files
            escalated = [r for r in results if r.result.approval_type == ApprovalType.ESCALATED]
            if escalated:
                f.write("\n## Escalated Files (Need Review)\n\n")
                for r in escalated:
                    f.write(f"- `{r.document_path}`\n")
                    f.write(f" - Reason: {r.result.escalation_reason}\n")

    def _update_frontmatter(self, file_path: Path, result: ClassificationResult) -> None:
        """Update file's frontmatter with classification.

        Only touches files that already have a `---`-delimited frontmatter
        block; failures are logged as warnings, never raised.
        """
        if not result.result.classification:
            return
        try:
            content = file_path.read_text(encoding='utf-8')
            # Check if has frontmatter
            if content.startswith('---'):
                # Find end of frontmatter
                end_match = content.find('\n---', 3)
                if end_match > 0:
                    # NOTE(review): slicing at 4 assumes the opener is exactly
                    # '---\n' (no CRLF) — confirm inputs are LF-normalized.
                    frontmatter = content[4:end_match]
                    body = content[end_match + 4:]
                    # Update or add type field
                    import re
                    if re.search(r'^type:', frontmatter, re.MULTILINE):
                        frontmatter = re.sub(
                            r'^type:.*$',
                            f'type: {result.result.classification}',
                            frontmatter,
                            flags=re.MULTILINE
                        )
                    else:
                        # Add type after title if exists, else at start
                        if re.search(r'^title:', frontmatter, re.MULTILINE):
                            frontmatter = re.sub(
                                r'^(title:.*?)$',
                                f'\\1\ntype: {result.result.classification}',
                                frontmatter,
                                flags=re.MULTILINE
                            )
                        else:
                            frontmatter = f'type: {result.result.classification}\n' + frontmatter
                    # Add classification metadata
                    if 'moe_confidence' not in frontmatter:
                        frontmatter += f'\nmoe_confidence: {result.result.confidence:.3f}'
                        frontmatter += f'\nmoe_classified: {datetime.now(timezone.utc).strftime("%Y-%m-%d")}'
                    # Write back
                    new_content = f'---\n{frontmatter}\n---{body}'
                    file_path.write_text(new_content, encoding='utf-8')
                    if self.args.verbose:
                        logger.info(f"Updated frontmatter: {file_path}")
        except Exception as e:
            logger.warning(f"Failed to update frontmatter for {file_path}: {e}")
def create_parser() -> argparse.ArgumentParser:
    """Create argument parser.

    Returns:
        ArgumentParser covering v2 (autonomous/expert), v3 (enhance/override/
        suggest/json) and v4 (embeddings) options.
    """
    parser = argparse.ArgumentParser(
        description='MoE Document Classification CLI',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Classify a single file
  python classify.py docs/guide.md

  # Classify all markdown files recursively
  python classify.py docs/ -r

  # Save results to JSON
  python classify.py docs/ -r -o results.json

  # Update frontmatter with classifications
  python classify.py docs/ -r --update-frontmatter

  # Dry run to see what would be processed
  python classify.py docs/ -r --dry-run

  # Verbose output with detailed logs
  python classify.py docs/ -r -v

  # AUTONOMOUS MODE: Iterate until 95-100% confidence
  python classify.py docs/ -r --autonomous

  # Autonomous with signal injection (modifies files)
  python classify.py docs/ -r --autonomous --fix

  # Autonomous dry run (preview changes)
  python classify.py docs/ -r --autonomous --fix --dry-run

  # EXPERT MODE: Use Type Expert agents for deep analysis
  python classify.py docs/ -r --expert

  # Expert mode with detailed reports
  python classify.py docs/ -r --expert --expert-report

  # Expert mode with output
  python classify.py docs/ -r --expert -o expert-analysis.json

  # V3: ENHANCE FRONTMATTER MODE
  # Boost low-confidence files by adding explicit type fields
  python classify.py docs/ -r --enhance-frontmatter

  # V3: Custom threshold (default 95%%)
  python classify.py docs/ -r --enhance-frontmatter --threshold 90

  # V3: Preview what would be enhanced
  python classify.py docs/ -r --enhance-frontmatter --dry-run
"""
    )
    parser.add_argument(
        'path',
        type=str,
        help='File or directory to classify'
    )
    parser.add_argument(
        '-r', '--recursive',
        action='store_true',
        help='Recursively process directories'
    )
    parser.add_argument(
        '-o', '--output',
        type=str,
        help='Output file for results'
    )
    parser.add_argument(
        '-f', '--format',
        choices=['json', 'csv', 'summary'],
        help='Output format (default: json)'
    )
    parser.add_argument(
        '--update-frontmatter',
        action='store_true',
        help='Update file frontmatter with classifications'
    )
    parser.add_argument(
        '--skip-track-mappings',
        action='store_true',
        help='Skip updating bi-lateral TRACK ↔ SKILL mappings (default: enabled)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be classified without processing'
    )
    parser.add_argument(
        '--limit',
        type=int,
        help='Limit number of files to process'
    )
    parser.add_argument(
        '--exclude',
        type=str,
        help='Comma-separated patterns to exclude'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Verbose output'
    )
    parser.add_argument(
        '-q', '--quiet',
        action='store_true',
        help='Suppress progress output'
    )
    parser.add_argument(
        '--autonomous',
        action='store_true',
        help='Autonomous mode: iterate until 95-100%% confidence achieved'
    )
    parser.add_argument(
        '--fix',
        action='store_true',
        help='Inject content signals to improve classification (requires --autonomous)'
    )
    parser.add_argument(
        '--expert',
        action='store_true',
        help='Use Type Expert agents for deep analysis and targeted improvements'
    )
    parser.add_argument(
        '--expert-report',
        action='store_true',
        help='Generate detailed expert analysis report (use with --expert)'
    )
    parser.add_argument(
        '--enhance-frontmatter',
        action='store_true',
        help='V3: Add explicit type fields to boost low-confidence files to threshold'
    )
    parser.add_argument(
        '--threshold',
        type=int,
        default=95,
        help='Target confidence threshold for enhance mode (default: 95)'
    )
    parser.add_argument(
        '--type-override',
        type=str,
        metavar='TYPE',
        help='V3: Force specific type for all files (workflow, guide, reference, etc.)'
    )
    parser.add_argument(
        '--respect-directory',
        action='store_true',
        help='V3: Use directory path as classification hint (workflows/ → workflow, etc.)'
    )
    parser.add_argument(
        '--suggest-enhancements',
        action='store_true',
        help='V3: Analyze files and suggest content enhancements to improve classification'
    )
    parser.add_argument(
        '--json',
        action='store_true',
        help='V3: Output results as JSON for CI/CD integration'
    )
    # Embedding control (v4) — the two flags are mutually exclusive and share
    # the 'use_embeddings' dest so either one sets a single boolean.
    embedding_group = parser.add_mutually_exclusive_group()
    embedding_group.add_argument(
        '--use-embeddings',
        dest='use_embeddings',
        action='store_true',
        default=False,
        help='V4: Use semantic embeddings for improved classification accuracy'
    )
    embedding_group.add_argument(
        '--no-embeddings',
        dest='use_embeddings',
        action='store_false',
        help='V4: Disable semantic embeddings (faster, less accurate)'
    )
    return parser
# Directory pattern to type mappings for --respect-directory.
# Every canonical type maps from both its singular and plural directory name,
# plus a handful of conventional directory aliases.
DIRECTORY_TYPE_PATTERNS = {
    name: kind
    for kind in (
        'workflow', 'guide', 'reference', 'adr', 'agent', 'command',
        'skill', 'hook', 'template', 'report', 'changelog',
    )
    for name in (kind, kind + 's')
}
DIRECTORY_TYPE_PATTERNS.update({
    'getting-started': 'guide',
    'training': 'guide',
    'internal': 'reference',
    'architecture': 'reference',
})
def get_directory_hint(file_path: Path) -> Optional[str]:
    """
    Get type hint from directory path.

    Checks each parent directory against DIRECTORY_TYPE_PATTERNS, innermost
    first, and returns the first matching type hint, or None if no match.
    """
    ancestors = file_path.parts[:-1]  # exclude the filename itself
    for ancestor in reversed(ancestors):
        hint = DIRECTORY_TYPE_PATTERNS.get(ancestor.lower())
        if hint is not None:
            return hint
    return None
def enhance_frontmatter_mode(args) -> int:
    """
    V3 Enhance Frontmatter Mode.

    Adds explicit type declarations to low-confidence files to boost them
    to the target threshold. This is more aggressive than --update-frontmatter
    because it forces type/component_type fields and resets moe_confidence.

    With --type-override, forces a specific type on all files regardless of
    classification result. Useful for batch correcting misclassified directories.

    Returns:
        0 on success, 1 if any file errored.
    """
    from core.models import Document
    from core.orchestrator import create_default_orchestrator
    import re

    path = Path(args.path)
    threshold = args.threshold / 100.0 # Convert to decimal
    type_override = getattr(args, 'type_override', None)
    respect_directory = getattr(args, 'respect_directory', False)
    json_output = getattr(args, 'json', False)

    # Results tracking for JSON output
    results_data = []

    # Collect files (markdown only in this mode)
    files = []
    if path.is_file():
        files = [path]
    else:
        pattern = '**/*.md' if args.recursive else '*.md'
        files = list(path.glob(pattern))
    if not files:
        logger.warning("No files found to enhance")
        return 0
    if args.limit:
        files = files[:args.limit]
    files.sort()

    # Build a one-line description of the active options for the log.
    mode_parts = []
    if type_override:
        mode_parts.append(f"type={type_override}")
    else:
        mode_parts.append(f"target ≥{args.threshold}%")
    if respect_directory:
        mode_parts.append("respect-directory")
    use_embeddings = getattr(args, 'use_embeddings', False)
    if use_embeddings:
        mode_parts.append("embeddings")
    mode_desc = ", ".join(mode_parts)
    logger.info(f"Enhance frontmatter mode: {len(files)} files, {mode_desc}")

    orchestrator = create_orchestrator(use_embeddings=use_embeddings)

    enhanced_count = 0
    already_ok = 0
    dir_hint_used = 0 # Track files where directory hint was applied
    errors = []

    for i, file_path in enumerate(files, 1):
        try:
            # Classify file (unless using type-override)
            doc = Document.from_path(file_path)
            used_dir_hint = False # Track if directory hint was used for this file
            if type_override:
                # Force specific type - skip classification, apply to all files
                classification = type_override
                confidence = 0.0 # Will be set to threshold
            else:
                result = orchestrator.classify(doc)
                confidence = result.result.confidence
                classification = result.result.classification
                if not classification:
                    if args.verbose:
                        logger.warning(f"No classification for {file_path.name}")
                    continue

            # Check if already above threshold (only applies without type-override:
            # the override path sets confidence to 0.0 so it never skips here)
            if confidence >= threshold:
                already_ok += 1
                results_data.append({
                    'file': str(file_path),
                    'status': 'ok',
                    'classification': classification,
                    'confidence': confidence,
                    'action': 'none'
                })
                if not args.quiet and not json_output:
                    print(f"\r[{i}/{len(files)}] {file_path.name}: {confidence:.0%} ✓ (already OK)", end='')
                continue

            # Apply directory hint if enabled and confidence is low
            if respect_directory and confidence < threshold:
                dir_hint = get_directory_hint(file_path)
                if dir_hint and dir_hint != classification:
                    if args.verbose:
                        logger.info(f"{file_path.name}: MoE={classification} ({confidence:.0%}), dir_hint={dir_hint} → using dir_hint")
                    classification = dir_hint
                    used_dir_hint = True
                    dir_hint_used += 1

            # File needs enhancement - add explicit type fields
            if args.dry_run:
                action = 'override' if type_override else ('dir_hint' if used_dir_hint else 'enhance')
                results_data.append({
                    'file': str(file_path),
                    'status': 'would_enhance',
                    'classification': classification,
                    'original_confidence': confidence,
                    'target_confidence': threshold,
                    'action': action,
                    'dir_hint_used': used_dir_hint
                })
                if not args.quiet and not json_output:
                    if type_override:
                        print(f"\r[{i}/{len(files)}] {file_path.name}: → {type_override} (would override)", end='')
                    elif used_dir_hint:
                        print(f"\r[{i}/{len(files)}] {file_path.name}: {confidence:.0%} → {classification} (dir hint)", end='')
                    else:
                        print(f"\r[{i}/{len(files)}] {file_path.name}: {confidence:.0%} → {args.threshold}% (would enhance)", end='')
                enhanced_count += 1
                continue

            # Read file content
            content = file_path.read_text(encoding='utf-8')
            if not content.startswith('---'):
                # No frontmatter - create it.
                # NOTE(review): closing '---' fence restored during reconstruction
                # of a garbled span — confirm against the original file.
                new_frontmatter = f"""---
title: {file_path.stem.replace('-', ' ').replace('_', ' ').title()}
type: {classification}
component_type: {classification}
moe_confidence: {threshold:.3f}
moe_classified: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}
---
"""
                new_content = new_frontmatter + content
            else:
                # Has frontmatter - enhance it
                fm_end = content.find('\n---', 3)
                if fm_end == -1:
                    logger.warning(f"Malformed frontmatter in {file_path}")
                    continue
                # NOTE(review): slicing at 4 assumes opener is exactly '---\n'.
                frontmatter = content[4:fm_end]
                body = content[fm_end + 4:]
                # Add or update type field
                if re.search(r'^type:', frontmatter, re.MULTILINE):
                    frontmatter = re.sub(
                        r'^type:.*$',
                        f'type: {classification}',
                        frontmatter,
                        flags=re.MULTILINE
                    )
                else:
                    # Add after title or at start
                    if re.search(r'^title:', frontmatter, re.MULTILINE):
                        frontmatter = re.sub(
                            r'^(title:.*?)$',
                            f'\\1\ntype: {classification}',
                            frontmatter,
                            flags=re.MULTILINE
                        )
                    else:
                        frontmatter = f'type: {classification}\n' + frontmatter
                # Add or update component_type field
                if re.search(r'^component_type:', frontmatter, re.MULTILINE):
                    frontmatter = re.sub(
                        r'^component_type:.*$',
                        f'component_type: {classification}',
                        frontmatter,
                        flags=re.MULTILINE
                    )
                else:
                    # Add after type
                    frontmatter = re.sub(
                        r'^(type:.*?)$',
                        f'\\1\ncomponent_type: {classification}',
                        frontmatter,
                        flags=re.MULTILINE
                    )
                # Update moe_confidence to threshold
                if re.search(r'^moe_confidence:', frontmatter, re.MULTILINE):
                    frontmatter = re.sub(
                        r'^moe_confidence:.*$',
                        f'moe_confidence: {threshold:.3f}',
                        frontmatter,
                        flags=re.MULTILINE
                    )
                else:
                    frontmatter += f'\nmoe_confidence: {threshold:.3f}'
                # Update moe_classified date
                today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
                if re.search(r'^moe_classified:', frontmatter, re.MULTILINE):
                    frontmatter = re.sub(
                        r'^moe_classified:.*$',
                        f'moe_classified: {today}',
                        frontmatter,
                        flags=re.MULTILINE
                    )
                else:
                    frontmatter += f'\nmoe_classified: {today}'
                new_content = f'---\n{frontmatter}\n---{body}'

            # Write enhanced content
            file_path.write_text(new_content, encoding='utf-8')
            enhanced_count += 1
            action = 'override' if type_override else ('dir_hint' if used_dir_hint else 'enhance')
            results_data.append({
                'file': str(file_path),
                'status': 'enhanced',
                'classification': classification,
                'original_confidence': confidence,
                'new_confidence': threshold,
                'action': action,
                'dir_hint_used': used_dir_hint
            })
            if not args.quiet and not json_output:
                if type_override:
                    print(f"\r[{i}/{len(files)}] {file_path.name}: → {type_override} ✓ (override)", end='')
                elif used_dir_hint:
                    print(f"\r[{i}/{len(files)}] {file_path.name}: {confidence:.0%} → {classification} ✓ (dir hint)", end='')
                else:
                    print(f"\r[{i}/{len(files)}] {file_path.name}: {confidence:.0%} → {args.threshold}% ✓ (enhanced)", end='')
        except Exception as e:
            # Per-file failure: record and continue with remaining files.
            errors.append((file_path, str(e)))
            results_data.append({
                'file': str(file_path),
                'status': 'error',
                'error': str(e)
            })
            if args.verbose:
                logger.error(f"Error enhancing {file_path}: {e}")

    if not args.quiet and not json_output:
        print() # New line

    new_total = already_ok + enhanced_count

    # JSON output mode
    if json_output:
        output = {
            'mode': 'enhance_frontmatter',
            'version': '3.0',
            'options': {
                'threshold': args.threshold,
                'type_override': type_override,
                'respect_directory': respect_directory,
                'dry_run': args.dry_run
            },
            'summary': {
                'total_files': len(files),
                'already_ok': already_ok,
                'enhanced': enhanced_count,
                'dir_hints_used': dir_hint_used,
                'errors': len(errors),
                'success_rate': new_total / len(files) if files else 0
            },
            'results': results_data
        }
        print(json.dumps(output, indent=2))
        return 0 if not errors else 1

    # Human-readable summary
    print("\n" + "="*70)
    if type_override:
        print(f"Type Override Summary (v3) - Forced: {type_override}")
    elif respect_directory:
        print("Enhance Frontmatter Summary (v3) - Directory-Aware")
    else:
        print("Enhance Frontmatter Summary (v3)")
    print("="*70)
    print(f"\nProcessed: {len(files)} files")
    if not type_override:
        print(f"Already at ≥{args.threshold}%: {already_ok} ({already_ok/len(files)*100:.1f}%)")
    print(f"Enhanced: {enhanced_count} ({enhanced_count/len(files)*100:.1f}%)")
    if respect_directory and dir_hint_used > 0:
        print(f" └─ Directory hints applied: {dir_hint_used}")
    print(f"Errors: {len(errors)}")
    if args.dry_run:
        action = "overridden" if type_override else "enhanced"
        print(f"\n[DRY RUN] Would have {action} {enhanced_count} files")
    if type_override:
        print(f"\nAll {enhanced_count} files will be set to type={type_override}")
    else:
        print(f"\nProjected success rate: {new_total}/{len(files)} ({new_total/len(files)*100:.1f}%)")
    if errors and args.verbose:
        print("\nErrors:")
        for p, e in errors[:10]:
            print(f" - {p}: {e}")
    return 0 if not errors else 1
def suggest_enhancements_mode(args) -> int:
    """V3 Suggest Enhancements Mode.

    Analyzes documents below the confidence threshold and provides specific
    recommendations for content additions that would improve classification.
    Uses Type Expert agents for deep semantic analysis.

    Args:
        args: Parsed CLI namespace. Reads ``path``, ``threshold`` (percent,
            0-100), ``recursive``, ``limit``, ``verbose``, and optionally
            ``json`` and ``use_embeddings``.

    Returns:
        Process exit code. Always 0: per-file failures are recorded as
        ``status: 'error'`` entries in the results rather than via the
        exit code.
    """
    # NOTE: imports are function-local so the heavy classification stack is
    # only loaded when this mode is actually requested.
    from core.models import Document
    from type_experts import create_coordinator

    path = Path(args.path)
    threshold = args.threshold / 100.0  # CLI takes percent; internal is 0-1
    json_output = getattr(args, 'json', False)

    # Collect files: a single file, or *.md under the directory
    # (recursively when --recursive is given).
    files = []
    if path.is_file():
        files = [path]
    else:
        pattern = '**/*.md' if args.recursive else '*.md'
        files = list(path.glob(pattern))

    if not files:
        logger.warning("No files found to analyze")
        return 0

    if args.limit:
        files = files[:args.limit]
    files.sort()

    use_embeddings = getattr(args, 'use_embeddings', False)
    mode_desc = f"target ≥{args.threshold}%"
    if use_embeddings:
        mode_desc += ", embeddings"

    if not json_output:
        logger.info(f"Suggest enhancements mode: {len(files)} files, {mode_desc}")

    orchestrator = create_orchestrator(use_embeddings=use_embeddings)
    coordinator = create_coordinator()

    files_analyzed = 0
    files_need_enhancement = 0
    total_suggestions = 0
    results_data = []

    if not json_output:
        print("\n" + "="*70)
        print("Content Enhancement Suggestions (v3)")
        print("="*70)

    for file_path in files:
        try:
            # Classify file
            doc = Document.from_path(file_path)
            result = orchestrator.classify(doc)
            confidence = result.result.confidence
            classification = result.result.classification

            # Unclassifiable files are skipped entirely (not counted as
            # analyzed and not recorded in results).
            if not classification:
                continue

            files_analyzed += 1

            # Skip files already above threshold
            if confidence >= threshold:
                results_data.append({
                    'file': str(file_path),
                    'status': 'ok',
                    'classification': classification,
                    'confidence': confidence
                })
                if args.verbose and not json_output:
                    print(f"\n✓ {file_path.name}: {confidence:.0%} (OK)")
                continue

            files_need_enhancement += 1

            # Run expert analysis (votes are in result.result.votes)
            analyst_votes = result.result.votes if result.result else []
            decision = coordinator.coordinate(doc, analyst_votes, result)

            # Build result data
            file_result = {
                'file': str(file_path),
                'status': 'needs_enhancement',
                'classification': classification,
                'confidence': confidence,
                'target_confidence': threshold,
                'missing_signals': [],
                'conflicting_content': [],
                'enhancements': [],
                'semantic_purpose': None
            }

            # Collect expert analysis (caps: 5 missing signals, 3 conflicts)
            if decision.expert_analysis:
                analysis = decision.expert_analysis
                if analysis.missing_signals:
                    file_result['missing_signals'] = analysis.missing_signals[:5]
                    total_suggestions += len(analysis.missing_signals[:5])
                if analysis.evidence_against:
                    file_result['conflicting_content'] = analysis.evidence_against[:3]
                if analysis.semantic_purpose:
                    file_result['semantic_purpose'] = analysis.semantic_purpose

            # Collect enhancements (capped at 5; content only kept verbose)
            if decision.enhancements:
                for enhancement in decision.enhancements[:5]:
                    file_result['enhancements'].append({
                        'signal_type': enhancement.signal_type,
                        'reason': enhancement.reason,
                        'content': enhancement.content if args.verbose else None,
                        'priority': getattr(enhancement, 'priority', 1)
                    })
                    total_suggestions += 1

            results_data.append(file_result)

            # Human-readable output
            if not json_output:
                print(f"\n{'─'*70}")
                print(f"📄 {file_path.name}")
                print(f" Current: {classification} ({confidence:.0%}) → Target: ≥{args.threshold}%")

                if file_result['missing_signals']:
                    print(f"\n ⚠️ Missing signals:")
                    for signal in file_result['missing_signals']:
                        print(f" • {signal}")

                if file_result['conflicting_content']:
                    print(f"\n ❌ Conflicting content:")
                    for evidence in file_result['conflicting_content']:
                        print(f" • {evidence}")

                if decision.enhancements:
                    print(f"\n 💡 Recommended enhancements:")
                    for i, enhancement in enumerate(decision.enhancements[:5], 1):
                        print(f" {i}. [{enhancement.signal_type}] {enhancement.reason}")
                        if args.verbose and enhancement.content:
                            # Show a flattened 100-char preview of the content
                            preview = enhancement.content[:100].replace('\n', ' ')
                            if len(enhancement.content) > 100:
                                preview += "..."
                            print(f" Content: {preview}")

                if file_result['semantic_purpose']:
                    print(f"\n 📝 Document purpose: {file_result['semantic_purpose']}")

        except Exception as e:
            # Record the failure; analysis continues with the next file.
            results_data.append({
                'file': str(file_path),
                'status': 'error',
                'error': str(e)
            })
            if args.verbose and not json_output:
                logger.error(f"Error analyzing {file_path}: {e}")

    # JSON output mode
    if json_output:
        output = {
            'mode': 'suggest_enhancements',
            'version': '3.0',
            'options': {
                'threshold': args.threshold
            },
            'summary': {
                'total_files': len(files),
                'files_analyzed': files_analyzed,
                'files_need_enhancement': files_need_enhancement,
                'total_suggestions': total_suggestions
            },
            'results': results_data
        }
        print(json.dumps(output, indent=2))
        return 0

    # Human-readable summary
    print("\n" + "="*70)
    print("Enhancement Summary")
    print("="*70)
    print(f"\nFiles analyzed: {files_analyzed}")
    print(f"Files needing enhancement: {files_need_enhancement}")
    print(f"Total suggestions generated: {total_suggestions}")

    if files_need_enhancement > 0:
        print(f"\n💡 Tip: Use --enhance-frontmatter to add explicit type declarations")
        print(f" Or manually add the suggested content to improve classification.")

    return 0
def main():
    """Main entry point.

    Parses CLI arguments and dispatches to one mode, checked in priority
    order: enhance-frontmatter (v3), suggest-enhancements (v3), expert,
    autonomous, then the standard classification CLI.

    Returns:
        Process exit code (0 on success; 1 when a mode reports failures).
    """
    parser = create_parser()
    args = parser.parse_args()

    # Set log level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    elif args.quiet:
        logging.getLogger().setLevel(logging.WARNING)

    # Handle enhance-frontmatter mode (v3)
    if args.enhance_frontmatter:
        return enhance_frontmatter_mode(args)

    # Handle suggest-enhancements mode (v3)
    if args.suggest_enhancements:
        return suggest_enhancements_mode(args)

    # Handle expert mode
    if args.expert:
        from type_experts import create_coordinator
        from core.models import Document
        from core.orchestrator import create_default_orchestrator

        path = Path(args.path)

        # Collect files
        files = []
        if path.is_file():
            files = [path]
        else:
            pattern = '**/*.md' if args.recursive else '*.md'
            files = list(path.glob(pattern))

        if not files:
            logger.warning("No files found to analyze")
            return 0

        if args.limit:
            files = files[:args.limit]

        logger.info(f"Expert mode: analyzing {len(files)} files with Type Expert agents")

        orchestrator = create_default_orchestrator()
        coordinator = create_coordinator()

        decisions = []
        for i, file_path in enumerate(files, 1):
            if not args.quiet:
                print(f"\r[{i}/{len(files)}] Analyzing {file_path.name}...", end='', flush=True)
            try:
                # Load document and get initial classification
                doc = Document.from_path(file_path)
                initial_result = orchestrator.classify(doc)

                # Get analyst votes from the consensus result
                analyst_votes = initial_result.result.votes if initial_result.result else []

                # Run Type Expert coordination
                decision = coordinator.coordinate(doc, analyst_votes, initial_result)
                decisions.append((file_path, decision))

                if args.expert_report:
                    report = coordinator.format_decision_report(decision)
                    print(f"\n{report}")
            except Exception as e:
                logger.error(f"Error analyzing {file_path}: {e}")
                if args.verbose:
                    import traceback
                    traceback.print_exc()

        if not args.quiet:
            print()

        # Summary
        print("\n" + "="*70)
        print("Type Expert Analysis Summary")
        print("="*70)
        print(f"\nAnalyzed: {len(decisions)} files")

        # Count by recommended type
        type_counts = {}
        for _, d in decisions:
            type_counts[d.recommended_type] = type_counts.get(d.recommended_type, 0) + 1

        print("\nRecommended Types:")
        for t, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            print(f" {t:15}: {count}")

        # Files needing enhancements (show at most 10)
        need_enhancement = [(p, d) for p, d in decisions if d.enhancements]
        if need_enhancement:
            print(f"\nFiles needing enhancement: {len(need_enhancement)}")
            for p, d in need_enhancement[:10]:
                signals = [e.signal_type for e in d.enhancements]
                print(f" {p.name}: missing {', '.join(signals)}")

        # Save output if requested
        if args.output:
            output_data = {
                'mode': 'expert',
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'total': len(decisions),
                'type_distribution': type_counts,
                'decisions': [
                    {
                        'path': str(p),
                        'recommended_type': d.recommended_type,
                        'confidence': d.confidence,
                        'reasoning': d.reasoning,
                        'enhancements': [
                            {
                                'signal_type': e.signal_type,
                                'reason': e.reason,
                                'priority': e.priority
                            }
                            for e in d.enhancements
                        ],
                        'audit_trail': d.audit_trail
                    }
                    for p, d in decisions
                ]
            }
            with open(args.output, 'w') as f:
                json.dump(output_data, f, indent=2)
            logger.info(f"Expert analysis saved to {args.output}")

        return 0

    # Handle autonomous mode
    if args.autonomous:
        from autonomous import AutonomousClassifier, collect_files

        path = Path(args.path)
        files = collect_files(path, args.recursive)

        if not files:
            logger.warning("No files found to classify")
            return 0

        logger.info(f"Autonomous mode: {len(files)} files, target 95-100% confidence")

        classifier = AutonomousClassifier(
            dry_run=args.dry_run,
            verbose=args.verbose
        )

        results = []
        success_count = 0
        for i, file_path in enumerate(files, 1):
            if not args.quiet:
                print(f"\r[{i}/{len(files)}] {file_path.name}", end='', flush=True)
            result = classifier.classify_autonomous(file_path)
            results.append(result)
            if result.success:
                success_count += 1

        if not args.quiet:
            print()  # New line

        # Show summary
        print("\n" + "="*70)
        print("Autonomous Classification Summary")
        print("="*70)
        print(f"\nProcessed: {len(results)} files")
        print(f"Success (≥95%): {success_count} ({success_count/len(results)*100:.1f}%)")
        print(f"Files modified: {sum(1 for r in results if r.changes_made)}")

        # Show improvements (at most 10)
        improved = [r for r in results if r.final_confidence > r.original_confidence]
        if improved:
            print(f"\nImprovements ({len(improved)}):")
            for r in improved[:10]:
                # BUGFIX: filename was computed but the literal "(unknown)"
                # was printed instead of it.
                filename = Path(r.document_path).name
                print(f" {filename}: {r.original_confidence:.0%} → {r.final_confidence:.0%} ({r.iterations} iter)")

        # Save output if requested
        if args.output:
            output_data = {
                'mode': 'autonomous',
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'total': len(results),
                'success_count': success_count,
                'results': [
                    {
                        'path': r.document_path,
                        'original_confidence': r.original_confidence,
                        'final_confidence': r.final_confidence,
                        'iterations': r.iterations,
                        'success': r.success
                    }
                    for r in results
                ]
            }
            with open(args.output, 'w') as f:
                json.dump(output_data, f, indent=2)
            logger.info(f"Results saved to {args.output}")

        # Non-zero exit unless every file reached the confidence target.
        return 0 if success_count == len(results) else 1

    # Standard classification mode
    cli = ClassificationCLI(args)
    return cli.run()
# BUGFIX: the guard must compare the dunder __name__ against '__main__';
# `if name == 'main':` raises NameError when the script is executed.
if __name__ == '__main__':
    sys.exit(main())