Skip to main content

Database Schema Analysis and Implementation Plan

1. Core Schema Structure

A. Base Tables

-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";  -- uuid_generate_v4() used for all primary keys below
CREATE EXTENSION IF NOT EXISTS vector;       -- pgvector: vector(n) column type and <=> distance operator
CREATE EXTENSION IF NOT EXISTS btree_gist;   -- GiST operator classes for btree types -- NOTE(review): nothing below uses GiST yet; confirm this is still needed

-- Document management: one row per ingested source file.
CREATE TABLE documents (
doc_uuid UUID DEFAULT uuid_generate_v4() PRIMARY KEY,
filename TEXT NOT NULL,
mime_type TEXT NOT NULL,
file_size BIGINT NOT NULL, -- size in bytes; must be positive (valid_file_size)
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP WITH TIME ZONE, -- NULL until the processing pipeline completes
status TEXT NOT NULL DEFAULT 'pending', -- lifecycle state; allowed values in valid_status
metadata JSONB, -- free-form document attributes; schema not enforced at this layer

-- Constraints
CONSTRAINT valid_status CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
CONSTRAINT valid_file_size CHECK (file_size > 0)
);

-- Chunk management: ordered text segments of a document plus their embeddings.
CREATE TABLE chunks (
chunk_uuid UUID DEFAULT uuid_generate_v4() PRIMARY KEY,
-- ON DELETE CASCADE: removing a document removes its chunks. The original
-- bare REFERENCES defaulted to NO ACTION, which made document deletion fail
-- once any chunk existed.
doc_uuid UUID NOT NULL REFERENCES documents(doc_uuid) ON DELETE CASCADE,
sequence_num INTEGER NOT NULL, -- position of the chunk within the document
content TEXT NOT NULL,
start_offset INTEGER NOT NULL, -- character offset into the source document
end_offset INTEGER NOT NULL,
embedding vector(1536), -- NULL until the embedding job completes; 1536 dims matches the embedding model
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

-- Constraints
-- start_offset >= 0 added: a negative offset is never valid, and the
-- original check only compared the two offsets to each other.
CONSTRAINT valid_offsets CHECK (start_offset >= 0 AND end_offset > start_offset),
CONSTRAINT unique_chunk_sequence UNIQUE (doc_uuid, sequence_num)
);

-- Chunk relationships: a typed, weighted, directed edge between two chunks.
CREATE TABLE chunk_relationships (
-- ON DELETE CASCADE on both endpoints: deleting a chunk removes its edges.
-- The original bare REFERENCES (NO ACTION) made chunk deletion fail once
-- any relationship existed.
source_chunk_uuid UUID NOT NULL REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
target_chunk_uuid UUID NOT NULL REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
relationship_type TEXT NOT NULL,
weight FLOAT NOT NULL DEFAULT 1.0, -- edge strength, constrained to [0.0, 1.0]
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

-- One edge per (source, target, type); reverse direction is a separate row.
PRIMARY KEY (source_chunk_uuid, target_chunk_uuid, relationship_type),
CONSTRAINT valid_relationship CHECK (source_chunk_uuid != target_chunk_uuid),
CONSTRAINT valid_weight CHECK (weight BETWEEN 0.0 AND 1.0)
);

B. Processing State Management

-- Processing state tracking: one row per job run against a document.
CREATE TABLE processing_jobs (
job_uuid UUID DEFAULT uuid_generate_v4() PRIMARY KEY,
-- Cascade so deleting a document also removes its job history.
doc_uuid UUID NOT NULL REFERENCES documents(doc_uuid) ON DELETE CASCADE,
job_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
-- NOTE(review): started_at defaults to insert time even while status is
-- still 'pending'; if queue latency matters, set started_at explicitly
-- when the worker picks the job up.
started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP WITH TIME ZONE, -- NULL until the job finishes
error_message TEXT, -- populated only on failure
attempts INTEGER NOT NULL DEFAULT 0, -- retry counter

CONSTRAINT valid_job_status CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
CONSTRAINT valid_attempts CHECK (attempts >= 0),
-- A job cannot finish before it started.
CONSTRAINT valid_completion CHECK (completed_at IS NULL OR completed_at >= started_at)
);

-- Processing metrics: arbitrary JSON measurements recorded against a job.
CREATE TABLE processing_metrics (
metric_uuid UUID DEFAULT uuid_generate_v4() PRIMARY KEY,
-- Cascade so metrics disappear with their job instead of blocking deletion.
job_uuid UUID NOT NULL REFERENCES processing_jobs(job_uuid) ON DELETE CASCADE,
metric_type TEXT NOT NULL, -- discriminator for interpreting metric_value
metric_value JSONB NOT NULL,
recorded_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

C. Vector Search Optimization

-- Optimized approximate-nearest-neighbour index for cosine search.
-- Named explicitly (the original was anonymous) to match the file's
-- <table>_<col>_idx convention and keep migrations greppable.
-- NOTE(review): lists = 100 should be revisited once the real row count is
-- known; the ivfflat guidance ties it to table size.
CREATE INDEX chunks_embedding_idx ON chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Text search configuration (currently a straight copy of 'english';
-- exists so stemming/stopwords can be customized later without reindexing
-- against a different configuration name).
CREATE TEXT SEARCH CONFIGURATION chunk_text (COPY = english);

-- Full-text index used by hybrid_search().
CREATE INDEX chunks_content_idx ON chunks
USING GIN (to_tsvector('chunk_text', content));

Workflow Checklist

  • Prerequisites verified
  • Configuration applied
  • Process executed
  • Results validated
  • Documentation updated

Workflow Steps

  1. Initialize - Set up the environment
  2. Configure - Apply settings
  3. Execute - Run the process
  4. Validate - Check results
  5. Complete - Finalize workflow

2. State Management Functions

A. Document Processing State

-- Composite return type for get_document_state(): a per-document
-- processing snapshot.
CREATE TYPE document_state AS (
doc_uuid UUID,
status TEXT,
total_chunks INTEGER, -- chunks created for the document
processed_chunks INTEGER, -- chunks with a non-NULL embedding
failed_chunks INTEGER -- count of failed processing jobs (see get_document_state)
);

-- Returns a processing snapshot for one document: chunk totals, how many
-- chunks are embedded, and how many of its jobs have failed.
-- Returns a row of NULLs in every field if the document does not exist.
CREATE FUNCTION get_document_state(p_doc_uuid UUID)
RETURNS document_state
LANGUAGE plpgsql
STABLE -- read-only; lets the planner cache/inline calls within a statement
AS $$
DECLARE
result document_state;
BEGIN
SELECT
d.doc_uuid,
d.status,
-- DISTINCT counts: the two LEFT JOINs form a chunks x jobs product per
-- document, so plain COUNTs would be inflated.
COUNT(DISTINCT c.chunk_uuid) AS total_chunks,
COUNT(DISTINCT c.chunk_uuid) FILTER (WHERE c.embedding IS NOT NULL) AS processed_chunks,
-- NOTE(review): this counts failed *jobs* for the document, not failed
-- chunks; jobs are tracked per document, not per chunk.
COUNT(DISTINCT j.job_uuid) FILTER (WHERE j.status = 'failed') AS failed_chunks
INTO result
FROM documents d
LEFT JOIN chunks c ON c.doc_uuid = d.doc_uuid
-- BUG FIX: processing_jobs.doc_uuid references documents; the original
-- joined c.chunk_uuid = j.doc_uuid, which can never match.
LEFT JOIN processing_jobs j ON j.doc_uuid = d.doc_uuid
WHERE d.doc_uuid = p_doc_uuid
GROUP BY d.doc_uuid, d.status;

RETURN result;
END;
$$;

B. Chunk Relationship Management

-- Upserts a directed relationship edge between two chunks.
-- On conflict (same source, target, and type) the weight is updated;
-- existing metadata is left untouched.
-- p_weight now defaults to 1.0, mirroring the column default, so callers
-- that only care about the edge's existence can omit it (backward-compatible).
CREATE FUNCTION create_chunk_relationship(
p_source_uuid UUID,
p_target_uuid UUID,
p_type TEXT,
p_weight FLOAT DEFAULT 1.0
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO chunk_relationships (
source_chunk_uuid,
target_chunk_uuid,
relationship_type,
weight
)
VALUES (
p_source_uuid,
p_target_uuid,
p_type,
p_weight
)
ON CONFLICT (source_chunk_uuid, target_chunk_uuid, relationship_type)
-- EXCLUDED.weight is the idiomatic reference to the proposed row; it stays
-- correct even if this upsert is later extended to multi-row inserts.
DO UPDATE SET weight = EXCLUDED.weight;
END;
$$;

3. Search Implementation

A. Vector Search Function

-- Pure vector similarity search over chunk embeddings.
-- similarity is cosine similarity (1 - cosine distance); rows below
-- similarity_threshold are excluded. Chunks without an embedding never
-- match (NULL <=> anything is NULL, which the WHERE clause drops).
CREATE FUNCTION semantic_search(
query_embedding vector,
similarity_threshold FLOAT,
max_results INTEGER
)
RETURNS TABLE (
chunk_uuid UUID,
content TEXT,
similarity FLOAT,
doc_uuid UUID,
sequence_num INTEGER
)
LANGUAGE plpgsql
STABLE PARALLEL SAFE -- read-only; without a marker the function defaults to VOLATILE and defeats planner optimizations
AS $$
BEGIN
RETURN QUERY
SELECT
c.chunk_uuid,
c.content,
1 - (c.embedding <=> query_embedding) as similarity,
c.doc_uuid,
c.sequence_num
FROM chunks c
WHERE 1 - (c.embedding <=> query_embedding) > similarity_threshold
-- Ordering by raw distance (not the derived similarity) lets the ivfflat
-- index on embedding drive the scan.
ORDER BY c.embedding <=> query_embedding
LIMIT max_results;
END;
$$;

B. Hybrid Search Function

-- Hybrid search: combines vector similarity and full-text rank.
-- A chunk appears if it passes EITHER the similarity threshold OR the text
-- match (FULL OUTER JOIN); the missing half of the score defaults to 0.
-- combined_score = 0.7 * similarity + 0.3 * text_rank.
-- NOTE(review): the 0.7/0.3 weights are a starting heuristic — tune against
-- real relevance judgements.
CREATE FUNCTION hybrid_search(
query_text TEXT,
query_embedding vector,
similarity_threshold FLOAT,
max_results INTEGER
)
RETURNS TABLE (
chunk_uuid UUID,
content TEXT,
similarity FLOAT,
text_rank FLOAT,
combined_score FLOAT
)
LANGUAGE plpgsql
STABLE PARALLEL SAFE -- read-only; default VOLATILE would block planner optimizations
AS $$
BEGIN
RETURN QUERY
-- Candidates by embedding distance (uses chunks' ivfflat index).
WITH vector_results AS (
SELECT
c.chunk_uuid,
c.content,
1 - (c.embedding <=> query_embedding) as similarity
FROM chunks c
WHERE 1 - (c.embedding <=> query_embedding) > similarity_threshold
),
-- Candidates by full-text match (uses chunks_content_idx).
text_results AS (
SELECT
c.chunk_uuid,
c.content,
ts_rank(to_tsvector('chunk_text', c.content),
plainto_tsquery('chunk_text', query_text)) as text_rank
FROM chunks c
WHERE to_tsvector('chunk_text', c.content) @@
plainto_tsquery('chunk_text', query_text)
)
SELECT
COALESCE(v.chunk_uuid, t.chunk_uuid) as chunk_uuid,
COALESCE(v.content, t.content) as content,
COALESCE(v.similarity, 0) as similarity,
COALESCE(t.text_rank, 0) as text_rank,
COALESCE(v.similarity, 0) * 0.7 + COALESCE(t.text_rank, 0) * 0.3 as combined_score
FROM vector_results v
FULL OUTER JOIN text_results t ON v.chunk_uuid = t.chunk_uuid
ORDER BY combined_score DESC
LIMIT max_results;
END;
$$;

4. Implementation Plan

Phase 1: Core Schema

  1. Initialize extensions
  2. Create base tables
  3. Implement basic constraints
  4. Create initial indices

Phase 2: State Management

  1. Implement processing jobs
  2. Create metrics tracking
  3. Add state management functions
  4. Implement monitoring views

Phase 3: Search Implementation

  1. Create vector indices
  2. Implement basic search
  3. Add hybrid search
  4. Optimize query performance

Phase 4: Relationship Management

  1. Implement relationship tracking
  2. Add graph traversal functions
  3. Create relationship indices
  4. Optimize graph queries

5. Monitoring and Maintenance

A. Health Check Views

-- One-row health summary across the whole pipeline.
-- Each CTE aggregates one table; the final cross join stitches the single
-- rows together (identical results to independent scalar subqueries).
CREATE VIEW system_health AS
WITH doc_totals AS (
    SELECT COUNT(*) AS total_documents
    FROM documents
),
chunk_totals AS (
    SELECT
        COUNT(*) AS total_chunks,
        COUNT(embedding) AS vectorized_chunks -- COUNT(col) skips NULL embeddings
    FROM chunks
),
job_totals AS (
    SELECT
        COUNT(*) FILTER (WHERE status = 'failed') AS failed_jobs,
        AVG(EXTRACT(EPOCH FROM (completed_at - started_at)))
            FILTER (WHERE status = 'completed') AS avg_processing_time -- seconds
    FROM processing_jobs
)
SELECT
    d.total_documents,
    c.total_chunks,
    c.vectorized_chunks,
    j.failed_jobs,
    j.avg_processing_time
FROM doc_totals d
CROSS JOIN chunk_totals c
CROSS JOIN job_totals j;

B. Performance Monitoring

-- Hourly job throughput and latency.
-- BUG FIX: the original bucketed on processing_jobs.created_at, a column
-- that table does not have (its timestamp columns are started_at and
-- completed_at); the view could not be created.
CREATE VIEW performance_metrics AS
SELECT
date_trunc('hour', started_at) as time_bucket,
-- NOTE(review): this counts jobs per hour, not chunks; the alias is kept
-- for compatibility with existing consumers of the view.
COUNT(*) as processed_chunks,
AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_processing_time, -- seconds; NULL-completed jobs are ignored by AVG
COUNT(*) FILTER (WHERE status = 'failed') as failures
FROM processing_jobs
GROUP BY date_trunc('hour', started_at)
ORDER BY time_bucket DESC;

Open follow-up items:

  1. Detail the implementation of specific components as needed.
  2. Create the Python classes for database interaction.
  3. Design the monitoring and alerting system.
  4. Develop the testing strategy.

This schema provides a robust foundation for both current needs and future scaling.