
ADR-021-v4: Unified Hybrid Graph System (Part 2: Technical)

Document: ADR-021-v4-unified-hybrid-graph-system-part2-technical
Version: 2.0.0
Purpose: Complete technical implementation for unified hybrid graph system
Audience: Developers, AI agents implementing graph system
Date Created: 2025-08-30
Date Modified: 2025-09-01
QA Reviewed: Pending
Status: DRAFT
Supersedes: v1.0.0


1. Document Information 🔴 REQUIRED

| Field | Value |
|---|---|
| ADR Number | ADR-021 |
| Title | Unified Hybrid Graph System - Technical |
| Status | Draft |
| Date Created | 2025-08-30 |
| Last Modified | 2025-09-01 |
| Version | 2.0.0 |
| Dependencies | FoundationDB 7.1+, Qdrant 1.7+, Rust 1.75+ |

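2. Implementation Overview 🔴 REQUIRED

2.1 Dependencies

# Cargo.toml
[dependencies]
# The foundationdb crate selects the FDB client API version through a feature flag
foundationdb = { version = "0.8.0", features = ["fdb-7_1"] }
qdrant-client = "1.7.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"   # json! payloads and node serialization
uuid = { version = "1.6", features = ["v4", "serde"] }
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }
tracing = "0.1"
actix-web = "4"      # REST endpoints (web::scope, web::post)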

2.2 Module Structure

// src/graph/mod.rs
pub mod traits;
pub mod engine;
pub mod domains;
pub mod storage;
pub mod query;


3. Core Graph Abstraction 🔴 REQUIRED

Core Abstraction: UnifiedGraphSystem<T>

//! Universal hybrid graph system for any knowledge domain
use anyhow::Result;
use foundationdb as fdb;
use qdrant_client::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::json; // json! is used for Qdrant payloads and WebSocket responses
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

pub trait GraphNodeTrait: Clone + Serialize + for<'de> Deserialize<'de> {
fn id(&self) -> Uuid;
fn content_for_embedding(&self) -> String;
fn node_type(&self) -> &str;
fn metadata(&self) -> &HashMap<String, serde_json::Value>;
}

pub struct UnifiedGraphSystem<T: GraphNodeTrait> {
fdb: Arc<fdb::Database>,
qdrant: Arc<QdrantClient>,
tenant_id: Uuid,
graph_type: String,
collection_name: String,
embedder: Arc<dyn Embedder>,
config: GraphConfig,
_phantom: std::marker::PhantomData<T>,
}

#[derive(Clone)]
pub struct GraphConfig {
pub embedding_dimension: usize,
pub cache_size: usize,
pub batch_size: usize,
pub semantic_threshold: f64,
}
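`UnifiedGraphSystem::new` calls `GraphConfig::for_type`, which this ADR does not define. A minimal sketch, assuming the per-type embedding dimensions from the Qdrant ConfigMap under Deployment Configuration. `cache_size` and `batch_size` are hypothetical placeholder values, and `semantic_threshold` reuses the 0.8 that `find_error_patterns` passes to `semantic_clustering`. The struct is repeated so the sketch compiles on its own:

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct GraphConfig {
    pub embedding_dimension: usize,
    pub cache_size: usize,
    pub batch_size: usize,
    pub semantic_threshold: f64,
}

impl GraphConfig {
    /// Hypothetical per-graph-type defaults. Dimensions mirror the Qdrant
    /// collection config; cache_size and batch_size are placeholders, not ADR values.
    pub fn for_type(graph_type: &str) -> Self {
        let embedding_dimension = match graph_type {
            "logs" => 384,
            "code" => 768,
            "users" => 512,
            "projects" => 1024,
            // "prompts" and any unknown type use 1536
            _ => 1536,
        };
        GraphConfig {
            embedding_dimension,
            cache_size: 10_000,      // hypothetical
            batch_size: 500,         // hypothetical
            semantic_threshold: 0.8, // same threshold find_error_patterns uses
        }
    }
}
```

Keeping the dimension table in one function lets the Qdrant collection size and the embedder's output size be checked against a single source.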

Concrete Implementations

1. Log Graph System

use chrono::{DateTime, Duration, Utc};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogNode {
    pub id: Uuid,
    pub timestamp: DateTime<Utc>,
    pub level: LogLevel,
    pub message: String,
    pub service: String,
    pub metadata: HashMap<String, serde_json::Value>,
}

impl GraphNodeTrait for LogNode {
    fn id(&self) -> Uuid { self.id }
    fn content_for_embedding(&self) -> String {
        // Message, service, and error code (empty if absent or not a string)
        let error_code = self.metadata.get("error_code").and_then(|v| v.as_str()).unwrap_or("");
        format!("{} {} {}", self.message, self.service, error_code)
    }
    fn node_type(&self) -> &str { "log_entry" }
    fn metadata(&self) -> &HashMap<String, serde_json::Value> { &self.metadata }
}

pub type LogGraphSystem = UnifiedGraphSystem<LogNode>;

impl LogGraphSystem {
    pub async fn find_error_patterns(&self, time_window: Duration) -> Result<Vec<ErrorPattern>> {
        // Read the clock once so both ends of the window are consistent
        let now = Utc::now();
        let query = GraphQuery::new()
            .time_range(now - time_window, now)
            .node_filter("level", vec!["Error", "Critical"])
            .semantic_clustering(0.8, 10);

        self.query_with_clustering(query).await
    }

    pub async fn predict_failures(&self, service: &str) -> Result<Vec<FailurePrediction>> {
        let recent_patterns = self.find_error_patterns(Duration::hours(24)).await?;
        let historical_chains = self.find_escalation_chains(service, Duration::days(30)).await?;

        // Placeholder: the model that turns these inputs into predictions is not yet
        // specified, so no predictions are returned.
        let _ = (recent_patterns, historical_chains);
        Ok(vec![])
    }
}
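`find_error_patterns` delegates grouping to `semantic_clustering(0.8, 10)` without saying which algorithm it uses. One plausible reading of the 0.8 threshold is greedy leader clustering over cosine similarity, sketched below with plain `Vec<f32>` embeddings. The function names are hypothetical, and the second argument (`10`) is not modeled because its meaning is not specified here:

```rust
/// Cosine similarity of two equal-length vectors; 0.0 if either has zero norm.
pub fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}

/// Greedy leader clustering: each embedding joins the first cluster whose
/// leader (first member) is at least `threshold` similar; otherwise it starts
/// a new cluster. Returns clusters as lists of input indices.
pub fn cluster_by_threshold(embeddings: &[Vec<f32>], threshold: f32) -> Vec<Vec<usize>> {
    let mut clusters: Vec<Vec<usize>> = Vec::new();
    for (i, emb) in embeddings.iter().enumerate() {
        let found = clusters
            .iter()
            .position(|c| cosine(&embeddings[c[0]], emb) >= threshold);
        match found {
            Some(idx) => clusters[idx].push(i),
            None => clusters.push(vec![i]),
        }
    }
    clusters
}
```

Leader clustering is single-pass but order-dependent: the first error seen in a burst becomes the representative that later errors are compared against.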

2. Prompt Graph System

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromptNode {
pub id: Uuid,
pub template: String,
pub techniques: Vec<String>,
pub domain: String,
pub success_rate: f64,
pub token_efficiency: f64,
pub metadata: HashMap<String, serde_json::Value>,
}

impl GraphNodeTrait for PromptNode {
fn id(&self) -> Uuid { self.id }
fn content_for_embedding(&self) -> String {
format!("{} {} {}", self.template, self.domain, self.techniques.join(" "))
}
fn node_type(&self) -> &str { "prompt_template" }
fn metadata(&self) -> &HashMap<String, serde_json::Value> { &self.metadata }
}

pub type PromptGraphSystem = UnifiedGraphSystem<PromptNode>;

impl PromptGraphSystem {
pub async fn optimize_for_intent(&self, intent: &str, domain: &str) -> Result<PromptNode> {
let query = GraphQuery::new()
.semantic_search(intent, 20)
.metadata_filter("domain", domain)
.min_performance(0.8);

let candidates = self.hybrid_search(query).await?;
self.select_optimal_prompt(candidates).await
}

pub async fn learn_from_feedback(&self, prompt_id: Uuid, success: bool, tokens_used: usize) -> Result<()> {
self.update_performance_metrics(prompt_id, success, tokens_used).await?;
self.trigger_pattern_learning(prompt_id).await?;
Ok(())
}
}
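`learn_from_feedback` calls `update_performance_metrics` without defining how `success_rate` changes. A minimal sketch, assuming an exponential moving average with a hypothetical smoothing factor `alpha`, so recent outcomes outweigh old ones and a prompt that starts failing drops below the `min_performance(0.8)` cutoff used by `optimize_for_intent`:

```rust
/// Hypothetical success-rate update: exponential moving average toward the
/// latest outcome (1.0 for success, 0.0 for failure). `alpha` is clamped to [0, 1].
pub fn update_success_rate(current: f64, success: bool, alpha: f64) -> f64 {
    let observed = if success { 1.0 } else { 0.0 };
    let alpha = alpha.clamp(0.0, 1.0);
    current + alpha * (observed - current)
}
```

With `alpha = 0.1`, a prompt at 0.82 falls to about 0.74 after one failure and leaves the candidate set. A cumulative mean would need a per-prompt usage count, which `PromptNode` does not currently store.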

3. Code Graph System

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CodeNode {
pub id: Uuid,
pub file_path: String,
pub function_name: Option<String>,
pub dependencies: Vec<Uuid>,
pub complexity_score: f64,
pub change_frequency: f64,
pub metadata: HashMap<String, serde_json::Value>,
}

impl GraphNodeTrait for CodeNode {
fn id(&self) -> Uuid { self.id }
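fn content_for_embedding(&self) -> String {
// File path, function name, and description (empty strings when absent)
let function = self.function_name.as_deref().unwrap_or("");
let description = self.metadata.get("description").and_then(|v| v.as_str()).unwrap_or("");
format!("{} {} {}", self.file_path, function, description)
}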
fn node_type(&self) -> &str { "code_component" }
fn metadata(&self) -> &HashMap<String, serde_json::Value> { &self.metadata }
}

pub type CodeGraphSystem = UnifiedGraphSystem<CodeNode>;

impl CodeGraphSystem {
pub async fn analyze_change_impact(&self, file_path: &str) -> Result<ChangeImpactAnalysis> {
let node = self.find_by_path(file_path).await?;
let dependencies = self.get_transitive_dependencies(&node.id, 5).await?;
let similar_changes = self.find_similar_historical_changes(&node).await?;

Ok(ChangeImpactAnalysis {
affected_components: dependencies.len(),
risk_score: self.calculate_risk_score(&dependencies, &similar_changes),
similar_changes,
recommendations: self.generate_change_recommendations(&dependencies).await?,
})
}
}
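`get_transitive_dependencies(&node.id, 5)` is not defined in this ADR. A depth-limited breadth-first search is the standard way to collect nodes within N hops. The sketch below uses `u32` ids in place of `Uuid` and an in-memory adjacency map in place of FDB edge reads, both assumptions made to keep it self-contained:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Breadth-first walk of a dependency graph up to `max_depth` hops from `start`.
/// Returns every reachable node (excluding `start`) in BFS order.
pub fn transitive_dependencies(
    deps: &HashMap<u32, Vec<u32>>,
    start: u32,
    max_depth: usize,
) -> Vec<u32> {
    let mut seen: HashSet<u32> = HashSet::from([start]);
    let mut queue: VecDeque<(u32, usize)> = VecDeque::from([(start, 0)]);
    let mut out = Vec::new();
    while let Some((node, depth)) = queue.pop_front() {
        if depth == max_depth {
            continue; // do not expand beyond the hop limit
        }
        for &next in deps.get(&node).into_iter().flatten() {
            // `seen` prevents revisiting nodes, so cycles terminate
            if seen.insert(next) {
                out.push(next);
                queue.push_back((next, depth + 1));
            }
        }
    }
    out
}
```

`CodeNode::dependencies` points from a component to what it uses. For change impact, the affected components are the ones that depend on the changed file, so the map passed in would hold reverse edges (dependents); the traversal itself is unchanged.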

Universal Graph Operations

impl<T: GraphNodeTrait + Send + Sync> UnifiedGraphSystem<T> {
    pub async fn new(
        fdb: Arc<fdb::Database>,
        qdrant_config: QdrantConfig,
        tenant_id: Uuid,
        graph_type: &str,
    ) -> Result<Self> {
        use qdrant_client::qdrant::{
            vectors_config::Config, CreateCollection, Distance, VectorParams, VectorsConfig,
        };

        let qdrant = Arc::new(
            QdrantClient::from_url(&qdrant_config.url)
                .with_api_key(&qdrant_config.api_key)
                .build()?,
        );

        // One collection per graph type and tenant
        let collection_name = format!("{}_{}", graph_type, tenant_id);

        // Vector size per graph type, matching deployment/qdrant-config.yaml.
        // The embedder must emit vectors of exactly this size.
        let size = match graph_type {
            "logs" => 384,
            "code" => 768,
            "users" => 512,
            "projects" => 1024,
            _ => 1536, // "prompts" and any other type
        };
        let vector_config = VectorParams { size, distance: Distance::Cosine.into(), ..Default::default() };

        // Most failures here mean the collection already exists, but auth and network
        // errors land here too, so log them instead of discarding them with .ok()
        if let Err(e) = qdrant
            .create_collection(&CreateCollection {
                collection_name: collection_name.clone(),
                vectors_config: Some(VectorsConfig {
                    config: Some(Config::Params(vector_config)),
                }),
                ..Default::default()
            })
            .await
        {
            tracing::warn!(collection = %collection_name, error = %e, "create_collection failed");
        }

        Ok(Self {
            fdb,
            qdrant,
            tenant_id,
            graph_type: graph_type.to_string(),
            collection_name,
            embedder: Arc::new(DefaultEmbedder::new()),
            config: GraphConfig::for_type(graph_type),
            _phantom: std::marker::PhantomData,
        })
    }

    pub async fn add_node(&self, node: T) -> Result<()> {
        // 1. Generate the embedding
        let embedding = self.embedder.embed(&node.content_for_embedding()).await?;

        // 2. Structured write to FDB: node record plus one index key per metadata field.
        // JSON rather than bincode, because bincode cannot deserialize serde_json::Value.
        let node_key = self.build_node_key(&node.id());
        let node_data = serde_json::to_vec(&node)?;

        let trx = self.fdb.create_trx()?;
        trx.set(&node_key, &node_data);
        for (key, value) in node.metadata() {
            let index_key = self.build_index_key(key, value, &node.id());
            trx.set(&index_key, &[]);
        }
        // Single attempt; production code should retry retryable FDB errors
        trx.commit().await.map_err(|e| anyhow::anyhow!("FDB commit failed: {:?}", e))?;

        // 3. Semantic write to Qdrant: vector plus filterable payload
        let payload = Payload::try_from(json!({
            "tenant_id": self.tenant_id.to_string(),
            "node_type": node.node_type(),
            "metadata": node.metadata(),
        }))
        .map_err(|e| anyhow::anyhow!("invalid Qdrant payload: {:?}", e))?;
        let point = PointStruct::new(node.id().to_string(), embedding, payload);

        // qdrant-client 1.7+ argument order: (collection, shard_key_selector, points, ordering)
        self.qdrant
            .upsert_points_blocking(&self.collection_name, None, vec![point], None)
            .await?;

        Ok(())
    }

pub async fn cross_domain_search(&self, query: &str, other_graphs: &[String]) -> Result<CrossDomainResults> {
let mut results = CrossDomainResults::new();

// Search in current domain
let local_results = self.semantic_search(query, 20).await?;
results.add_domain_results(&self.graph_type, local_results);

// Search the other graph types' collections for the same tenant ({graph_type}_{tenant_id})
for graph_type in other_graphs {
let cross_results = self.search_cross_domain(query, graph_type).await?;
results.add_domain_results(graph_type, cross_results);
}

// Find cross-domain patterns
results.analyze_cross_patterns().await?;

Ok(results)
}
}
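`cross_domain_search` relies on a `CrossDomainResults` type that is not defined here. A minimal sketch of the collect-and-merge step, with hypothetical `Hit` and `top_k` names and plain `(node_id, score)` pairs standing in for Qdrant results:

```rust
use std::collections::HashMap;

/// One scored hit from one graph domain (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub struct Hit {
    pub domain: String,
    pub node_id: String,
    pub score: f32,
}

/// Hypothetical stand-in for CrossDomainResults: per-domain hits plus a merged ranking.
#[derive(Default)]
pub struct CrossDomainResults {
    by_domain: HashMap<String, Vec<Hit>>,
}

impl CrossDomainResults {
    pub fn add_domain_results(&mut self, domain: &str, hits: Vec<(String, f32)>) {
        let entry = self.by_domain.entry(domain.to_string()).or_default();
        entry.extend(hits.into_iter().map(|(node_id, score)| Hit {
            domain: domain.to_string(),
            node_id,
            score,
        }));
    }

    /// Global top-k by score across all domains (highest first).
    pub fn top_k(&self, k: usize) -> Vec<Hit> {
        let mut all: Vec<Hit> = self.by_domain.values().flatten().cloned().collect();
        all.sort_by(|a, b| b.score.total_cmp(&a.score));
        all.truncate(k);
        all
    }
}
```

The log, prompt, and code collections use different embedding dimensions (384, 1536, 768) and therefore presumably different models, so raw cosine scores from different domains are not strictly comparable; normalizing scores per domain before merging is a reasonable refinement.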

Factory for Different Graph Types

pub struct GraphSystemFactory;

impl GraphSystemFactory {
pub async fn create_log_system(
fdb: Arc<fdb::Database>,
qdrant_config: QdrantConfig,
tenant_id: Uuid,
) -> Result<LogGraphSystem> {
UnifiedGraphSystem::new(fdb, qdrant_config, tenant_id, "logs").await
}

pub async fn create_prompt_system(
fdb: Arc<fdb::Database>,
qdrant_config: QdrantConfig,
tenant_id: Uuid,
) -> Result<PromptGraphSystem> {
UnifiedGraphSystem::new(fdb, qdrant_config, tenant_id, "prompts").await
}

pub async fn create_code_system(
fdb: Arc<fdb::Database>,
qdrant_config: QdrantConfig,
tenant_id: Uuid,
) -> Result<CodeGraphSystem> {
UnifiedGraphSystem::new(fdb, qdrant_config, tenant_id, "code").await
}
}

API Integration

REST Endpoints

// Add to api.rs
.service(
web::scope("/graphs")
.route("/{graph_type}/search", web::post().to(handlers::graph::semantic_search))
.route("/{graph_type}/cross-search", web::post().to(handlers::graph::cross_domain_search))
.route("/{graph_type}/nodes", web::post().to(handlers::graph::add_node))
.route("/{graph_type}/patterns", web::get().to(handlers::graph::find_patterns))
.route("/insights/cross-domain", web::post().to(handlers::graph::cross_domain_insights))
)

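WebSocket Integration

// Add to websocket_server_with_memory.rs
"graph_search" => {
    // Reject malformed messages with an error instead of panicking on unwrap()
    let graph_type = msg.data.get("graph_type").and_then(|v| v.as_str())
        .ok_or_else(|| anyhow::anyhow!("graph_search: missing string field `graph_type`"))?;
    let query = msg.data.get("query").and_then(|v| v.as_str())
        .ok_or_else(|| anyhow::anyhow!("graph_search: missing string field `query`"))?;

    // Each graph returns a different node type, so serialize to JSON to unify the match arms
    let results = match graph_type {
        "logs" => serde_json::to_value(state.log_graph.semantic_search(query, 20).await?)?,
        "prompts" => serde_json::to_value(state.prompt_graph.semantic_search(query, 20).await?)?,
        "code" => serde_json::to_value(state.code_graph.semantic_search(query, 20).await?)?,
        other => return Err(anyhow::anyhow!("Unknown graph type: {other}")),
    };

    send_success_response(&mut socket, msg.id, json!({ "results": results })).await?;
}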
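Both the REST routes (`/{graph_type}/...`) and the WebSocket handler dispatch on a free-form `graph_type` string. Parsing it once into an enum gives unknown values a typed error and lets every later `match` be exhaustive. A sketch with a hypothetical `GraphType` enum covering the three graph systems defined above:

```rust
use std::fmt;
use std::str::FromStr;

/// Graph types handled by the REST and WebSocket handlers (hypothetical helper).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphType {
    Logs,
    Prompts,
    Code,
}

/// Error for a graph type string that no handler supports.
#[derive(Debug, PartialEq, Eq)]
pub struct UnknownGraphType(pub String);

impl fmt::Display for UnknownGraphType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unknown graph type: {:?}", self.0)
    }
}

impl std::error::Error for UnknownGraphType {}

impl FromStr for GraphType {
    type Err = UnknownGraphType;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "logs" => Ok(GraphType::Logs),
            "prompts" => Ok(GraphType::Prompts),
            "code" => Ok(GraphType::Code),
            other => Err(UnknownGraphType(other.to_string())),
        }
    }
}
```

A handler would call `graph_type.parse::<GraphType>()?` once and then `match` on the enum; adding a variant later makes the compiler flag every dispatch site that has not been updated.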

Performance Optimizations

Multi-Level Caching

use tokio::sync::RwLock; // async lock: read() and write() are awaited below

pub struct GraphCache<T> {
l1_memory: Arc<RwLock<HashMap<Uuid, T>>>, // Hot data
l2_fdb: Arc<fdb::Database>, // Structured cache
l3_qdrant: Arc<QdrantClient>, // Semantic cache
}

impl<T: GraphNodeTrait> GraphCache<T> {
pub async fn get(&self, id: &Uuid) -> Result<Option<T>> {
// L1: Memory cache
if let Some(node) = self.l1_memory.read().await.get(id) {
return Ok(Some(node.clone()));
}

// L2: FDB cache with 10ms SLA
if let Some(cached) = self.get_from_fdb_cache(id).await? {
self.l1_memory.write().await.insert(*id, cached.clone());
return Ok(Some(cached));
}

// L3: Full reconstruction from Qdrant + FDB
self.reconstruct_node(id).await
}
}
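As written, the L1 `HashMap` grows without bound, although `GraphConfig` carries a `cache_size`. A minimal least-recently-used sketch, assuming capacity comes from `cache_size`. The `LruCache` type here is hypothetical and uses a linear scan for recency updates to stay dependency-free:

```rust
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

/// Size-bounded cache with least-recently-used eviction (hypothetical sketch).
pub struct LruCache<K, V> {
    capacity: usize,
    map: HashMap<K, V>,
    order: VecDeque<K>, // front = least recently used
}

impl<K: Hash + Eq + Clone, V: Clone> LruCache<K, V> {
    pub fn new(capacity: usize) -> Self {
        Self { capacity: capacity.max(1), map: HashMap::new(), order: VecDeque::new() }
    }

    /// Move `key` to the most-recently-used end (O(n) scan).
    fn touch(&mut self, key: &K) {
        if let Some(pos) = self.order.iter().position(|k| k == key) {
            self.order.remove(pos);
        }
        self.order.push_back(key.clone());
    }

    pub fn get(&mut self, key: &K) -> Option<V> {
        let value = self.map.get(key).cloned()?;
        self.touch(key);
        Some(value)
    }

    pub fn insert(&mut self, key: K, value: V) {
        // Evict the least recently used entry only when adding a new key to a full cache
        if !self.map.contains_key(&key) && self.map.len() == self.capacity {
            if let Some(evicted) = self.order.pop_front() {
                self.map.remove(&evicted);
            }
        }
        self.touch(&key);
        self.map.insert(key, value);
    }
}
```

Because `get` updates recency, it needs `&mut self`; behind the `RwLock` in `GraphCache`, every L1 read would then take the write lock (or a `Mutex`). An O(1) implementation such as the `lru` crate avoids the linear scan.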

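Batch Operations

impl<T: GraphNodeTrait + Send + Sync> UnifiedGraphSystem<T> {
    pub async fn batch_add_nodes(&self, nodes: Vec<T>) -> Result<()> {
        // Chunk by config.batch_size: a single FDB transaction is capped at 10 MB of
        // affected data and 5 seconds, so one transaction per call does not scale.
        for chunk in nodes.chunks(self.config.batch_size.max(1)) {
            // One embedding call per chunk
            let texts: Vec<String> = chunk.iter().map(|n| n.content_for_embedding()).collect();
            let embeddings = self.embedder.batch_embed(&texts).await?;

            // Batch FDB write: node records plus the same metadata indexes add_node writes
            let trx = self.fdb.create_trx()?;
            for node in chunk {
                trx.set(&self.build_node_key(&node.id()), &serde_json::to_vec(node)?);
                for (key, value) in node.metadata() {
                    trx.set(&self.build_index_key(key, value, &node.id()), &[]);
                }
            }
            trx.commit().await.map_err(|e| anyhow::anyhow!("FDB commit failed: {:?}", e))?;

            // Batch Qdrant upsert; embeddings are moved rather than cloned
            let points: Vec<PointStruct> = chunk
                .iter()
                .zip(embeddings)
                .map(|(node, emb)| PointStruct::new(node.id().to_string(), emb, self.build_qdrant_payload(node)))
                .collect();
            self.qdrant
                .upsert_points_blocking(&self.collection_name, None, points, None)
                .await?;
        }
        Ok(())
    }
}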
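A FoundationDB transaction is limited to 10 MB of affected data and 5 seconds, so a large batch has to be split into several transactions, and counting nodes alone does not bound transaction size when node payloads vary. A sketch of a planner that caps both item count and bytes, assuming each node's serialized size is known before writing; `plan_batches` is a hypothetical helper:

```rust
use std::ops::Range;

/// Splits items into consecutive batches that respect both an item-count cap
/// (e.g. GraphConfig::batch_size) and a byte budget per transaction.
/// Returns index ranges into `sizes`. An item larger than `max_bytes` gets a
/// batch of its own; FDB still rejects any single value over 100 KB.
pub fn plan_batches(sizes: &[usize], max_items: usize, max_bytes: usize) -> Vec<Range<usize>> {
    let max_items = max_items.max(1);
    let mut batches = Vec::new();
    let (mut start, mut bytes) = (0, 0);
    for (i, &size) in sizes.iter().enumerate() {
        // Close the current batch if adding item i would exceed either limit
        let full = i - start == max_items || (i > start && bytes + size > max_bytes);
        if full {
            batches.push(start..i);
            start = i;
            bytes = 0;
        }
        bytes += size;
    }
    if start < sizes.len() {
        batches.push(start..sizes.len());
    }
    batches
}
```

Each returned range maps to one FDB transaction and one Qdrant upsert. Setting `max_bytes` well below 10 MB leaves headroom for index keys and conflict ranges, which also count toward the limit.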

Deployment Configuration

FoundationDB Setup

// config/fdb_graph_schema.rs
// FoundationDB has no schema or DDL: key spaces exist as soon as keys are written, so
// this function performs no writes. It records the per-graph-type key layout that
// build_node_key and build_index_key are expected to follow.
pub fn setup_graph_schema(_db: &fdb::Database, _tenant_id: Uuid) -> Result<()> {
    let graph_types = ["logs", "prompts", "code", "users", "projects"];

    for _graph_type in &graph_types {
        // Node storage: {tenant_id}/graphs/{graph_type}/nodes/{node_id}
        // Edge storage: {tenant_id}/graphs/{graph_type}/edges/{from_id}/{edge_type}/{to_id}
        // Indexes:      {tenant_id}/graphs/{graph_type}/indexes/{field}/{value}/{node_id}
    }

    Ok(())
}
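The key layout documented in `setup_graph_schema` is what makes metadata indexes work on an ordered key-value store: every node with `field = value` sits under a shared prefix, so an index lookup is a single range scan. A sketch with a `BTreeMap` standing in for FoundationDB's ordered keyspace. The function names are hypothetical, and plain `/`-joined strings assume no component contains `/` (FoundationDB's tuple-layer encoding avoids that restriction):

```rust
use std::collections::BTreeMap;

// Hypothetical key builders following the documented layout.
pub fn node_key(tenant: &str, graph: &str, node_id: &str) -> Vec<u8> {
    format!("{tenant}/graphs/{graph}/nodes/{node_id}").into_bytes()
}

pub fn edge_key(tenant: &str, graph: &str, from: &str, edge_type: &str, to: &str) -> Vec<u8> {
    format!("{tenant}/graphs/{graph}/edges/{from}/{edge_type}/{to}").into_bytes()
}

pub fn index_key(tenant: &str, graph: &str, field: &str, value: &str, node_id: &str) -> Vec<u8> {
    format!("{tenant}/graphs/{graph}/indexes/{field}/{value}/{node_id}").into_bytes()
}

/// Index lookup as an ordered prefix scan: every key under
/// {tenant}/graphs/{graph}/indexes/{field}/{value}/ names one matching node.
pub fn lookup(
    store: &BTreeMap<Vec<u8>, Vec<u8>>,
    tenant: &str,
    graph: &str,
    field: &str,
    value: &str,
) -> Vec<String> {
    let prefix = format!("{tenant}/graphs/{graph}/indexes/{field}/{value}/").into_bytes();
    store
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .map(|(k, _)| String::from_utf8_lossy(&k[prefix.len()..]).into_owned())
        .collect()
}
```

Against FoundationDB itself, the same lookup would be a range read from the prefix to the end of that prefix; the trailing `/` in the prefix keeps `Error` from also matching a value such as `ErrorX`.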

Qdrant Cloud Setup

# deployment/qdrant-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: qdrant-config
data:
  collections.json: |
    {
      "logs": { "size": 384, "distance": "Cosine" },
      "prompts": { "size": 1536, "distance": "Cosine" },
      "code": { "size": 768, "distance": "Cosine" },
      "users": { "size": 512, "distance": "Cosine" },
      "projects": { "size": 1024, "distance": "Cosine" }
    }

Testing Strategy

Unit Tests (Per Graph Type)

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_log_graph_error_patterns() {
let system = setup_log_graph().await;

// Add test error logs
let errors = create_test_error_pattern();
for error in errors {
system.add_node(error).await.unwrap();
}

// Find patterns
let patterns = system.find_error_patterns(Duration::hours(1)).await.unwrap();
assert_eq!(patterns.len(), 1);
assert_eq!(patterns[0].similar_logs.len(), 5);
}

#[tokio::test]
async fn test_prompt_graph_optimization() {
let system = setup_prompt_graph().await;

// Test prompt optimization
let optimized = system.optimize_for_intent("debug API error", "engineering").await.unwrap();
assert!(optimized.success_rate > 0.8);
assert!(optimized.techniques.contains(&"chain_of_thought".to_string()));
}
}

Cross-Domain Integration Tests

#[tokio::test]
async fn test_cross_domain_intelligence() {
let log_system = setup_log_graph().await;
let prompt_system = setup_prompt_graph().await;

// Add authentication errors to logs
let auth_errors = create_auth_error_logs();
for error in auth_errors {
log_system.add_node(error).await.unwrap();
}

// Search for related prompts
let cross_results = prompt_system.cross_domain_search(
"authentication failure debugging",
&["logs".to_string()]
).await.unwrap();

// Should find correlation between log patterns and prompt effectiveness
assert!(!cross_results.cross_domain_insights.is_empty());
}

Performance Targets

Latency Requirements

  • Single node retrieval: <5ms
  • Semantic search: <50ms for 10k results
  • Graph traversal: <100ms for 5-hop exploration
  • Cross-domain queries: <500ms for complex insights

Scalability Targets

  • 100M+ nodes per graph type
  • 10k+ concurrent queries
  • 1TB+ of vector data per tenant
  • Real-time updates with <10ms propagation

Resource Efficiency

  • Memory usage: <2GB per 10M nodes
  • Storage efficiency: 60% compression vs. separate systems
  • Network optimization: batch operations cut network calls by 90% compared with per-node calls
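The resource targets can be cross-checked with simple arithmetic. Assuming decimal units and uncompressed 32-bit float vectors (4 bytes per dimension), the helpers below compute the per-node memory budget and vector storage figures; the function names are illustrative:

```rust
/// Bytes for one uncompressed f32 vector of the given dimension.
pub fn vector_bytes(dim: u64) -> u64 {
    dim * 4
}

/// Per-node memory budget implied by a total budget and node count.
pub fn budget_per_node(total_bytes: u64, nodes: u64) -> u64 {
    total_bytes / nodes
}

/// How many vectors of `dim` fit in `total_bytes`, ignoring index overhead.
pub fn vectors_in(total_bytes: u64, dim: u64) -> u64 {
    total_bytes / vector_bytes(dim)
}
```

The memory target leaves about 200 bytes per node (`budget_per_node(2_000_000_000, 10_000_000)`), less than one 384-dimension vector (1,536 bytes), so it presumably excludes vectors held by Qdrant, or assumes they are quantized or kept on disk. At the other end, 1 TB holds about 163 million 1,536-dimension vectors, consistent with the 100M+ nodes-per-graph target.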

Monitoring and Observability

Key Metrics

#[derive(Debug, Serialize)]
pub struct GraphMetrics {
pub nodes_count: u64,
pub edges_count: u64,
pub query_latency_p99: f64,
pub cache_hit_rate: f64,
pub cross_domain_queries_per_sec: f64,
pub semantic_search_accuracy: f64,
}
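`GraphMetrics` reports `query_latency_p99` and `cache_hit_rate` without defining how they are computed, and the latency unit is not stated. A sketch of both using the nearest-rank percentile definition (one of several common definitions; interpolating methods give slightly different values), with hypothetical function names:

```rust
/// Nearest-rank percentile for p in (0, 100]; None for empty input or invalid p.
pub fn percentile(samples: &[f64], p: f64) -> Option<f64> {
    if samples.is_empty() || !(p > 0.0 && p <= 100.0) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    // Nearest rank: the ceil(p * n / 100)-th smallest sample (1-based)
    let rank = (p * sorted.len() as f64 / 100.0).ceil() as usize;
    Some(sorted[rank.clamp(1, sorted.len()) - 1])
}

/// Cache hit rate in [0, 1]; 0.0 when there were no lookups.
pub fn cache_hit_rate(hits: u64, misses: u64) -> f64 {
    let total = hits + misses;
    if total == 0 {
        0.0
    } else {
        hits as f64 / total as f64
    }
}
```

Nearest-rank always returns an observed sample, which makes a reported p99 easy to trace back to an individual slow query.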

Health Checks

impl<T: GraphNodeTrait + Send + Sync> UnifiedGraphSystem<T> {
    pub async fn health_check(&self) -> Result<GraphHealthStatus> {
        // Connectivity failures are recorded as unhealthy instead of aborting the
        // check with `?`, which would make the *_healthy fields unreachable
        let fdb_status = self.check_fdb_connectivity().await;
        let qdrant_status = self.check_qdrant_connectivity().await;
        let performance = self.benchmark_operations().await?;

        Ok(GraphHealthStatus {
            fdb_healthy: fdb_status.is_ok(),
            qdrant_healthy: qdrant_status.is_ok(),
            query_performance: performance,
            last_sync: self.get_last_sync_time().await?,
        })
    }
}

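Deployment Commands

# Build the release binary
cargo build --release --bin graph_system

# Optional local image build; `gcloud run deploy --source .` builds the image remotely with Cloud Build
docker build -t coditect-graph-system .
gcloud run deploy graph-system --source .

# Initialize graph collections ($GRAPH_API_URL is a placeholder for the deployed service's base URL)
curl -X POST "$GRAPH_API_URL/api/graphs/initialize" \
  -H "Authorization: Bearer $JWT" \
  -H "Content-Type: application/json" \
  -d '{"graph_types": ["logs", "prompts", "code", "users", "projects"]}'

# Health check
curl "$GRAPH_API_URL/api/graphs/health"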

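This unified system is intended to give CODITECT cross-domain intelligence across all knowledge domains while meeting the latency targets above: under 100ms for single-node retrieval, semantic search, and 5-hop traversal, and under 500ms for cross-domain queries.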