#!/usr/bin/env python3
# scripts-batch-processor
"""Batch Corpus Analyzer for MoE Classification."""
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Set
# Module-level logger named after this module, per logging convention.
# (Was `getLogger(name)` — `name` is undefined at module scope and raised
# NameError on import; `__name__` is the intended identifier.)
logger = logging.getLogger(__name__)
@dataclass
class BatchConfig:
    """Tunable settings for batch corpus analysis."""

    # Upper bound on concurrent classification worker threads.
    max_workers: int = 5
    # Threshold for cluster grouping — presumably a similarity cutoff;
    # not consumed anywhere in this file, so confirm against callers.
    cluster_threshold: float = 0.7
@dataclass
class CorpusProfile:
    """Aggregate statistics describing a document corpus."""

    # Total number of documents profiled.
    document_count: int
    # Number of distinct parent directories seen.
    directory_count: int
    # Inferred document type -> occurrence count.
    type_distribution: Dict[str, int]
    # Parent directory -> file names found under it.
    directories: Dict[str, List[str]]
@dataclass
class DocumentCluster:
    """A group of documents sharing a directory/naming pattern."""

    # Identifier for the cluster (the shared parent directory).
    cluster_id: str
    # Paths of the documents belonging to this cluster.
    document_paths: List[str]
    # Document type inferred from directory hints, or None if no hint matched.
    suggested_type: Optional[str]
    # Heuristic confidence in suggested_type.
    confidence: float
class BatchCorpusAnalyzer:
    """Analyzes a document corpus for cross-document patterns.

    Groups documents by parent directory and infers a likely document
    type from directory-name hints (e.g. an ``agents/`` directory
    suggests type ``agent``).
    """

    # Directory-name substring -> inferred document type. Shared by
    # profile_corpus() and cluster_documents(), which previously carried
    # two drifting copies of this map. A None value ('docs') marks a
    # directory that groups documents without implying a type.
    _DIR_TYPE_HINTS: Dict[str, Optional[str]] = {
        'agents': 'agent',
        'commands': 'command',
        'skills': 'skill',
        'guides': 'guide',
        'workflows': 'workflow',
        'adrs': 'adr',
        'reference': 'reference',
        'docs': None,
    }

    def __init__(self, config: Optional[BatchConfig] = None):
        """Initialize with the given config, or defaults when None."""
        self.config = config or BatchConfig()

    def profile_corpus(self, document_paths: List[str]) -> CorpusProfile:
        """Build a corpus profile (counts and directory layout) from paths.

        Args:
            document_paths: Document path strings; may be empty.

        Returns:
            CorpusProfile with document/directory counts, the inferred
            type distribution, and a directory -> file-name mapping.
        """
        directories: Dict[str, List[str]] = {}
        type_dist: Dict[str, int] = {}
        for path_str in document_paths:
            path = Path(path_str)
            parent = str(path.parent)
            directories.setdefault(parent, []).append(path.name)
            # Count the first matching typed hint; hints mapped to None
            # ('docs') deliberately contribute nothing.
            for hint_dir, hint_type in self._DIR_TYPE_HINTS.items():
                if hint_dir in parent.lower() and hint_type:
                    type_dist[hint_type] = type_dist.get(hint_type, 0) + 1
                    break
        return CorpusProfile(
            document_count=len(document_paths),
            directory_count=len(directories),
            type_distribution=type_dist,
            directories=directories,
        )

    def cluster_documents(self, document_paths: List[str]) -> List[DocumentCluster]:
        """Cluster documents that share a parent directory.

        Clusters whose directory matches a typed hint get that type at
        confidence 0.8; unmatched clusters stay untyped at 0.5.
        """
        dir_clusters: Dict[str, List[str]] = {}
        for path_str in document_paths:
            parent = str(Path(path_str).parent)
            dir_clusters.setdefault(parent, []).append(path_str)
        clusters: List[DocumentCluster] = []
        for parent, paths in dir_clusters.items():
            suggested: Optional[str] = None
            conf = 0.5
            for hint_dir, hint_type in self._DIR_TYPE_HINTS.items():
                if hint_dir in parent.lower() and hint_type:
                    suggested = hint_type
                    conf = 0.8
                    break
            clusters.append(DocumentCluster(
                cluster_id=parent,
                document_paths=paths,
                suggested_type=suggested,
                confidence=conf,
            ))
        return clusters

    def analyze_corpus(self, document_paths: List[str], classify_fn=None) -> Dict:
        """Run a full corpus analysis with optional classification.

        Args:
            document_paths: Document path strings to analyze.
            classify_fn: Optional callable taking a path string and
                returning a classification result; when given, documents
                are classified concurrently on a thread pool.

        Returns:
            Dict with a 'corpus_profile' summary, cluster count and
            details, and — when classify_fn is given — a
            'classifications' list of per-path results or error strings.
        """
        profile = self.profile_corpus(document_paths)
        clusters = self.cluster_documents(document_paths)
        result = {
            'corpus_profile': {
                'document_count': profile.document_count,
                'directory_count': profile.directory_count,
                'type_distribution': profile.type_distribution,
            },
            'clusters': len(clusters),
            'cluster_details': [
                {'id': c.cluster_id, 'count': len(c.document_paths),
                 'type': c.suggested_type, 'confidence': c.confidence}
                for c in clusters
            ],
        }
        if classify_fn:
            classifications = []
            # Fan classification out across threads; a failure is recorded
            # per-path instead of aborting the whole batch.
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                futures = {executor.submit(classify_fn, p): p for p in document_paths}
                for future in as_completed(futures):
                    path = futures[future]
                    try:
                        classifications.append({'path': path, 'result': future.result()})
                    except Exception as e:
                        classifications.append({'path': path, 'error': str(e)})
            result['classifications'] = classifications
        return result

    def get_consistency_check(self, clusters: List[DocumentCluster], results: List[Dict]) -> Dict:
        """Flag clusters whose members were classified with differing types.

        Args:
            clusters: Clusters produced by cluster_documents().
            results: Classification entries shaped like those built by
                analyze_corpus() ({'path': ..., 'result': {...}}).

        Returns:
            {'consistent': bool, 'issues': [...]} describing any cluster
            whose members received more than one classification type.
        """
        issues = []
        for cluster in clusters:
            cluster_results = [r for r in results if r.get('path') in cluster.document_paths]
            types = set(r.get('result', {}).get('classification')
                        for r in cluster_results if r.get('result'))
            # A result missing the 'classification' key previously counted
            # as a distinct None type and spuriously flagged the cluster.
            types.discard(None)
            if len(types) > 1:
                issues.append({
                    'cluster': cluster.cluster_id,
                    'expected': cluster.suggested_type,
                    'found_types': list(types),
                    'issue': 'Inconsistent types in cluster',
                })
        return {'consistent': len(issues) == 0, 'issues': issues}
# Lazily-created process-wide singleton, owned by get_batch_analyzer().
_analyzer: Optional[BatchCorpusAnalyzer] = None


def get_batch_analyzer(config: Optional[BatchConfig] = None) -> BatchCorpusAnalyzer:
    """Return the shared BatchCorpusAnalyzer, creating it on first use.

    NOTE: config only takes effect on the first call; later calls return
    the cached instance and ignore the argument.
    """
    global _analyzer
    if _analyzer is None:
        _analyzer = BatchCorpusAnalyzer(config)
    return _analyzer