Skip to main content

System Architecture Design Document

Part 4: Data Architecture

Workflow Checklist

  • Prerequisites verified
  • Configuration applied
  • Process executed
  • Results validated
  • Documentation updated

Workflow Steps

  1. Initialize - Set up the environment
  2. Configure - Apply settings
  3. Execute - Run the process
  4. Validate - Check results
  5. Complete - Finalize workflow

Workflow Phases

Phase 1: Initialization

Set up prerequisites and validate inputs.

Phase 2: Processing

Execute the main workflow steps.

Phase 3: Verification

Validate outputs and confirm completion.

Phase 4: Finalization

Clean up and generate reports.

1. Data Storage Architecture

1.1 Storage Layer Overview

1.2 Storage Components

A. Primary Storage (PostgreSQL)

-- Core document storage: one row per ingested file.
-- IF NOT EXISTS keeps the migration idempotent on re-run.
CREATE TABLE IF NOT EXISTS documents (
    doc_uuid   UUID PRIMARY KEY,
    filename   TEXT NOT NULL,
    mime_type  TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    metadata   JSONB,
    -- New documents enter the pipeline as 'pending' unless the writer
    -- sets a status explicitly (backward compatible with existing inserts).
    status     TEXT NOT NULL DEFAULT 'pending',

    -- Lifecycle states used by the processing pipeline (see §4.1 retention).
    CONSTRAINT valid_status CHECK (status IN (
        'pending', 'processing', 'completed', 'failed'
    ))
);

-- Chunk storage: ordered slices of a document's text.
CREATE TABLE IF NOT EXISTS chunks (
    chunk_uuid   UUID PRIMARY KEY,
    -- Cascade so deleting a document (e.g. the §4.1 retention job)
    -- does not fail on dangling chunks.
    doc_uuid     UUID REFERENCES documents(doc_uuid) ON DELETE CASCADE,
    content      TEXT NOT NULL,
    sequence_num INTEGER NOT NULL,
    start_offset INTEGER NOT NULL,
    end_offset   INTEGER NOT NULL,
    created_at   TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,

    -- A chunk must span at least one character.
    CONSTRAINT valid_offsets CHECK (end_offset > start_offset),
    -- Sequence numbers are unique within a document.
    CONSTRAINT unique_chunk_sequence UNIQUE (doc_uuid, sequence_num)
);

-- "All chunks of a document" lookups need their own index;
-- PostgreSQL does not index the referencing side of an FK automatically.
CREATE INDEX IF NOT EXISTS chunks_doc_uuid_idx ON chunks (doc_uuid);

-- Relationships: weighted, typed edges between chunks (graph layer).
CREATE TABLE IF NOT EXISTS chunk_relationships (
    -- Cascade: removing a chunk removes its edges in both directions.
    source_uuid       UUID REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
    target_uuid       UUID REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
    relationship_type TEXT NOT NULL,
    weight            FLOAT NOT NULL,
    metadata          JSONB,

    -- At most one edge per (source, target, type) triple.
    PRIMARY KEY (source_uuid, target_uuid, relationship_type),
    -- Weights are normalized relationship strengths.
    CONSTRAINT valid_weight CHECK (weight BETWEEN 0.0 AND 1.0)
);

B. Vector Storage (pgvector)

-- Enable vector extension (idempotent for repeated migrations).
CREATE EXTENSION IF NOT EXISTS vector;

-- Vector embeddings: one 1536-dim embedding per chunk.
CREATE TABLE IF NOT EXISTS chunk_embeddings (
    chunk_uuid    UUID PRIMARY KEY REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
    embedding     vector(1536),
    -- Which embedding model produced this vector (for re-embedding runs).
    model_version TEXT NOT NULL,
    created_at    TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

-- BUGFIX: PostgreSQL has no inline INDEX clause inside CREATE TABLE;
-- the ANN index must be a separate statement. `lists` should be tuned
-- to roughly sqrt(expected_row_count).
CREATE INDEX IF NOT EXISTS chunk_embeddings_embedding_idx
    ON chunk_embeddings
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);

-- Vector search function: return chunks whose cosine similarity to the
-- query embedding exceeds the threshold, best matches first.
-- STABLE: reads tables, performs no writes.
CREATE OR REPLACE FUNCTION similarity_search(
    query_embedding vector,
    similarity_threshold float,
    max_results integer
) RETURNS TABLE (
    chunk_uuid UUID,
    similarity float
) LANGUAGE plpgsql STABLE AS $$
BEGIN
    RETURN QUERY
    SELECT
        c.chunk_uuid,
        -- <=> is cosine *distance*; similarity = 1 - distance.
        1 - (c.embedding <=> query_embedding) AS similarity
    FROM chunk_embeddings c
    WHERE 1 - (c.embedding <=> query_embedding) > similarity_threshold
    -- Order by raw distance so the ivfflat index can drive the scan.
    ORDER BY c.embedding <=> query_embedding
    LIMIT max_results;
END;
$$;

C. Time Series Storage (TimescaleDB)

-- Enable TimescaleDB (idempotent for repeated migrations).
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Raw metrics: append-only time series, one row per sample.
CREATE TABLE IF NOT EXISTS raw_metrics (
    time        TIMESTAMPTZ NOT NULL,
    metric_name TEXT NOT NULL,
    value       DOUBLE PRECISION NOT NULL,
    labels      JSONB DEFAULT '{}',

    -- Metric names: letters, digits, '_' and '.' only.
    CONSTRAINT valid_metric CHECK (
        metric_name ~ '^[a-zA-Z0-9_\.]+$'
    )
);

-- Partition by time; if_not_exists keeps re-runs safe.
SELECT create_hypertable('raw_metrics', 'time', if_not_exists => TRUE);

-- Aggregated metrics: pre-computed rollups of raw_metrics per window.
CREATE TABLE IF NOT EXISTS aggregated_metrics (
    time        TIMESTAMPTZ NOT NULL,
    metric_name TEXT NOT NULL,
    -- BUGFIX: WINDOW is a reserved word in PostgreSQL and must be
    -- quoted to be usable as a column name (e.g. '1m', '5m', '1h').
    "window"    TEXT NOT NULL,
    min_value   DOUBLE PRECISION,
    max_value   DOUBLE PRECISION,
    avg_value   DOUBLE PRECISION,
    sum_value   DOUBLE PRECISION,
    count       INTEGER,
    labels      JSONB DEFAULT '{}'
);

-- Create hypertable with compression.
SELECT create_hypertable('aggregated_metrics', 'time', if_not_exists => TRUE);

-- BUGFIX: compression must be enabled on the table before a
-- compression policy can be attached.
ALTER TABLE aggregated_metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'metric_name'
);
SELECT add_compression_policy('aggregated_metrics', INTERVAL '7 days', if_not_exists => true);

D. Cache Layer (Redis)

Key Patterns:
1. Document Cache
- Format: doc:{uuid}
- TTL: 1 hour
- Value: Document metadata JSON

2. Chunk Cache
- Format: chunk:{uuid}
- TTL: 1 hour
- Value: Chunk content and metadata

3. Search Results Cache
- Format: search:{query_hash}
- TTL: 5 minutes
- Value: Ranked results JSON

4. Metric Cache
- Format: metric:{name}:latest
- TTL: 30 seconds
- Value: Latest metric value

2. Data Access Patterns

2.1 Repository Layer

class BaseRepository:
    """Base repository with common CRUD operations.

    All methods are async stubs meant to be overridden by concrete
    repositories. ``Entity`` is declared elsewhere in the project, so it
    is referenced as a string (forward reference) to keep this module
    importable on its own.
    """

    async def get(self, id: UUID) -> Optional["Entity"]:
        """Get entity by ID; returns None when not found."""
        pass

    async def create(self, entity: "Entity") -> "Entity":
        """Create a new entity and return the stored copy."""
        pass

    async def update(self, entity: "Entity") -> "Entity":
        """Update an existing entity and return the stored copy."""
        pass

    async def delete(self, id: UUID) -> bool:
        """Delete entity by ID; True if a row was removed."""
        pass

class DocumentRepository(BaseRepository):
    """Document-specific repository operations.

    ``Document`` is a project type declared elsewhere; forward
    references keep this stub importable in isolation.
    """

    async def get_with_chunks(
        self,
        doc_uuid: UUID
    ) -> Optional["Document"]:
        """Get a document with all of its chunks eagerly loaded."""
        pass

    async def find_by_status(
        self,
        status: str,
        limit: int = 10
    ) -> List["Document"]:
        """Find up to `limit` documents in the given lifecycle status."""
        pass

class VectorRepository(BaseRepository):
    """Vector operations repository.

    ``Vector`` is a project type declared elsewhere; forward references
    keep this stub importable in isolation.
    """

    async def find_similar(
        self,
        embedding: List[float],
        threshold: float,
        limit: int
    ) -> List[Tuple[UUID, float]]:
        """Find chunk UUIDs whose embeddings exceed the similarity threshold."""
        pass

    async def batch_insert(
        self,
        vectors: List["Vector"]
    ) -> List[UUID]:
        """Batch insert vectors; returns the UUIDs of the inserted rows."""
        pass

3. Data Flow Patterns

3.1 Write Patterns

3.2 Read Patterns

4. Data Lifecycle Management

4.1 Retention Policies

-- Raw metrics retention: drop chunks older than 7 days.
-- if_not_exists keeps repeated migration runs from erroring.
SELECT add_retention_policy(
    'raw_metrics',
    INTERVAL '7 days',
    if_not_exists => true
);

-- Aggregated metrics retention: keep rollups for 90 days.
SELECT add_retention_policy(
    'aggregated_metrics',
    INTERVAL '90 days',
    if_not_exists => true
);

-- Document retention.
-- NOTE(review): a row-level-security policy only *permits* deletes of
-- rows matching the predicate — it does not schedule them. A scheduled
-- job (cron / pg_cron) must still issue the DELETE, and RLS must be
-- enabled on `documents` for the policy to take effect.
CREATE POLICY document_retention_policy ON documents
    FOR DELETE
    USING (
        created_at < NOW() - INTERVAL '1 year'
        AND status = 'completed'
    );

4.2 Compression Policies

-- Configure TimescaleDB compression.
-- BUGFIX: compression must first be enabled at the table level;
-- add_compression_policy only schedules *when* chunks get compressed.
ALTER TABLE raw_metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'metric_name'
);
ALTER TABLE aggregated_metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'metric_name'
);

SELECT add_compression_policy(
    'raw_metrics',
    INTERVAL '2 days',
    if_not_exists => true
);

-- NOTE(review): this duplicates the aggregated_metrics policy created
-- in §1.2.C; if_not_exists keeps the repeated call from erroring.
SELECT add_compression_policy(
    'aggregated_metrics',
    INTERVAL '7 days',
    if_not_exists => true
);

5. Data Security

5.1 Encryption

-- At-rest encryption primitives (idempotent for repeated migrations).
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Sensitive document metadata, stored only in encrypted form.
CREATE TABLE IF NOT EXISTS encrypted_document_metadata (
    -- One encrypted-metadata row per document; removed with the document.
    doc_uuid          UUID PRIMARY KEY REFERENCES documents(doc_uuid) ON DELETE CASCADE,
    encrypted_data    bytea NOT NULL,
    -- Identifies which key in the key store encrypted this row
    -- (supports key rotation).
    encryption_key_id TEXT NOT NULL,
    created_at        TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

-- Encryption function: encrypt a JSONB metadata document with the
-- symmetric key identified by key_id; returns pgp-encrypted bytes
-- suitable for encrypted_document_metadata.encrypted_data.
-- NOTE(review): get_encryption_key() is defined elsewhere (key-store
-- lookup); it is not visible in this document — confirm it exists.
CREATE OR REPLACE FUNCTION encrypt_metadata(
    data JSONB,
    key_id TEXT
) RETURNS bytea AS $$
BEGIN
    RETURN pgp_sym_encrypt(
        data::text,
        get_encryption_key(key_id)
    );
END;
$$ LANGUAGE plpgsql;

5.2 Access Control

-- Role-based access: three coarse-grained roles.
CREATE ROLE readonly;
CREATE ROLE datawriter;
CREATE ROLE admin;

-- Grant permissions
-- NOTE(review): these grants cover only tables existing at execution
-- time; tables created later need ALTER DEFAULT PRIVILEGES or a re-grant.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly;
-- NOTE(review): `allowed_tables` looks like a placeholder, not a real
-- table — enumerate the actual writable tables here.
GRANT INSERT, UPDATE ON allowed_tables TO datawriter;
GRANT ALL ON ALL TABLES IN SCHEMA public TO admin;

-- Row-level security
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- NOTE(review): the `documents` DDL in §1.2.A has no created_by
-- column; either add it there or change this predicate — confirm intent.
CREATE POLICY document_access_policy ON documents
USING (created_by = current_user);

6. Data Monitoring

6.1 Performance Metrics

-- Create monitoring views.
-- Per-table storage footprint (heap + indexes + toast) and live rows.
-- OR REPLACE keeps repeated migration runs idempotent.
CREATE OR REPLACE VIEW storage_metrics AS
SELECT
    schemaname,
    relname,
    pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
    pg_size_pretty(pg_indexes_size(relid)) AS index_size,
    -- Estimate maintained by autovacuum, not an exact count.
    n_live_tup AS row_count
FROM pg_stat_user_tables;

-- Index usage stats: a low idx_scan on a large table flags a dead index.
CREATE OR REPLACE VIEW index_usage_stats AS
SELECT
    schemaname,
    relname,
    indexrelname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes;

6.2 Health Checks

class DatabaseHealthCheck:
    """Database health monitoring.

    NOTE(review): check_replication, check_blocking and check_cache are
    awaited below but not defined in this document — presumably
    implemented alongside; confirm before use. `self.db` is expected to
    expose an async `fetch_one(query)` method.
    """

    async def check_health(self) -> Dict[str, Any]:
        """Run all health probes and return a single status mapping."""
        return {
            "connection_pool": await self.check_pool(),
            "replication_lag": await self.check_replication(),
            "blocking_queries": await self.check_blocking(),
            "cache_hit_ratio": await self.check_cache()
        }

    async def check_pool(self) -> Dict[str, int]:
        """Check connection pool status via pg_stat_activity."""
        query = """
        SELECT count(*) as active_connections
        FROM pg_stat_activity
        WHERE state = 'active';
        """
        return await self.db.fetch_one(query)

Open items for the next revision:

  1. Add more detail to sections that need it (notably §1.1 and §3, which are currently headings only).
  2. Create additional data flow diagrams.
  3. Add specific implementation examples.
  4. Proceed with Part 5: Summary and Glossary.