Vector Database Integration Analysis
1. Current System Extension Points
Existing Structure
-- Current core tables
CREATE TABLE documents (
    doc_uuid   VARCHAR(36) PRIMARY KEY,
    filename   VARCHAR(255),
    created_at TIMESTAMP
);

CREATE TABLE chunks (
    chunk_uuid   VARCHAR(36) PRIMARY KEY,
    doc_uuid     VARCHAR(36),
    content      TEXT,
    sequence_num INTEGER
);
2. PostgreSQL + pgvector Approach
A. Schema Extension
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Add vector embeddings to chunks table
ALTER TABLE chunks
    ADD COLUMN embedding vector(1536),        -- dimension of OpenAI embedding models
    ADD COLUMN embedding_model VARCHAR(50);

-- Create vector index (ivfflat trades some recall for speed;
-- the lists parameter should scale with table size)
CREATE INDEX ON chunks
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
-- Add semantic search function
CREATE OR REPLACE FUNCTION semantic_chunk_search(
    query_embedding vector,
    similarity_threshold float,
    max_results int
) RETURNS TABLE (
    chunk_uuid VARCHAR,
    content TEXT,
    similarity float
) LANGUAGE plpgsql AS $$
BEGIN
    RETURN QUERY
    SELECT
        c.chunk_uuid,
        c.content,
        1 - (c.embedding <=> query_embedding) AS similarity
    FROM chunks c
    WHERE 1 - (c.embedding <=> query_embedding) > similarity_threshold
    ORDER BY c.embedding <=> query_embedding
    LIMIT max_results;
END;
$$;
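Here `<=>` is pgvector's cosine-distance operator, so `1 - (embedding <=> query_embedding)` is cosine similarity. As a sanity check on that logic, a brute-force pure-Python equivalent of the function (hypothetical in-memory chunks, no database) looks like:

```python
import math

def cosine_distance(a, b):
    # pgvector's <=> operator: 1 minus cosine similarity
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

def semantic_chunk_search(chunks, query_embedding, similarity_threshold, max_results):
    """Brute-force equivalent of the SQL function.

    chunks: list of (chunk_uuid, content, embedding) tuples.
    """
    scored = []
    for chunk_uuid, content, emb in chunks:
        similarity = 1.0 - cosine_distance(emb, query_embedding)
        if similarity > similarity_threshold:
            scored.append((chunk_uuid, content, similarity))
    # ORDER BY embedding <=> query_embedding == descending similarity
    scored.sort(key=lambda row: row[2], reverse=True)
    return scored[:max_results]

chunks = [
    ("a", "alpha", [1.0, 0.0]),
    ("b", "beta",  [0.0, 1.0]),   # orthogonal to the query: similarity 0
    ("c", "gamma", [1.0, 0.2]),
]
results = semantic_chunk_search(chunks, [1.0, 0.0], 0.7, 10)
# "a" and "c" pass the 0.7 threshold, ranked by similarity
```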
B. Implementation
class PgVectorManager:
    def __init__(self, db_pool, embedding_model):
        self.db = db_pool
        self.embedding_model = embedding_model

    async def store_chunk_embedding(self, chunk_uuid: str, content: str):
        """Store chunk with its embedding"""
        embedding = await self.embedding_model.embed_text(content)
        await self.db.execute("""
            UPDATE chunks
            SET embedding = $1, embedding_model = $2
            WHERE chunk_uuid = $3
        """, embedding, self.embedding_model.name, chunk_uuid)

    async def similar_chunks(self, query: str, threshold: float = 0.7):
        """Find similar chunks using vector similarity"""
        query_embedding = await self.embedding_model.embed_text(query)
        return await self.db.fetch("""
            SELECT * FROM semantic_chunk_search($1, $2, 10)
        """, query_embedding, threshold)
Advantages of pgvector
- Integrated Solution
  - Single database for all data
  - Transactional consistency
  - Familiar PostgreSQL tooling
  - Cost-effective
- Performance
  - Efficient for moderate-sized collections
  - Good indexing options
  - Native SQL integration

Disadvantages of pgvector
- Scaling Limitations
  - Less efficient for very large vector collections
  - Limited to PostgreSQL's scaling capabilities
  - Memory constraints for large embeddings
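To put the memory concern in numbers: pgvector stores each vector component as a 4-byte float, so the raw embedding payload alone (index and row overhead excluded) grows quickly with collection size. A rough back-of-envelope sketch:

```python
def embedding_storage_bytes(num_vectors, dims, bytes_per_dim=4):
    # pgvector's vector type stores float4 components
    return num_vectors * dims * bytes_per_dim

# 10 million chunks with 1536-dimensional embeddings
gib = embedding_storage_bytes(10_000_000, 1536) / 2**30
print(f"{gib:.1f} GiB")  # prints "57.2 GiB" of raw vector data alone
```

At that scale the working set no longer fits comfortably in memory on a modest instance, which is when a dedicated vector store starts to pay for its operational overhead.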
3. Weaviate Approach
A. Weaviate Schema
class WeaviateManager:
    def __init__(self, client):
        self.client = client

    def create_schema(self):
        """Create Weaviate schema for document chunks (v3 Python client)"""
        class_obj = {
            "class": "DocumentChunk",
            "vectorizer": "text2vec-openai",
            "properties": [
                {"name": "chunk_uuid", "dataType": ["string"], "indexInverted": True},
                {"name": "doc_uuid", "dataType": ["string"], "indexInverted": True},
                # content is vectorized by the class-level text2vec-openai module
                {"name": "content", "dataType": ["text"]},
                {"name": "sequence_num", "dataType": ["int"], "indexInverted": True},
                # cross-references to neighbouring chunks in the document
                {"name": "previous_chunk", "dataType": ["DocumentChunk"]},
                {"name": "next_chunk", "dataType": ["DocumentChunk"]},
            ],
        }
        self.client.schema.create_class(class_obj)
B. Hybrid Implementation
from typing import Dict

class HybridStorageManager:
    def __init__(self, pg_pool, weaviate_client):
        self.pg = pg_pool
        self.weaviate = weaviate_client

    async def store_chunk(self, chunk: Dict):
        """Store chunk in both PostgreSQL and Weaviate"""
        # Store in PostgreSQL for transactional data
        async with self.pg.transaction():
            await self.pg.execute("""
                INSERT INTO chunks (chunk_uuid, doc_uuid, content, sequence_num)
                VALUES ($1, $2, $3, $4)
            """, chunk['uuid'], chunk['doc_uuid'],
                 chunk['content'], chunk['sequence_num'])
        # Store in Weaviate for vector search. Note this second write is not
        # covered by the PostgreSQL transaction; a failure here leaves the two
        # stores inconsistent until reconciled.
        self.weaviate.data_object.create(
            class_name="DocumentChunk",
            data_object={
                "chunk_uuid": chunk['uuid'],
                "doc_uuid": chunk['doc_uuid'],
                "content": chunk['content'],
                "sequence_num": chunk['sequence_num'],
            },
        )

    async def semantic_search(self, query: str, limit: int = 10):
        """Perform semantic search using Weaviate"""
        return (
            self.weaviate.query
            .get("DocumentChunk", ["chunk_uuid", "content", "sequence_num"])
            .with_near_text({"concepts": [query]})
            .with_limit(limit)
            .do()
        )
Advantages of Weaviate
- Scalability
  - Built for vector search
  - Efficient large-scale operations
  - Advanced semantic capabilities
- Features
  - GraphQL API
  - Real-time capabilities
  - Advanced filtering

Disadvantages of Weaviate
- Complexity
  - Separate system to maintain
  - Additional infrastructure
  - Learning curve
4. GraphRAG Implementation
A. Graph-Enhanced Retrieval
from typing import Dict, List

class GraphRAGManager:
    def __init__(self, vector_store, graph_store):
        self.vector_store = vector_store
        self.graph_store = graph_store

    async def enhanced_retrieval(self, query: str, depth: int = 2):
        """Retrieve chunks using both vector and graph relationships"""
        # Initial vector search
        similar_chunks = await self.vector_store.semantic_search(query)
        # Graph expansion: each hit pulls in its graph neighbourhood
        enhanced_results = []
        for chunk in similar_chunks:
            related_chunks = await self.graph_store.get_connected_chunks(
                chunk['chunk_uuid'],
                depth=depth,
            )
            enhanced_results.extend(related_chunks)
        return self._rank_results(enhanced_results)

    def _rank_results(self, chunks: List[Dict]) -> List[Dict]:
        """Rank results using both semantic and graph relevance"""
        # Implementation of ranking algorithm
        pass
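`_rank_results` is left unimplemented above. One plausible scheme, purely illustrative rather than a prescribed algorithm, blends vector similarity with graph proximity and deduplicates chunks reached by multiple paths:

```python
def rank_results(chunks, alpha=0.7):
    """Blend semantic similarity with graph proximity.

    chunks: dicts with 'chunk_uuid', 'similarity' (0..1 from vector search)
    and 'depth' (graph hops from a seed chunk; 0 for direct hits).
    alpha weights semantic similarity against graph closeness.
    """
    best = {}
    for chunk in chunks:
        # Closer graph neighbours score higher: 1 / (1 + hops)
        graph_score = 1.0 / (1.0 + chunk.get("depth", 0))
        score = alpha * chunk.get("similarity", 0.0) + (1 - alpha) * graph_score
        uuid = chunk["chunk_uuid"]
        # Keep only the best-scoring occurrence of each chunk
        if uuid not in best or score > best[uuid][0]:
            best[uuid] = (score, chunk)
    ranked = sorted(best.values(), key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in ranked]

hits = [
    {"chunk_uuid": "a", "similarity": 0.9, "depth": 0},
    {"chunk_uuid": "b", "similarity": 0.2, "depth": 1},
    {"chunk_uuid": "a", "similarity": 0.5, "depth": 2},  # duplicate, lower score
]
ordered = rank_results(hits)  # "a" outranks "b"; the duplicate "a" is dropped
```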
B. Knowledge Graph Integration
class DocumentKnowledgeGraph:
    def __init__(self, db_connection):
        self.db = db_connection

    async def build_graph(self, doc_uuid: str):
        """Build knowledge graph from document chunks"""
        chunks = await self.db.fetch("""
            SELECT * FROM chunks
            WHERE doc_uuid = $1
            ORDER BY sequence_num
        """, doc_uuid)
        for chunk in chunks:
            # Extract entities and relationships
            entities = self._extract_entities(chunk['content'])
            relationships = self._extract_relationships(chunk['content'])
            # Store in graph structure
            await self._store_graph_elements(
                chunk['chunk_uuid'],
                entities,
                relationships,
            )

    async def get_connected_chunks(self, chunk_uuid: str, depth: int):
        """Get connected chunks from knowledge graph"""
        return await self.db.fetch("""
            WITH RECURSIVE connected_chunks AS (
                SELECT c.*, 1 AS depth
                FROM chunks c
                WHERE c.chunk_uuid = $1
                UNION
                SELECT c.*, cc.depth + 1
                FROM chunks c
                JOIN chunk_relationships cr ON c.chunk_uuid = cr.target_chunk_uuid
                JOIN connected_chunks cc ON cr.source_chunk_uuid = cc.chunk_uuid
                WHERE cc.depth < $2
            )
            SELECT DISTINCT * FROM connected_chunks
        """, chunk_uuid, depth)
5. Recommendations
A. Recommended Approach: Phased Implementation
1. Start with pgvector
   - Simplest integration
   - Maintains data consistency
   - Good performance for initial scale
2. Add GraphRAG capabilities
   - Enhances retrieval quality
   - Maintains chunk relationships
   - Enables context-aware search
3. Monitor and Scale
   - Track performance metrics
   - Evaluate the need for Weaviate
   - Adjust based on usage patterns
B. Implementation Priority
1. Vector embeddings for semantic search
2. Basic RAG implementation
3. Graph relationships for chunks
4. Advanced GraphRAG features
5. Scale-based migration decisions
Possible next steps:
- Detail the pgvector implementation
- Create the GraphRAG structures
- Design the monitoring system
- Develop the scaling strategy