ADR-021-v4: Unified Hybrid Graph System (Part 2: Technical)
Document: ADR-021-v4-unified-hybrid-graph-system-part2-technical
Version: 2.0.0
Purpose: Complete technical implementation for unified hybrid graph system
Audience: Developers, AI agents implementing graph system
Date Created: 2025-08-30
Date Modified: 2025-09-01
QA Reviewed: Pending
Status: DRAFT
Supersedes: v1.0.0
Table of Contents
- 1. Document Information
- 2. Implementation Overview
- 3. Core Graph Abstraction
- 4. Domain Implementations
- 5. Query Engine
- 6. Storage Integration
- 7. Logging Patterns
- 8. Error Handling
- 9. Testing Implementation
- 10. Deployment Configuration
- 11. Performance Benchmarks
- 12. Security Implementation
- 13. Migration Scripts
- 14. Operational Runbooks
- 15. Monitoring Setup
- 16. QA Review Block
1. Document Information 🔴 REQUIRED
| Field | Value |
|---|---|
| ADR Number | ADR-021 |
| Title | Unified Hybrid Graph System - Technical |
| Status | Draft |
| Date Created | 2025-08-30 |
| Last Modified | 2025-09-01 |
| Version | 2.0.0 |
| Dependencies | FoundationDB 7.1+, Qdrant 1.7+, Rust 1.75+ |
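2. Implementation Overview 🔴 REQUIRED
2.1 Dependencies
```toml
# Cargo.toml
[dependencies]
foundationdb = { version = "0.8.0", features = ["fdb-7_1"] }  # API version must match the FDB 7.1+ cluster
qdrant-client = "1.7.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"    # node metadata and node serialization
uuid = { version = "1.6", features = ["v4", "serde"] }
anyhow = "1.0"
async-trait = "0.1"   # async methods on the `dyn Embedder` trait object
chrono = { version = "0.4", features = ["serde"] }
tracing = "0.1"
```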
2.2 Module Structure
```rust
// src/graph/mod.rs
pub mod traits;
pub mod engine;
pub mod domains;
pub mod storage;
pub mod query;
```
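3. Core Graph Abstraction 🔴 REQUIRED
Core Abstraction: UnifiedGraphSystem<T>
```rust
//! Universal hybrid graph system for any knowledge domain
use anyhow::Result;
use foundationdb as fdb;
use qdrant_client::prelude::*;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

/// Contract every domain node (log, prompt, code, ...) implements.
pub trait GraphNodeTrait: Clone + Serialize + for<'de> Deserialize<'de> {
    /// Stable ID shared by the FDB record and the Qdrant point.
    fn id(&self) -> Uuid;
    /// Text fed to the embedder for semantic search.
    fn content_for_embedding(&self) -> String;
    fn node_type(&self) -> &str;
    fn metadata(&self) -> &HashMap<String, serde_json::Value>;
}

/// Text-to-vector model (signatures inferred from the call sites below);
/// `async_trait` makes the async methods usable behind `Arc<dyn Embedder>`.
#[async_trait::async_trait]
pub trait Embedder: Send + Sync {
    async fn embed(&self, text: &str) -> Result<Vec<f32>>;
    async fn batch_embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>>;
}

pub struct UnifiedGraphSystem<T: GraphNodeTrait> {
    fdb: Arc<fdb::Database>,   // structured store: nodes, edges, indexes
    qdrant: Arc<QdrantClient>, // semantic store: one collection per graph type and tenant
    tenant_id: Uuid,
    graph_type: String,
    collection_name: String,
    embedder: Arc<dyn Embedder>,
    config: GraphConfig,
    _phantom: std::marker::PhantomData<T>,
}

#[derive(Debug, Clone)]
pub struct GraphConfig {
    pub embedding_dimension: usize, // must match the Qdrant collection's vector size
    pub cache_size: usize,
    pub batch_size: usize,
    pub semantic_threshold: f64,
}
```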
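`Embedder` and `DefaultEmbedder` are used in this ADR, but their implementations are not part of it. Tests benefit from a deterministic stand-in that needs no model or network call. The sketch below is a hypothetical `HashingEmbedder` built on the hashing trick: each lowercased whitespace token increments one of `dim` buckets, and the vector is then L2-normalized. It is synchronous for brevity (an async `Embedder` impl could wrap it), it captures token overlap rather than meaning, and `SyncEmbedder` is an illustrative name, not part of the design.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Synchronous stand-in for the async `Embedder` trait (assumption for brevity).
pub trait SyncEmbedder {
    fn dimension(&self) -> usize;
    fn embed(&self, text: &str) -> Vec<f32>;
    fn batch_embed(&self, texts: &[String]) -> Vec<Vec<f32>> {
        texts.iter().map(|t| self.embed(t)).collect()
    }
}

/// Hypothetical test embedder: hashes each lowercased token into one of `dim` buckets.
pub struct HashingEmbedder {
    dim: usize,
}

impl HashingEmbedder {
    pub fn new(dim: usize) -> Self {
        assert!(dim > 0, "dimension must be positive");
        Self { dim }
    }
}

impl SyncEmbedder for HashingEmbedder {
    fn dimension(&self) -> usize {
        self.dim
    }

    fn embed(&self, text: &str) -> Vec<f32> {
        let mut v = vec![0.0f32; self.dim];
        for token in text.split_whitespace() {
            let mut h = DefaultHasher::new();
            token.to_lowercase().hash(&mut h);
            let bucket = (h.finish() % self.dim as u64) as usize;
            v[bucket] += 1.0;
        }
        // L2-normalize so cosine similarity equals the dot product
        let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            v.iter_mut().for_each(|x| *x /= norm);
        }
        v
    }
}
```

Constructing it with the collection size for a graph type (for example 384 for `logs`) keeps test vectors compatible with the Qdrant collection.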
Concrete Implementations
1. Log Graph System
```rust
use chrono::{DateTime, Duration, Utc};

// `LogLevel`, `ErrorPattern`, `FailurePrediction` and `GraphQuery` are not shown in this ADR.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogNode {
    pub id: Uuid,
    pub timestamp: DateTime<Utc>,
    pub level: LogLevel,
    pub message: String,
    pub service: String,
    pub metadata: HashMap<String, serde_json::Value>,
}

impl GraphNodeTrait for LogNode {
    fn id(&self) -> Uuid { self.id }

    fn content_for_embedding(&self) -> String {
        // Only string-valued error codes are embedded; missing or non-string codes become ""
        let error_code = self
            .metadata
            .get("error_code")
            .and_then(|v| v.as_str())
            .unwrap_or("");
        format!("{} {} {}", self.message, self.service, error_code)
    }

    fn node_type(&self) -> &str { "log_entry" }
    fn metadata(&self) -> &HashMap<String, serde_json::Value> { &self.metadata }
}

pub type LogGraphSystem = UnifiedGraphSystem<LogNode>;

impl LogGraphSystem {
    pub async fn find_error_patterns(&self, time_window: Duration) -> Result<Vec<ErrorPattern>> {
        let now = Utc::now(); // one timestamp so both range bounds agree
        let query = GraphQuery::new()
            .time_range(now - time_window, now)
            .node_filter("level", vec!["Error", "Critical"])
            .semantic_clustering(0.8, 10);
        self.query_with_clustering(query).await
    }

    pub async fn predict_failures(&self, service: &str) -> Result<Vec<FailurePrediction>> {
        let _recent_patterns = self.find_error_patterns(Duration::hours(24)).await?;
        let _historical_chains = self.find_escalation_chains(service, Duration::days(30)).await?;
        // Placeholder: the prediction model is not specified yet, so no predictions are returned
        Ok(vec![])
    }
}
```
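`semantic_clustering(0.8, 10)` is not defined in this ADR. One simple reading, assumed here, is that 0.8 is a cosine-similarity threshold and 10 caps the number of clusters. The sketch uses single-pass leader clustering as a deliberately simple stand-in: each error embedding joins the first cluster whose first member (its leader) is at least `threshold` similar, otherwise it opens a new cluster while the cap allows. `leader_clusters` and `cosine` are hypothetical helpers.

```rust
/// Cosine similarity; returns 0.0 if either vector is all zeros.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
}

/// Single-pass leader clustering (hypothetical helper for `semantic_clustering`).
/// Returns clusters as lists of input indices, largest first; vectors that fit no
/// cluster once `max_clusters` exist are left unclustered (noise).
pub fn leader_clusters(vectors: &[Vec<f32>], threshold: f32, max_clusters: usize) -> Vec<Vec<usize>> {
    let mut clusters: Vec<Vec<usize>> = Vec::new();
    for (i, v) in vectors.iter().enumerate() {
        // Compare only against each cluster's leader (its first member)
        let home = clusters.iter().position(|c| cosine(&vectors[c[0]], v) >= threshold);
        match home {
            Some(k) => clusters[k].push(i),
            None if clusters.len() < max_clusters => clusters.push(vec![i]),
            None => {} // cluster limit reached: treat as noise
        }
    }
    // Stable sort: equal-sized clusters keep their creation order
    clusters.sort_by_key(|c| std::cmp::Reverse(c.len()));
    clusters
}
```

The result depends on input order, so feeding logs chronologically gives a reproducible grouping.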
2. Prompt Graph System
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptNode {
    pub id: Uuid,
    pub template: String,
    pub techniques: Vec<String>,
    pub domain: String,
    pub success_rate: f64,
    pub token_efficiency: f64,
    pub metadata: HashMap<String, serde_json::Value>,
}

impl GraphNodeTrait for PromptNode {
    fn id(&self) -> Uuid { self.id }
    fn content_for_embedding(&self) -> String {
        format!("{} {} {}", self.template, self.domain, self.techniques.join(" "))
    }
    fn node_type(&self) -> &str { "prompt_template" }
    fn metadata(&self) -> &HashMap<String, serde_json::Value> { &self.metadata }
}

pub type PromptGraphSystem = UnifiedGraphSystem<PromptNode>;

impl PromptGraphSystem {
    pub async fn optimize_for_intent(&self, intent: &str, domain: &str) -> Result<PromptNode> {
        let query = GraphQuery::new()
            .semantic_search(intent, 20)
            .metadata_filter("domain", domain)
            .min_performance(0.8);
        let candidates = self.hybrid_search(query).await?;
        self.select_optimal_prompt(candidates).await
    }

    pub async fn learn_from_feedback(&self, prompt_id: Uuid, success: bool, tokens_used: usize) -> Result<()> {
        self.update_performance_metrics(prompt_id, success, tokens_used).await?;
        self.trigger_pattern_learning(prompt_id).await?;
        Ok(())
    }
}
```
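`update_performance_metrics` is not specified. A minimal sketch, assuming each prompt keeps attempt and success counters: the success rate is successes divided by attempts, and the mean token count is updated incrementally so no history has to be stored. `PromptStats` and the `min_attempts` guard are assumptions; the guard keeps a prompt with a single lucky success from passing a `min_performance(0.8)` filter.

```rust
/// Hypothetical per-prompt counters behind `update_performance_metrics`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct PromptStats {
    pub attempts: u64,
    pub successes: u64,
    pub mean_tokens: f64,
}

impl PromptStats {
    /// Record one feedback event; the token mean uses the incremental update
    /// mean_n = mean_{n-1} + (x - mean_{n-1}) / n.
    pub fn record(&mut self, success: bool, tokens_used: usize) {
        self.attempts += 1;
        if success {
            self.successes += 1;
        }
        let n = self.attempts as f64;
        self.mean_tokens += (tokens_used as f64 - self.mean_tokens) / n;
    }

    /// Fraction of successful uses; 0.0 before any feedback.
    pub fn success_rate(&self) -> f64 {
        if self.attempts == 0 {
            0.0
        } else {
            self.successes as f64 / self.attempts as f64
        }
    }

    /// Gate for filters like `min_performance(0.8)`, requiring enough evidence first.
    pub fn meets(&self, min_rate: f64, min_attempts: u64) -> bool {
        self.attempts >= min_attempts && self.success_rate() >= min_rate
    }
}
```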
3. Code Graph System
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CodeNode {
    pub id: Uuid,
    pub file_path: String,
    pub function_name: Option<String>,
    pub dependencies: Vec<Uuid>,
    pub complexity_score: f64,
    pub change_frequency: f64,
    pub metadata: HashMap<String, serde_json::Value>,
}

impl GraphNodeTrait for CodeNode {
    fn id(&self) -> Uuid { self.id }
    fn content_for_embedding(&self) -> String {
        format!(
            "{} {} {}",
            self.file_path,
            self.function_name.as_deref().unwrap_or(""),
            self.metadata.get("description").and_then(|v| v.as_str()).unwrap_or("")
        )
    }
    fn node_type(&self) -> &str { "code_component" }
    fn metadata(&self) -> &HashMap<String, serde_json::Value> { &self.metadata }
}

pub type CodeGraphSystem = UnifiedGraphSystem<CodeNode>;

impl CodeGraphSystem {
    pub async fn analyze_change_impact(&self, file_path: &str) -> Result<ChangeImpactAnalysis> {
        let node = self.find_by_path(file_path).await?;
        let dependencies = self.get_transitive_dependencies(&node.id, 5).await?;
        let similar_changes = self.find_similar_historical_changes(&node).await?;
        Ok(ChangeImpactAnalysis {
            affected_components: dependencies.len(),
            risk_score: self.calculate_risk_score(&dependencies, &similar_changes),
            similar_changes,
            recommendations: self.generate_change_recommendations(&dependencies).await?,
        })
    }
}
```
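`get_transitive_dependencies(&node.id, 5)` presumably collects everything reachable through `CodeNode::dependencies` within 5 hops. A depth-limited breadth-first search with a visited set handles shared and cyclic dependencies. This sketch uses an in-memory `HashMap<u32, Vec<u32>>` adjacency map and `u32` IDs in place of `Uuid` and FDB edge reads; both are simplifications, and `transitive_dependencies` is a hypothetical name.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Depth-limited BFS over a dependency adjacency map. Returns (node, hop distance)
/// pairs in BFS order, excluding the start node; the visited set stops cycles.
pub fn transitive_dependencies(
    deps: &HashMap<u32, Vec<u32>>,
    start: u32,
    max_depth: usize,
) -> Vec<(u32, usize)> {
    let mut visited: HashSet<u32> = HashSet::from([start]);
    let mut queue: VecDeque<(u32, usize)> = VecDeque::from([(start, 0)]);
    let mut out = Vec::new();
    while let Some((node, depth)) = queue.pop_front() {
        if depth == max_depth {
            continue; // do not expand beyond the hop limit
        }
        for &next in deps.get(&node).map(Vec::as_slice).unwrap_or(&[]) {
            // insert() returns false for nodes already seen via a shorter path
            if visited.insert(next) {
                out.push((next, depth + 1));
                queue.push_back((next, depth + 1));
            }
        }
    }
    out
}
```

Because BFS reaches each node first along its shortest path, the hop distance doubles as a simple proximity signal for `calculate_risk_score`.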
Universal Graph Operations
```rust
use qdrant_client::qdrant::{
    vectors_config::Config, CreateCollection, Distance, PointStruct, VectorParams, VectorsConfig,
};
use serde_json::json;

impl<T: GraphNodeTrait + Send + Sync> UnifiedGraphSystem<T> {
    pub async fn new(
        fdb: Arc<fdb::Database>,
        qdrant_config: QdrantConfig,
        tenant_id: Uuid,
        graph_type: &str,
    ) -> Result<Self> {
        let qdrant = Arc::new(
            QdrantClient::from_url(&qdrant_config.url)
                .with_api_key(&qdrant_config.api_key)
                .build()?,
        );
        let collection_name = format!("{}_{}", graph_type, tenant_id);

        // Vector size per graph type; must match deployment/qdrant-config.yaml
        let size: u64 = match graph_type {
            "logs" => 384,
            "prompts" => 1536,
            "code" => 768,
            "users" => 512,
            "projects" => 1024,
            _ => 1536,
        };

        // Create the collection only if it is missing, so real errors (auth, network) still surface
        if !qdrant.has_collection(&collection_name).await? {
            qdrant
                .create_collection(&CreateCollection {
                    collection_name: collection_name.clone(),
                    vectors_config: Some(VectorsConfig {
                        config: Some(Config::Params(VectorParams {
                            size,
                            distance: Distance::Cosine.into(),
                            ..Default::default()
                        })),
                    }),
                    ..Default::default()
                })
                .await?;
        }

        Ok(Self {
            fdb,
            qdrant,
            tenant_id,
            graph_type: graph_type.to_string(),
            collection_name,
            // DefaultEmbedder must emit `size`-dimensional vectors for this graph type
            embedder: Arc::new(DefaultEmbedder::new()),
            config: GraphConfig::for_type(graph_type),
            _phantom: std::marker::PhantomData,
        })
    }

    pub async fn add_node(&self, node: T) -> Result<()> {
        // 1. Generate the embedding before touching either store
        let embedding = self.embedder.embed(&node.content_for_embedding()).await?;

        // 2. Build keys and values outside the retryable closure. JSON rather than bincode:
        //    bincode cannot deserialize the serde_json::Value metadata it would store.
        let node_id = node.id();
        let node_key = self.build_node_key(&node_id);
        let node_data = serde_json::to_vec(&node)?;
        let index_keys: Vec<_> = node
            .metadata()
            .iter()
            .map(|(key, value)| self.build_index_key(key, value, &node_id))
            .collect();

        // 3. Store in FDB (structured); the closure may run more than once on conflicts
        self.fdb
            .run(|trx, _maybe_committed| {
                let (node_key, node_data, index_keys) = (&node_key, &node_data, &index_keys);
                async move {
                    trx.set(node_key, node_data);
                    for index_key in index_keys {
                        trx.set(index_key, &[]);
                    }
                    Ok(())
                }
            })
            .await?;

        // 4. Store in Qdrant (semantic). Upsert by point ID is idempotent, so if this step
        //    fails after the FDB commit, retrying add_node repairs the pair.
        let payload: Payload = json!({
            "tenant_id": self.tenant_id.to_string(),
            "node_type": node.node_type(),
            "metadata": node.metadata(),
        })
        .try_into()?;
        let point = PointStruct::new(node_id.to_string(), embedding, payload);
        self.qdrant
            .upsert_points_blocking(&self.collection_name, vec![point], None)
            .await?;
        Ok(())
    }

    pub async fn cross_domain_search(&self, query: &str, other_graphs: &[String]) -> Result<CrossDomainResults> {
        let mut results = CrossDomainResults::new();
        // Search in current domain
        let local_results = self.semantic_search(query, 20).await?;
        results.add_domain_results(&self.graph_type, local_results);
        // Search in other domains (requires different graph instances)
        for graph_type in other_graphs {
            let cross_results = self.search_cross_domain(query, graph_type).await?;
            results.add_domain_results(graph_type, cross_results);
        }
        // Find cross-domain patterns
        results.analyze_cross_patterns().await?;
        Ok(results)
    }
}
```
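`CrossDomainResults` is not defined here; the sketch below is one hypothetical shape that groups hits by graph type. Vector sizes differ per collection (384, 768 and 1536 in `new`), which implies different embedding models, so raw cosine scores are not comparable across domains. `merged` therefore interleaves results by rank instead of sorting on score. Hit IDs as `String` and scores as `f32` are assumptions.

```rust
use std::collections::BTreeMap;

/// Hypothetical `CrossDomainResults`: hits grouped by graph type.
#[derive(Debug, Default)]
pub struct CrossDomainResults {
    by_domain: BTreeMap<String, Vec<(String, f32)>>, // domain -> (node id, score)
}

impl CrossDomainResults {
    pub fn new() -> Self {
        Self::default()
    }

    /// Store a domain's hits sorted by descending score (scores compared within a domain only).
    pub fn add_domain_results(&mut self, domain: &str, mut hits: Vec<(String, f32)>) {
        hits.sort_by(|a, b| b.1.total_cmp(&a.1));
        self.by_domain.insert(domain.to_string(), hits);
    }

    /// Round-robin merge: rank 1 of every domain, then rank 2, and so on, up to `limit`.
    /// Domains are visited in name order (BTreeMap), so the output is deterministic.
    pub fn merged(&self, limit: usize) -> Vec<(String, String)> {
        let mut out = Vec::new();
        let max_len = self.by_domain.values().map(Vec::len).max().unwrap_or(0);
        for rank in 0..max_len {
            for (domain, hits) in &self.by_domain {
                if out.len() == limit {
                    return out;
                }
                if let Some((id, _)) = hits.get(rank) {
                    out.push((domain.clone(), id.clone()));
                }
            }
        }
        out
    }
}
```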
Factory for Different Graph Types
```rust
pub struct GraphSystemFactory;

impl GraphSystemFactory {
    pub async fn create_log_system(
        fdb: Arc<fdb::Database>,
        qdrant_config: QdrantConfig,
        tenant_id: Uuid,
    ) -> Result<LogGraphSystem> {
        UnifiedGraphSystem::new(fdb, qdrant_config, tenant_id, "logs").await
    }

    pub async fn create_prompt_system(
        fdb: Arc<fdb::Database>,
        qdrant_config: QdrantConfig,
        tenant_id: Uuid,
    ) -> Result<PromptGraphSystem> {
        UnifiedGraphSystem::new(fdb, qdrant_config, tenant_id, "prompts").await
    }

    pub async fn create_code_system(
        fdb: Arc<fdb::Database>,
        qdrant_config: QdrantConfig,
        tenant_id: Uuid,
    ) -> Result<CodeGraphSystem> {
        UnifiedGraphSystem::new(fdb, qdrant_config, tenant_id, "code").await
    }
}
```
API Integration
REST Endpoints
```rust
// Add to api.rs
.service(
    web::scope("/graphs")
        .route("/{graph_type}/search", web::post().to(handlers::graph::semantic_search))
        .route("/{graph_type}/cross-search", web::post().to(handlers::graph::cross_domain_search))
        .route("/{graph_type}/nodes", web::post().to(handlers::graph::add_node))
        .route("/{graph_type}/patterns", web::get().to(handlers::graph::find_patterns))
        .route("/insights/cross-domain", web::post().to(handlers::graph::cross_domain_insights))
)
```
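WebSocket Integration
```rust
// Add to websocket_server_with_memory.rs
"graph_search" => {
    // Validate instead of unwrap(): a malformed client message must not panic the handler
    let graph_type = msg.data.get("graph_type").and_then(|v| v.as_str())
        .ok_or_else(|| anyhow::anyhow!("'graph_type' must be a string"))?;
    let query = msg.data.get("query").and_then(|v| v.as_str())
        .ok_or_else(|| anyhow::anyhow!("'query' must be a string"))?;
    // Each graph returns its own node type; converting to JSON gives the arms one type
    let results = match graph_type {
        "logs" => serde_json::to_value(state.log_graph.semantic_search(query, 20).await?)?,
        "prompts" => serde_json::to_value(state.prompt_graph.semantic_search(query, 20).await?)?,
        "code" => serde_json::to_value(state.code_graph.semantic_search(query, 20).await?)?,
        other => return Err(anyhow::anyhow!("unknown graph type: {other}")),
    };
    send_success_response(&mut socket, msg.id, json!({ "results": results })).await?;
}
```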
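The same `graph_type` string arrives through the REST path (`/{graph_type}/...`) and the WebSocket message. A hypothetical `GraphType` enum with `FromStr` would validate it in one place, turning an unknown type into a client error, and would derive collection names consistently with `format!("{}_{}", graph_type, tenant_id)` in `new`. The variants follow the five graph types listed in the deployment configuration; the enum itself is an assumption, not part of the design.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical enum for the `graph_type` path/message parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphType {
    Logs,
    Prompts,
    Code,
    Users,
    Projects,
}

impl FromStr for GraphType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "logs" => Ok(Self::Logs),
            "prompts" => Ok(Self::Prompts),
            "code" => Ok(Self::Code),
            "users" => Ok(Self::Users),
            "projects" => Ok(Self::Projects),
            other => Err(format!("unknown graph type: {}", other)),
        }
    }
}

impl fmt::Display for GraphType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Logs => "logs",
            Self::Prompts => "prompts",
            Self::Code => "code",
            Self::Users => "users",
            Self::Projects => "projects",
        })
    }
}

impl GraphType {
    /// Qdrant collection name in the `{graph_type}_{tenant_id}` form used by `new`.
    pub fn collection_name(&self, tenant_id: &str) -> String {
        format!("{}_{}", self, tenant_id)
    }
}
```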
Performance Optimizations
Multi-Level Caching
```rust
use tokio::sync::RwLock;

pub struct GraphCache<T> {
    l1_memory: Arc<RwLock<HashMap<Uuid, T>>>, // Hot data (unbounded as written; cap at GraphConfig::cache_size)
    l2_fdb: Arc<fdb::Database>,               // Structured cache
    l3_qdrant: Arc<QdrantClient>,             // Semantic store used for L3 reconstruction
}

impl<T: GraphNodeTrait + Send + Sync> GraphCache<T> {
    pub async fn get(&self, id: &Uuid) -> Result<Option<T>> {
        // L1: memory cache (the read guard is released when this if-let ends)
        if let Some(node) = self.l1_memory.read().await.get(id) {
            return Ok(Some(node.clone()));
        }
        // L2: FDB cache with 10ms SLA
        if let Some(cached) = self.get_from_fdb_cache(id).await? {
            self.l1_memory.write().await.insert(*id, cached.clone());
            return Ok(Some(cached));
        }
        // L3: full reconstruction from Qdrant + FDB
        self.reconstruct_node(id).await
    }
}
```
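The L1 tier above is an unbounded `HashMap`, while `GraphConfig` carries a `cache_size`. A minimal bounded LRU sketch, assuming `cache_size` is an entry count: `get` refreshes recency and `insert` evicts the least recently used entry when full. `LruCache` is a hypothetical name; the recency queue makes operations O(n) in the cache size, so a production version would use a linked hash map or an LRU crate.

```rust
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

/// Minimal bounded LRU (hypothetical L1 tier; capacity from `GraphConfig::cache_size`).
pub struct LruCache<K, V> {
    capacity: usize,
    map: HashMap<K, V>,
    order: VecDeque<K>, // front = least recently used
}

impl<K: Eq + Hash + Clone, V: Clone> LruCache<K, V> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, map: HashMap::new(), order: VecDeque::new() }
    }

    /// Move `key` to the most-recently-used end of the queue.
    fn touch(&mut self, key: &K) {
        if let Some(pos) = self.order.iter().position(|k| k == key) {
            self.order.remove(pos);
        }
        self.order.push_back(key.clone());
    }

    pub fn get(&mut self, key: &K) -> Option<V> {
        let value = self.map.get(key).cloned()?;
        self.touch(key);
        Some(value)
    }

    pub fn insert(&mut self, key: K, value: V) {
        // Evict only when adding a new key to a full cache
        if !self.map.contains_key(&key) && self.map.len() == self.capacity {
            if let Some(evicted) = self.order.pop_front() {
                self.map.remove(&evicted);
            }
        }
        self.touch(&key);
        self.map.insert(key, value);
    }

    pub fn len(&self) -> usize {
        self.map.len()
    }

    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }
}
```

Because `get` updates recency it needs `&mut self`, so this tier would sit behind a `Mutex` or write lock rather than the shared read lock used in `GraphCache::get`.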
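Batch Operations
```rust
impl<T: GraphNodeTrait + Send + Sync> UnifiedGraphSystem<T> {
    pub async fn batch_add_nodes(&self, nodes: Vec<T>) -> Result<()> {
        let texts: Vec<String> = nodes.iter().map(|n| n.content_for_embedding()).collect();
        let embeddings = self.embedder.batch_embed(&texts).await?;
        // zip() below would silently drop nodes if the embedder returned fewer vectors
        anyhow::ensure!(
            embeddings.len() == nodes.len(),
            "embedder returned {} vectors for {} nodes", embeddings.len(), nodes.len()
        );

        // Serialize outside the retryable closure, and write the same metadata
        // indexes as add_node so both write paths stay consistent
        let mut writes: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        for node in &nodes {
            writes.push((self.build_node_key(&node.id()), serde_json::to_vec(node)?));
            for (key, value) in node.metadata() {
                writes.push((self.build_index_key(key, value, &node.id()), Vec::new()));
            }
        }

        // Batch FDB transaction (large batches must be split to stay under FDB's size limit)
        self.fdb
            .run(|trx, _maybe_committed| {
                let writes = &writes;
                async move {
                    for (key, value) in writes {
                        trx.set(key, value);
                    }
                    Ok(())
                }
            })
            .await?;

        // Batch Qdrant upsert (embeddings are moved, not cloned)
        let points: Vec<PointStruct> = nodes
            .iter()
            .zip(embeddings)
            .map(|(node, emb)| PointStruct::new(node.id().to_string(), emb, self.build_qdrant_payload(node)))
            .collect();
        self.qdrant.upsert_points_blocking(&self.collection_name, points, None).await?;
        Ok(())
    }
}
```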
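`batch_add_nodes` writes the whole batch in one FoundationDB transaction, but FDB rejects transactions that write more than 10,000,000 bytes and recommends keeping them under 1 MB. A sketch of splitting writes into chunks bounded by a count (for example `GraphConfig::batch_size`) and an approximate byte budget; `chunk_writes` is hypothetical, and key plus value length only approximates what FDB actually counts. Each chunk would commit in its own transaction, so a failure midway leaves earlier chunks written; since `set` is idempotent, retrying the whole batch is safe.

```rust
/// Recommended per-transaction budget (FDB's hard limit is 10,000,000 bytes).
pub const FDB_TXN_BYTE_BUDGET: usize = 1_000_000;

/// Split key/value writes into chunks of at most `max_items` writes and roughly
/// `max_bytes` of key+value data. A single write larger than `max_bytes` gets a
/// chunk of its own rather than being dropped.
pub fn chunk_writes(
    writes: Vec<(Vec<u8>, Vec<u8>)>,
    max_items: usize,
    max_bytes: usize,
) -> Vec<Vec<(Vec<u8>, Vec<u8>)>> {
    assert!(max_items > 0, "max_items must be positive");
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0;
    for (k, v) in writes {
        let size = k.len() + v.len();
        // Close the current chunk if this write would exceed either bound
        let full = !current.is_empty()
            && (current.len() == max_items || current_bytes + size > max_bytes);
        if full {
            chunks.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += size;
        current.push((k, v));
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```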
Deployment Configuration
FoundationDB Setup
```rust
// config/fdb_graph_schema.rs
/// FoundationDB is schemaless: a key space exists as soon as a key is written under it,
/// so nothing is created here; the function records the per-tenant key layout.
pub fn setup_graph_schema(_db: &fdb::Database, _tenant_id: Uuid) -> Result<()> {
    // Key spaces for each graph type
    let graph_types = ["logs", "prompts", "code", "users", "projects"];
    for _graph_type in &graph_types {
        // Node storage: {tenant_id}/graphs/{graph_type}/nodes/{node_id}
        // Edge storage: {tenant_id}/graphs/{graph_type}/edges/{from_id}/{edge_type}/{to_id}
        // Indexes:      {tenant_id}/graphs/{graph_type}/indexes/{field}/{value}/{node_id}
    }
    Ok(())
}
```
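The layout comments in `setup_graph_schema` translate directly into the `build_node_key` and `build_index_key` helpers that `add_node` calls. This sketch encodes keys as `/`-joined UTF-8 strings exactly as the layout is written, which is an assumption for readability; a production version would more likely use FoundationDB's tuple or directory layer, which escapes separators (a value containing `/` would make these plain keys ambiguous). Because index keys end in the node ID, a range read over an index prefix lists every matching node. `KeyLayout` is a hypothetical name.

```rust
/// Hypothetical key builders for the documented per-tenant layout.
pub struct KeyLayout<'a> {
    pub tenant_id: &'a str,
    pub graph_type: &'a str,
}

impl KeyLayout<'_> {
    fn base(&self, space: &str) -> String {
        format!("{}/graphs/{}/{}", self.tenant_id, self.graph_type, space)
    }

    /// {tenant_id}/graphs/{graph_type}/nodes/{node_id}
    pub fn node_key(&self, node_id: &str) -> Vec<u8> {
        format!("{}/{}", self.base("nodes"), node_id).into_bytes()
    }

    /// {tenant_id}/graphs/{graph_type}/edges/{from_id}/{edge_type}/{to_id}
    pub fn edge_key(&self, from_id: &str, edge_type: &str, to_id: &str) -> Vec<u8> {
        format!("{}/{}/{}/{}", self.base("edges"), from_id, edge_type, to_id).into_bytes()
    }

    /// {tenant_id}/graphs/{graph_type}/indexes/{field}/{value}/{node_id}
    pub fn index_key(&self, field: &str, value: &str, node_id: &str) -> Vec<u8> {
        let mut key = self.index_prefix(field, value);
        key.extend_from_slice(node_id.as_bytes());
        key
    }

    /// Prefix shared by every node with `field == value`; the trailing `/` keeps
    /// "Error" from also matching "ErrorX".
    pub fn index_prefix(&self, field: &str, value: &str) -> Vec<u8> {
        format!("{}/{}/{}/", self.base("indexes"), field, value).into_bytes()
    }
}
```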
Qdrant Cloud Setup
```yaml
# deployment/qdrant-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: qdrant-config
data:
  collections.json: |
    {
      "logs": { "size": 384, "distance": "Cosine" },
      "prompts": { "size": 1536, "distance": "Cosine" },
      "code": { "size": 768, "distance": "Cosine" },
      "users": { "size": 512, "distance": "Cosine" },
      "projects": { "size": 1024, "distance": "Cosine" }
    }
```
Testing Strategy
Unit Tests (Per Graph Type)
```rust
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    #[tokio::test]
    async fn test_log_graph_error_patterns() {
        let system = setup_log_graph().await;
        // Add test error logs (the fixture must yield one cluster of five similar errors)
        let errors = create_test_error_pattern();
        for error in errors {
            system.add_node(error).await.unwrap();
        }
        // Find patterns
        let patterns = system.find_error_patterns(Duration::hours(1)).await.unwrap();
        assert_eq!(patterns.len(), 1);
        assert_eq!(patterns[0].similar_logs.len(), 5);
    }

    #[tokio::test]
    async fn test_prompt_graph_optimization() {
        let system = setup_prompt_graph().await;
        // Test prompt optimization (assumes setup_prompt_graph seeds engineering prompts)
        let optimized = system.optimize_for_intent("debug API error", "engineering").await.unwrap();
        // min_performance(0.8) admits prompts at exactly 0.8, so assert >=, not >
        assert!(optimized.success_rate >= 0.8);
        assert!(optimized.techniques.iter().any(|t| t == "chain_of_thought"));
    }
}
```
Cross-Domain Integration Tests
```rust
#[tokio::test]
async fn test_cross_domain_intelligence() {
    // Both systems must share one tenant_id: collections are named "{graph_type}_{tenant_id}"
    let log_system = setup_log_graph().await;
    let prompt_system = setup_prompt_graph().await;
    // Add authentication errors to logs
    let auth_errors = create_auth_error_logs();
    for error in auth_errors {
        log_system.add_node(error).await.unwrap();
    }
    // Search for related prompts
    let cross_results = prompt_system
        .cross_domain_search("authentication failure debugging", &["logs".to_string()])
        .await
        .unwrap();
    // Should find correlation between log patterns and prompt effectiveness
    assert!(!cross_results.cross_domain_insights.is_empty());
}
```
Performance Targets
Latency Requirements
- Single node retrieval: <5ms
- Semantic search: <50ms for 10k results
- Graph traversal: <100ms for 5-hop exploration
- Cross-domain queries: <500ms for complex insights
Scalability Targets
- 100M+ nodes per graph type
- 10k+ concurrent queries
- 1TB+ of vector data per tenant
- Real-time updates with <10ms propagation
Resource Efficiency
- Memory usage: <2GB per 10M nodes
- Storage efficiency: 60% compression vs. separate systems
- Network optimization: Batch operations reduce calls by 90%
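A quick sizing check helps interpret the memory and storage targets. Assuming 4-byte `f32` vector components at the collection sizes configured for Qdrant, and decimal units (1 GB = 10^9 bytes, 1 TB = 10^12 bytes):

$$
\frac{2 \times 10^{9}\ \text{B}}{10^{7}\ \text{nodes}} = 200\ \text{B/node},
\qquad
384 \times 4\ \text{B} = 1536\ \text{B per 384-dim vector}
$$

$$
10^{7} \times 1536\ \text{B} \approx 15.4\ \text{GB},
\qquad
\frac{10^{12}\ \text{B}}{1536 \times 4\ \text{B}} \approx 1.63 \times 10^{8}\ \text{vectors at 1536 dims}
$$

So the <2GB per 10M nodes target cannot include in-memory vectors even for the 384-dim `logs` collection (about 15.4 GB raw, or 3.84 GB at one byte per dimension); it presumably covers structured and cache memory, with vectors served from disk or compressed storage. At 1536 dimensions, 1 TB holds roughly 163M vectors, which is in line with the 100M+ nodes target.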
Monitoring and Observability
Key Metrics
```rust
#[derive(Debug, Serialize)]
pub struct GraphMetrics {
    pub nodes_count: u64,
    pub edges_count: u64,
    pub query_latency_p99: f64, // assumed unit: milliseconds, matching the latency targets
    pub cache_hit_rate: f64,    // assumed ratio in 0.0..=1.0
    pub cross_domain_queries_per_sec: f64,
    pub semantic_search_accuracy: f64,
}
```
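`query_latency_p99` needs a percentile definition. A sketch using the nearest-rank method over a window of samples, assuming latencies are recorded in milliseconds; `percentile_nearest_rank` is a hypothetical helper, and at 10k+ concurrent queries a streaming histogram would likely replace sorting every window.

```rust
/// Nearest-rank percentile: the smallest sample such that at least `p` percent
/// of samples are <= it. Returns None for an empty window or `p` outside (0, 100].
pub fn percentile_nearest_rank(samples: &[f64], p: f64) -> Option<f64> {
    if samples.is_empty() || !(p > 0.0 && p <= 100.0) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    // 1-based rank = ceil(p * n / 100); multiply before dividing to limit rounding error
    let rank = (p * sorted.len() as f64 / 100.0).ceil() as usize;
    Some(sorted[rank.max(1) - 1])
}
```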
Health Checks
```rust
pub async fn health_check(&self) -> Result<GraphHealthStatus> {
    // No `?` on the connectivity checks: an unreachable store must be reported as
    // unhealthy rather than turning the whole health check into an error
    let fdb_status = self.check_fdb_connectivity().await;
    let qdrant_status = self.check_qdrant_connectivity().await;
    let performance = self.benchmark_operations().await?;
    Ok(GraphHealthStatus {
        fdb_healthy: fdb_status.is_ok(),
        qdrant_healthy: qdrant_status.is_ok(),
        query_performance: performance,
        last_sync: self.get_last_sync_time().await?,
    })
}
```
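Deployment Commands
```bash
# Deploy unified graph system
cargo build --release --bin graph_system
docker build -t coditect-graph-system .   # local image; --source below builds its own with Cloud Build
gcloud run deploy graph-system --source .

# Initialize graph collections (API_BASE_URL: the deployed service URL, a placeholder)
curl -X POST "$API_BASE_URL/api/graphs/initialize" \
  -H "Authorization: Bearer $JWT" \
  -H "Content-Type: application/json" \
  -d '{"graph_types": ["logs", "prompts", "code", "users", "projects"]}'

# Health check
curl "$API_BASE_URL/api/graphs/health"
```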
This unified system is intended to deliver CODITECT's cross-domain intelligence within the latency targets above: under 100ms for 5-hop graph traversal and under 500ms for cross-domain queries.