#!/usr/bin/env python3
"""Semantic Embedding Service for MoE Classification System.

Provides true vector embeddings using sentence-transformers for improved
document classification accuracy. Replaces regex-based pattern matching
with semantic similarity.

Features:
- Pre-computed exemplar embeddings for each document type
- Efficient similarity calculation via cosine similarity
- Caching support for repeated classifications
- Graceful fallback when model unavailable
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from pathlib import Path
import json
import hashlib
import logging

# Module-level logger per stdlib convention (was `name`, mangled from `__name__`).
logger = logging.getLogger(__name__)

# Try to import sentence-transformers, fall back gracefully when unavailable.
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np

    EMBEDDINGS_AVAILABLE = True
except ImportError:
    EMBEDDINGS_AVAILABLE = False
    logger.warning("sentence-transformers not available. Using fallback pattern matching.")
@dataclass
class EmbeddingConfig:
    """Configuration for the embedding service."""

    model_name: str = "all-MiniLM-L6-v2"  # 80MB, ~14K docs/sec on CPU
    max_content_length: int = 8000  # Truncate for efficiency
    cache_enabled: bool = True
    cache_path: Optional[str] = None  # optional on-disk cache location (unused here)
    min_similarity_threshold: float = 0.3
@dataclass
class SimilarityResult:
    """Result from semantic similarity analysis."""

    classification: str  # best-matching document type
    confidence: float  # clamped to [0.0, 1.0] by the classifier
    similarities: Dict[str, float]  # per-type similarity scores
    evidence: List[str] = field(default_factory=list)  # human-readable rationale
# Exemplar content for each document type - representative samples
EXEMPLAR_CONTENT = {
    "agent": [
        """You are a specialized AI agent responsible for software architecture and system design.
Your capabilities include code review, design patterns, and technical guidance.
When to use: complex system design tasks.""",
        """System Prompt: You are an expert developer agent.
Core Responsibilities: Write clean code, review pull requests, debug issues.
Technical Expertise: Python, JavaScript, cloud infrastructure.""",
        """This agent specializes in database optimization and query performance.
Capabilities: index analysis, query planning, schema design.
Invocation: /agent database-specialist 'optimize queries'""",
    ],
    "command": [
        """Usage: /git-sync [options]
This slash command synchronizes all git submodules.
Arguments: --target <all|specific>, --mode <full|quick>
Examples: /git-sync --target all --mode full""",
        """Invocation: /classify
A command for classifying documents using the MoE system.
System Prompt: Execute document classification workflow.
Options: -r recursive, --expert use type experts""",
        """Command: /export
Exports session context to backup storage.
Execution Directive: Run immediately when invoked.""",
    ],
    "skill": [
        """Skill Definition: Git Workflow Automation
When to Apply: Multi-repository synchronization tasks
Pattern: Bottom-up commit, push from submodules to master
Implementation: See scripts/git-sync.py""",
        """SKILL.md - Code Review Patterns
This skill provides reusable patterns for automated code review.
When to use: Before merging pull requests.
Capability: Static analysis.""",
        """Skill: Documentation Generation
Reusable pattern for generating API documentation.
Input: Source code. Output: Markdown docs.""",
    ],
    "adr": [
        """# ADR-001: Use PostgreSQL for Primary Database
## Status: Accepted
## Context: We need a reliable RDBMS for multi-tenant data.
## Decision: We will use PostgreSQL with pgvector extension.
## Consequences: Need DBA expertise, better query capabilities.""",
        """Architecture Decision Record - Authentication System
Status: Proposed
Context: Need secure user authentication
Decision: Implement OAuth2 with JWT tokens
Alternatives Considered: Session-based auth, API keys""",
        """ADR-015: Container Orchestration
Status: Accepted | Supersedes: ADR-003
Context: Need scalable deployment
Decision: Kubernetes on GKE
Consequences: Learning curve, operational complexity""",
    ],
    "guide": [
        """# Getting Started Guide
## Prerequisites: Python 3.10+, Git
## Step 1: Clone the repository
## Step 2: Install dependencies
## Troubleshooting: Common issues and solutions
You will learn how to set up the development environment.""",
        """User Guide: Component Activation
This guide explains how to activate framework components.
Quick Start: Follow these steps to get started.
Best Practices: Always test before deploying.""",
        """Cookbook: Common Recipes
Recipe 1: Create a new agent
Recipe 2: Add a slash command
Step-by-step instructions with examples.""",
    ],
    "workflow": [
        """# CI/CD Workflow
## Phase 1: Build
## Phase 2: Test
## Phase 3: Deploy
Automation pipeline for continuous integration.""",
        """Workflow: Document Classification
Step 1: Load document
Step 2: Run analysts in parallel
Step 3: Calculate consensus
Step 4: Judge validation
Step 5: Output result""",
        """Process: Sprint Planning
Phase 1: Backlog grooming
Phase 2: Estimation
Phase 3: Commitment
Orchestration of agile ceremonies.""",
    ],
    "reference": [
        """# API Reference
## Endpoints
| Method | Path | Description |
| GET | /api/v1/users | List users |
| POST | /api/v1/users | Create user |
## Parameters
- id: string - User identifier""",
        """Component Reference
Complete inventory of framework components.
Table of Contents | Quick Navigation
System Design: Architecture overview
Specification: Technical requirements""",
        """Architecture Overview
How It Works: Request flow diagram
Quick Index: Jump to sections
Configuration: Parameters and settings""",
    ],
    "config": [
        """# Configuration File
settings:
  database_url: postgresql://localhost/db
  cache_enabled: true
  log_level: INFO""",
        """Environment Configuration
Required Variables:
- API_KEY: Authentication key
- DATABASE_URL: Connection string
Optional Settings:
- DEBUG: Enable debug mode""",
    ],
    "hook": [
        """# Pre-commit Hook
Trigger: Before git commit
Action: Run linters and tests
Script: ./scripts/pre-commit.sh""",
        """Post-deploy Hook
Event: After successful deployment
Handler: Send notification to Slack
Configuration: hooks/post-deploy.yaml""",
    ],
    "script": [
        """#!/usr/bin/env python3
'''Automation script for database backup.'''
import subprocess

def backup_database():
    subprocess.run(['pg_dump', '-f', 'backup.sql'])

if __name__ == '__main__':
    backup_database()""",
        """#!/bin/bash
# Script: deploy.sh
# Purpose: Deploy application to production
docker build -t app .
docker push registry/app:latest
kubectl apply -f k8s/""",
    ],
}
class SemanticEmbeddingService:
    """True semantic embedding service for document classification.

    Uses sentence-transformers for efficient local embeddings. When the
    model (or the library) is unavailable, classification falls back to
    simple keyword scoring so callers always get a SimilarityResult.
    """

    def __init__(self, config: Optional[EmbeddingConfig] = None):
        """Create the service and eagerly load the model when possible.

        Args:
            config: Optional configuration; defaults to EmbeddingConfig().
        """
        self.config = config or EmbeddingConfig()
        self.model = None
        # Mean exemplar embedding per document type, keyed by type name.
        self.exemplar_embeddings: Dict[str, 'np.ndarray'] = {}
        # content-hash -> embedding cache for repeated classifications.
        self.embedding_cache: Dict[str, 'np.ndarray'] = {}
        self._initialized = False
        if EMBEDDINGS_AVAILABLE:
            self._initialize()

    def _initialize(self):
        """Initialize the model and compute exemplar embeddings."""
        if not EMBEDDINGS_AVAILABLE:
            logger.warning("Embeddings not available - using fallback")
            return
        try:
            logger.info(f"Loading embedding model: {self.config.model_name}")
            self.model = SentenceTransformer(self.config.model_name)
            # Pre-compute exemplar embeddings once so classify() is cheap.
            self._compute_exemplar_embeddings()
            self._initialized = True
            logger.info(f"Embedding service initialized with {len(self.exemplar_embeddings)} types")
        except Exception as e:
            # Model download/load can fail (e.g. offline); degrade to fallback.
            logger.error(f"Failed to initialize embedding service: {e}")
            self._initialized = False

    def _compute_exemplar_embeddings(self):
        """Pre-compute a representative embedding for each document type."""
        if not self.model:
            return
        for doc_type, exemplars in EXEMPLAR_CONTENT.items():
            # Encode all exemplars for this type in one batch.
            embeddings = self.model.encode(exemplars)
            # Store the mean embedding as the type representative.
            self.exemplar_embeddings[doc_type] = np.mean(embeddings, axis=0)
        logger.info(f"Computed exemplar embeddings for {len(self.exemplar_embeddings)} document types")

    def _get_content_hash(self, content: str) -> str:
        """Generate a hash for content caching (cache key only, not security)."""
        return hashlib.md5(content[:self.config.max_content_length].encode()).hexdigest()

    def _get_embedding(self, content: str) -> Optional['np.ndarray']:
        """Get the embedding for content, using the in-memory cache if enabled.

        Returns None when the service is not initialized.
        """
        if not self._initialized or not self.model:
            return None
        content_hash = self._get_content_hash(content)
        # Cache hit: reuse the previously computed embedding.
        if self.config.cache_enabled and content_hash in self.embedding_cache:
            return self.embedding_cache[content_hash]
        # Truncate to keep encoding fast on very large documents.
        truncated = content[:self.config.max_content_length]
        embedding = self.model.encode(truncated)
        if self.config.cache_enabled:
            self.embedding_cache[content_hash] = embedding
        return embedding

    def classify(self, content: str) -> SimilarityResult:
        """Classify a document by embedding similarity to type exemplars.

        Args:
            content: Document content to classify.

        Returns:
            SimilarityResult with classification, confidence in [0, 1],
            per-type similarities, and human-readable evidence.
        """
        if not self._initialized:
            return self._fallback_classify(content)
        doc_embedding = self._get_embedding(content)
        if doc_embedding is None:
            return self._fallback_classify(content)

        # Cosine similarity against each type's representative embedding.
        similarities: Dict[str, float] = {}
        doc_norm = float(np.linalg.norm(doc_embedding))
        for doc_type, exemplar_emb in self.exemplar_embeddings.items():
            denom = doc_norm * float(np.linalg.norm(exemplar_emb))
            # Guard against zero-norm vectors (e.g. degenerate/empty content).
            similarities[doc_type] = (
                float(np.dot(doc_embedding, exemplar_emb) / denom) if denom else 0.0
            )

        # Best match wins; ties resolve to first-seen type (dict order).
        best_type = max(similarities, key=similarities.get)
        best_score = similarities[best_type]

        # Build human-readable evidence from the ranked similarities.
        evidence = []
        sorted_types = sorted(similarities.items(), key=lambda x: x[1], reverse=True)
        evidence.append(f"Top match: {best_type} ({best_score:.1%})")
        if len(sorted_types) > 1:
            second_type, second_score = sorted_types[1]
            evidence.append(f"Second: {second_type} ({second_score:.1%})")
            margin = best_score - second_score
            if margin > 0.1:
                evidence.append(f"Clear margin: {margin:.1%}")
            elif margin < 0.05:
                evidence.append("Close call - low margin")

        return SimilarityResult(
            classification=best_type,
            confidence=max(0.0, min(1.0, best_score)),  # clamp to [0, 1]
            similarities=similarities,
            evidence=evidence,
        )

    def _fallback_classify(self, content: str) -> SimilarityResult:
        """Fallback classification via keyword scoring when embeddings are unavailable."""
        content_lower = content.lower()
        # Keyword indicators per document type; each hit adds a fixed score.
        keywords = {
            "agent": ["agent", "specialist", "you are a", "capabilities", "system prompt"],
            "command": ["usage:", "invocation:", "/", "command", "arguments"],
            "skill": ["skill", "pattern", "when to apply", "skill.md"],
            "adr": ["adr", "decision", "status:", "context:", "consequences"],
            "guide": ["guide", "step", "tutorial", "getting started", "troubleshooting"],
            "workflow": ["workflow", "phase", "pipeline", "process", "automation"],
            "reference": ["reference", "api", "specification", "table", "parameters"],
            "config": ["config", "settings", "environment", "variables"],
            "hook": ["hook", "trigger", "event", "pre-", "post-"],
            "script": ["#!/", "script", "subprocess", "def main", "if __name__"],
        }
        scores = {doc_type: 0.0 for doc_type in keywords}
        for doc_type, kws in keywords.items():
            for kw in kws:
                if kw in content_lower:
                    scores[doc_type] += 0.15
        best_type = max(scores, key=scores.get)
        best_score = min(0.7, scores[best_type])  # Cap fallback confidence
        return SimilarityResult(
            classification=best_type,
            confidence=best_score,
            similarities=scores,
            evidence=["Using fallback keyword matching (embeddings unavailable)"],
        )

    def get_similar_types(self, content: str, top_k: int = 3) -> List[Tuple[str, float]]:
        """Get the top-k most similar document types as (type, score) pairs."""
        result = self.classify(content)
        sorted_types = sorted(result.similarities.items(), key=lambda x: x[1], reverse=True)
        return sorted_types[:top_k]

    def is_available(self) -> bool:
        """Check whether the embedding model is loaded and ready."""
        return self._initialized

    def clear_cache(self):
        """Clear the in-memory embedding cache."""
        self.embedding_cache.clear()
        logger.info("Embedding cache cleared")

    def get_stats(self) -> Dict:
        """Get service statistics for monitoring/debugging."""
        return {
            "initialized": self._initialized,
            "model": self.config.model_name if self._initialized else None,
            "exemplar_types": len(self.exemplar_embeddings),
            "cache_size": len(self.embedding_cache),
            "embeddings_available": EMBEDDINGS_AVAILABLE,
        }
# Singleton instance (module-level; created lazily on first access)
_embedding_service: Optional[SemanticEmbeddingService] = None


def get_embedding_service(config: Optional[EmbeddingConfig] = None) -> SemanticEmbeddingService:
    """Get or create the singleton embedding service.

    Args:
        config: Used only on first call, when the singleton is created;
            ignored on subsequent calls.
    """
    global _embedding_service
    if _embedding_service is None:
        _embedding_service = SemanticEmbeddingService(config)
    return _embedding_service
if __name__ == "__main__":
    # Smoke-test the embedding service from the command line.
    logging.basicConfig(level=logging.INFO)

    service = get_embedding_service()
    print(f"Service stats: {service.get_stats()}")

    # Test classification on a guide-like document.
    test_content = """
# Getting Started Guide
## Prerequisites
- Python 3.10+
- Git
## Step 1: Installation
Clone the repository and install dependencies.
## Troubleshooting
Common issues and solutions.
"""
    result = service.classify(test_content)
    print(f"\nClassification: {result.classification}")
    print(f"Confidence: {result.confidence:.1%}")
    print(f"Evidence: {result.evidence}")
    print(f"\nAll similarities:")
    for doc_type, score in sorted(result.similarities.items(), key=lambda x: x[1], reverse=True):
        print(f"  {doc_type}: {score:.1%}")