Vector Database Integration Analysis

1. Current System Extension Points

Existing Structure

-- Current core tables
CREATE TABLE documents (
    doc_uuid VARCHAR(36) PRIMARY KEY,
    filename VARCHAR(255),
    created_at TIMESTAMP
);

CREATE TABLE chunks (
    chunk_uuid VARCHAR(36) PRIMARY KEY,
    doc_uuid VARCHAR(36) REFERENCES documents (doc_uuid),
    content TEXT,
    sequence_num INTEGER
);

2. PostgreSQL + pgvector Approach

A. Schema Extension

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Add vector embeddings to chunks table
ALTER TABLE chunks
    ADD COLUMN embedding vector(1536), -- 1536 dims for OpenAI embeddings
    ADD COLUMN embedding_model VARCHAR(50);

-- Create vector index (ivfflat trades recall for speed; tune `lists`
-- as the collection grows)
CREATE INDEX ON chunks
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);

-- Add semantic search function
CREATE OR REPLACE FUNCTION semantic_chunk_search(
    query_embedding vector,
    similarity_threshold float,
    max_results int
) RETURNS TABLE (
    chunk_uuid VARCHAR,
    content TEXT,
    similarity float
) LANGUAGE plpgsql AS $$
BEGIN
    RETURN QUERY
    SELECT
        c.chunk_uuid,
        c.content,
        1 - (c.embedding <=> query_embedding) AS similarity
    FROM chunks c
    WHERE 1 - (c.embedding <=> query_embedding) > similarity_threshold
    ORDER BY c.embedding <=> query_embedding
    LIMIT max_results;
END;
$$;
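
For a quick sanity check, the function can be exercised from plain SQL by reusing a stored embedding as the query vector; the chunk_uuid literal below is a placeholder, and `ivfflat.probes` is pgvector's knob for trading query speed against recall:

-- Widen the ivfflat scan for better recall (default is 1).
SET ivfflat.probes = 10;

-- Find chunks similar to an existing chunk by reusing its embedding.
SELECT *
FROM semantic_chunk_search(
    (SELECT embedding FROM chunks WHERE chunk_uuid = 'example-chunk-uuid'),
    0.7,  -- similarity_threshold
    10    -- max_results
);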

B. Implementation

class PgVectorManager:
    def __init__(self, db_pool, embedding_model):
        self.db = db_pool
        self.embedding_model = embedding_model

    async def store_chunk_embedding(self, chunk_uuid: str, content: str):
        """Store a chunk's embedding along with the model that produced it"""
        embedding = await self.embedding_model.embed_text(content)

        # Assumes the pgvector codec is registered on each connection
        # (see the register_vector sketch below), so a Python sequence
        # can be bound directly to a vector parameter.
        await self.db.execute("""
            UPDATE chunks
            SET embedding = $1, embedding_model = $2
            WHERE chunk_uuid = $3
        """, embedding, self.embedding_model.name, chunk_uuid)

    async def similar_chunks(self, query: str, threshold: float = 0.7):
        """Find similar chunks using vector similarity"""
        query_embedding = await self.embedding_model.embed_text(query)

        return await self.db.fetch("""
            SELECT * FROM semantic_chunk_search($1, $2, 10)
        """, query_embedding, threshold)

Advantages of pgvector

  1. Integrated Solution

    • Single database for all data
    • Transactional consistency
    • Familiar PostgreSQL tooling
    • Cost-effective
  2. Performance

    • Efficient for small to mid-sized collections (up to a few million vectors)
    • Good indexing options
    • Native SQL integration

Disadvantages of pgvector

  1. Scaling Limitations
    • Less efficient for very large vector collections
    • Limited to PostgreSQL's scaling capabilities
    • Memory constraints for large embeddings

3. Weaviate Approach

A. Weaviate Schema

class WeaviateManager:
    def __init__(self, client):
        self.client = client

    def create_schema(self):
        """Create Weaviate schema for document chunks"""
        class_obj = {
            "class": "DocumentChunk",
            # Class-level vectorizer; `content` is the only text property,
            # so it is what gets embedded.
            "vectorizer": "text2vec-openai",
            "properties": [
                {"name": "chunk_uuid", "dataType": ["string"], "indexInverted": True},
                {"name": "doc_uuid", "dataType": ["string"], "indexInverted": True},
                {"name": "content", "dataType": ["text"]},
                {"name": "sequence_num", "dataType": ["int"], "indexInverted": True},
                # Cross-references to neighboring chunks preserve document order.
                {"name": "previous_chunk", "dataType": ["DocumentChunk"]},
                {"name": "next_chunk", "dataType": ["DocumentChunk"]},
            ],
        }
        self.client.schema.create_class(class_obj)
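
A connection sketch for the v3 weaviate-client API assumed above; the URL is a placeholder, and the OpenAI key is forwarded so the text2vec-openai module can embed `content` at import time:

import os

import weaviate

client = weaviate.Client(
    "http://localhost:8080",  # placeholder URL
    additional_headers={"X-OpenAI-Api-Key": os.environ["OPENAI_API_KEY"]},
)

WeaviateManager(client).create_schema()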

B. Hybrid Implementation

from typing import Dict


class HybridStorageManager:
    def __init__(self, pg_pool, weaviate_client):
        self.pg = pg_pool
        self.weaviate = weaviate_client

    async def store_chunk(self, chunk: Dict):
        """Store chunk in both PostgreSQL and Weaviate"""
        # Store in PostgreSQL for transactional data. Assumes `pg_pool`
        # exposes a transaction() context (e.g. a pool wrapper or a
        # single connection).
        async with self.pg.transaction():
            await self.pg.execute("""
                INSERT INTO chunks (chunk_uuid, doc_uuid, content, sequence_num)
                VALUES ($1, $2, $3, $4)
            """, chunk['uuid'], chunk['doc_uuid'],
                 chunk['content'], chunk['sequence_num'])

        # Store in Weaviate for vector search. Note: this dual write is
        # not atomic; a failure here leaves the two stores out of sync,
        # so production use would want a reconciliation or outbox step.
        self.weaviate.data_object.create(
            class_name="DocumentChunk",
            data_object={
                "chunk_uuid": chunk['uuid'],
                "doc_uuid": chunk['doc_uuid'],
                "content": chunk['content'],
                "sequence_num": chunk['sequence_num'],
            },
        )

    async def semantic_search(self, query: str, limit: int = 10):
        """Perform semantic search using Weaviate"""
        return (
            self.weaviate.query
            .get("DocumentChunk", ["chunk_uuid", "content", "sequence_num"])
            .with_near_text({"concepts": [query]})
            .with_limit(limit)
            .do()
        )
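
The .do() call returns Weaviate's raw GraphQL response rather than a flat hit list, so callers need to unwrap it. A small helper following the v3 client's standard data -> Get -> ClassName response shape:

from typing import Dict, List

def unwrap_search_response(response: Dict) -> List[Dict]:
    """Extract DocumentChunk hits from a v3 weaviate-client response."""
    return response.get("data", {}).get("Get", {}).get("DocumentChunk", []) or []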

Advantages of Weaviate

  1. Scalability

    • Built for vector search
    • Efficient large-scale operations
    • Advanced semantic capabilities
  2. Features

    • GraphQL API
    • Real-time capabilities
    • Advanced filtering

Disadvantages of Weaviate

  1. Complexity
    • Separate system to maintain
    • Additional infrastructure
    • Learning curve

4. GraphRAG Implementation

A. Graph-Enhanced Retrieval

from typing import Dict, List


class GraphRAGManager:
    def __init__(self, vector_store, graph_store):
        self.vector_store = vector_store
        self.graph_store = graph_store

    async def enhanced_retrieval(self, query: str, depth: int = 2):
        """Retrieve chunks using both vector and graph relationships"""
        # Initial vector search
        similar_chunks = await self.vector_store.semantic_search(query)

        # Graph expansion: pull in chunks connected to each hit.
        enhanced_results = []
        for chunk in similar_chunks:
            related_chunks = await self.graph_store.get_connected_chunks(
                chunk['chunk_uuid'],
                depth=depth,
            )
            enhanced_results.extend(related_chunks)

        return self._rank_results(enhanced_results)

    def _rank_results(self, chunks: List[Dict]) -> List[Dict]:
        """Rank results using both semantic and graph relevance"""
        # Ranking left as a stub; one possible sketch follows below.
        pass
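
One illustrative way to fill in _rank_results: deduplicate by chunk_uuid and score each chunk by its vector similarity discounted by graph distance. The `similarity` and `depth` fields are assumptions about what the vector search and graph expansion attach to each result:

from typing import Dict, List

def rank_results(chunks: List[Dict]) -> List[Dict]:
    """Deduplicate, then order by similarity discounted by graph distance."""
    best: Dict[str, Dict] = {}
    for chunk in chunks:
        # Assumed fields: `similarity` from vector search (neutral default
        # for graph-only hits) and `depth` from graph expansion.
        score = chunk.get('similarity', 0.5) / (1 + chunk.get('depth', 0))
        uuid = chunk['chunk_uuid']
        if uuid not in best or score > best[uuid]['score']:
            best[uuid] = {**chunk, 'score': score}
    return sorted(best.values(), key=lambda c: c['score'], reverse=True)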

B. Knowledge Graph Integration

class DocumentKnowledgeGraph:
    def __init__(self, db_connection):
        self.db = db_connection

    async def build_graph(self, doc_uuid: str):
        """Build knowledge graph from document chunks"""
        chunks = await self.db.fetch("""
            SELECT * FROM chunks
            WHERE doc_uuid = $1
            ORDER BY sequence_num
        """, doc_uuid)

        for chunk in chunks:
            # Extract entities and relationships (NER / relation-extraction
            # helpers, not shown here)
            entities = self._extract_entities(chunk['content'])
            relationships = self._extract_relationships(chunk['content'])

            # Store in graph structure
            await self._store_graph_elements(
                chunk['chunk_uuid'],
                entities,
                relationships,
            )

    async def get_connected_chunks(self, chunk_uuid: str, depth: int):
        """Get chunks connected within `depth` hops in the knowledge graph"""
        # Assumes a chunk_relationships edge table (see the DDL sketch below).
        return await self.db.fetch("""
            WITH RECURSIVE connected_chunks AS (
                SELECT c.*, 1 AS depth
                FROM chunks c
                WHERE c.chunk_uuid = $1

                UNION

                SELECT c.*, cc.depth + 1
                FROM chunks c
                JOIN chunk_relationships cr ON c.chunk_uuid = cr.target_chunk_uuid
                JOIN connected_chunks cc ON cr.source_chunk_uuid = cc.chunk_uuid
                WHERE cc.depth < $2
            )
            SELECT DISTINCT * FROM connected_chunks
        """, chunk_uuid, depth)

5. Recommendations

1. Phased Implementation

  1. Start with pgvector

    • Simplest integration
    • Maintains data consistency
    • Good performance for initial scale
  2. Add GraphRAG capabilities

    • Enhance retrieval quality
    • Maintain chunk relationships
    • Enable context-aware search
  3. Monitor and Scale

    • Track performance metrics
    • Evaluate need for Weaviate
    • Adjust based on usage patterns

2. Implementation Priority

  1. Vector embeddings for semantic search
  2. Basic RAG implementation
  3. Graph relationships for chunks
  4. Advanced GraphRAG features
  5. Scale-based migration decisions

Would you like me to:

  1. Detail the pgvector implementation?
  2. Create the GraphRAG structures?
  3. Design the monitoring system?
  4. Develop the scaling strategy?