System Architecture Design Document
Part 4: Data Architecture
Workflow Checklist
- Prerequisites verified
- Configuration applied
- Process executed
- Results validated
- Documentation updated
Workflow Steps
- Initialize - Set up the environment
- Configure - Apply settings
- Execute - Run the process
- Validate - Check results
- Complete - Finalize workflow
Workflow Phases
Phase 1: Initialization
Set up prerequisites and validate inputs.
Phase 2: Processing
Execute the main workflow steps.
Phase 3: Verification
Validate outputs and confirm completion.
Phase 4: Finalization
Clean up and generate reports.
1. Data Storage Architecture
1.1 Storage Layer Overview
1.2 Storage Components
A. Primary Storage (PostgreSQL)
-- Core document storage.
-- created_by records the owning role; it is required by the row-level
-- security policy in section 5.2, which filters on created_by = current_user.
CREATE TABLE IF NOT EXISTS documents (
    doc_uuid UUID PRIMARY KEY,
    filename TEXT NOT NULL,
    mime_type TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    created_by TEXT NOT NULL DEFAULT current_user,
    metadata JSONB,
    status TEXT NOT NULL,
    -- Single source of truth for the ingestion state machine.
    CONSTRAINT valid_status CHECK (status IN (
        'pending', 'processing', 'completed', 'failed'
    ))
);
-- Chunk storage: ordered slices of a document's content.
CREATE TABLE IF NOT EXISTS chunks (
    chunk_uuid UUID PRIMARY KEY,
    -- A chunk is meaningless without its document, so the FK is NOT NULL
    -- and cascades: deleting a document removes its chunks.
    doc_uuid UUID NOT NULL,
    content TEXT NOT NULL,
    sequence_num INTEGER NOT NULL,
    start_offset INTEGER NOT NULL,
    end_offset INTEGER NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT chunks_doc_uuid_fk FOREIGN KEY (doc_uuid)
        REFERENCES documents(doc_uuid) ON DELETE CASCADE,
    -- Offsets are half-open [start, end); an empty chunk is invalid.
    CONSTRAINT valid_offsets CHECK (end_offset > start_offset),
    -- One chunk per position within a document.
    CONSTRAINT unique_chunk_sequence UNIQUE (doc_uuid, sequence_num)
);
-- Relationships: directed, typed, weighted edges between chunks.
CREATE TABLE IF NOT EXISTS chunk_relationships (
    source_uuid UUID NOT NULL,
    target_uuid UUID NOT NULL,
    relationship_type TEXT NOT NULL,
    -- DOUBLE PRECISION for consistency with the metrics tables
    -- (FLOAT is an alias for it in PostgreSQL anyway).
    weight DOUBLE PRECISION NOT NULL,
    metadata JSONB,
    -- At most one edge of a given type between a pair of chunks.
    PRIMARY KEY (source_uuid, target_uuid, relationship_type),
    CONSTRAINT chunk_rel_source_fk FOREIGN KEY (source_uuid)
        REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
    CONSTRAINT chunk_rel_target_fk FOREIGN KEY (target_uuid)
        REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
    -- Weight is a normalized strength/similarity score.
    CONSTRAINT valid_weight CHECK (weight BETWEEN 0.0 AND 1.0)
);
B. Vector Storage (pgvector)
-- Enable vector extension (idempotent).
CREATE EXTENSION IF NOT EXISTS vector;
-- Vector embeddings, one row per chunk.
CREATE TABLE IF NOT EXISTS chunk_embeddings (
    chunk_uuid UUID PRIMARY KEY
        REFERENCES chunks(chunk_uuid) ON DELETE CASCADE,
    -- 1536 dimensions; presumably matches the embedding model's output
    -- size -- confirm against the model configured in model_version.
    embedding vector(1536),
    model_version TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
-- PostgreSQL does not support inline INDEX clauses inside CREATE TABLE;
-- the approximate-nearest-neighbor index must be a separate statement.
-- "lists" should be tuned to roughly sqrt(expected_row_count).
CREATE INDEX IF NOT EXISTS chunk_embeddings_embedding_idx
    ON chunk_embeddings
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
-- Vector search function: return chunks whose cosine similarity to
-- query_embedding exceeds similarity_threshold, best matches first.
CREATE OR REPLACE FUNCTION similarity_search(
    query_embedding vector,
    similarity_threshold float,
    max_results integer
) RETURNS TABLE (
    chunk_uuid UUID,
    similarity float
)
LANGUAGE plpgsql
STABLE  -- reads the database, never modifies it; lets the planner cache it
AS $$
BEGIN
    RETURN QUERY
    SELECT
        c.chunk_uuid,
        -- <=> is pgvector's cosine *distance*; similarity = 1 - distance.
        1 - (c.embedding <=> query_embedding) AS similarity
    FROM chunk_embeddings c
    WHERE 1 - (c.embedding <=> query_embedding) > similarity_threshold
    -- Order by the raw distance operator so the ivfflat index is usable.
    ORDER BY c.embedding <=> query_embedding
    LIMIT max_results;
END;
$$;
C. Time Series Storage (TimescaleDB)
-- Enable TimescaleDB (idempotent).
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Raw, un-aggregated metric samples.
CREATE TABLE IF NOT EXISTS raw_metrics (
    time TIMESTAMPTZ NOT NULL,
    metric_name TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    labels JSONB DEFAULT '{}',
    -- Restrict names to word characters and dots so downstream
    -- dashboards/exporters never see unparseable metric names.
    CONSTRAINT valid_metric CHECK (
        metric_name ~ '^[a-zA-Z0-9_\.]+$'
    )
);
-- Create hypertable, partitioned on the time column; safe to re-run.
SELECT create_hypertable('raw_metrics', 'time', if_not_exists => TRUE);
-- Aggregated metrics: pre-computed rollups of raw_metrics.
CREATE TABLE IF NOT EXISTS aggregated_metrics (
    time TIMESTAMPTZ NOT NULL,
    metric_name TEXT NOT NULL,
    -- WINDOW is a reserved keyword in PostgreSQL, so the rollup window
    -- label (e.g. '1m', '5m', '1h') uses a non-reserved column name.
    agg_window TEXT NOT NULL,
    min_value DOUBLE PRECISION,
    max_value DOUBLE PRECISION,
    avg_value DOUBLE PRECISION,
    sum_value DOUBLE PRECISION,
    count INTEGER,
    labels JSONB DEFAULT '{}'
);
-- Create hypertable with compression. Compression must be enabled on
-- the table BEFORE add_compression_policy, or the policy call errors.
SELECT create_hypertable('aggregated_metrics', 'time', if_not_exists => TRUE);
ALTER TABLE aggregated_metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'metric_name'
);
SELECT add_compression_policy(
    'aggregated_metrics',
    INTERVAL '7 days',
    if_not_exists => TRUE
);
D. Cache Layer (Redis)
Key Patterns:
1. Document Cache
- Format: doc:{uuid}
- TTL: 1 hour
- Value: Document metadata JSON
2. Chunk Cache
- Format: chunk:{uuid}
- TTL: 1 hour
- Value: Chunk content and metadata
3. Search Results Cache
- Format: search:{query_hash}
- TTL: 5 minutes
- Value: Ranked results JSON
4. Metric Cache
- Format: metric:{name}:latest
- TTL: 30 seconds
- Value: Latest metric value
2. Data Access Patterns
2.1 Repository Layer
class BaseRepository:
    """Abstract base repository defining the common CRUD contract.

    Concrete repositories override these coroutines. The base versions
    raise NotImplementedError instead of silently returning None, so an
    un-overridden call fails loudly rather than corrupting caller logic.
    """

    async def get(self, id: UUID) -> Optional[Entity]:
        """Return the entity with the given ID, or None if absent."""
        raise NotImplementedError

    async def create(self, entity: Entity) -> Entity:
        """Persist a new entity and return the stored version."""
        raise NotImplementedError

    async def update(self, entity: Entity) -> Entity:
        """Update an existing entity and return the stored version."""
        raise NotImplementedError

    async def delete(self, id: UUID) -> bool:
        """Delete the entity with the given ID; return True if removed."""
        raise NotImplementedError
class DocumentRepository(BaseRepository):
    """Document-specific repository operations.

    Stubs raise NotImplementedError (instead of returning None) so that
    a missing implementation surfaces immediately at the call site.
    """

    async def get_with_chunks(
        self,
        doc_uuid: UUID,
    ) -> Optional[Document]:
        """Return the document with all its chunks eagerly loaded,
        or None if no document matches doc_uuid."""
        raise NotImplementedError

    async def find_by_status(
        self,
        status: str,
        limit: int = 10,
    ) -> List[Document]:
        """Return up to `limit` documents whose status equals `status`."""
        raise NotImplementedError
class VectorRepository(BaseRepository):
    """Vector-store repository operations.

    Stubs raise NotImplementedError (instead of returning None) so that
    a missing implementation surfaces immediately at the call site.
    """

    async def find_similar(
        self,
        embedding: List[float],
        threshold: float,
        limit: int,
    ) -> List[Tuple[UUID, float]]:
        """Return up to `limit` (chunk_uuid, similarity) pairs whose
        similarity to `embedding` exceeds `threshold`."""
        raise NotImplementedError

    async def batch_insert(
        self,
        vectors: List[Vector],
    ) -> List[UUID]:
        """Insert vectors in one batch; return the new row UUIDs."""
        raise NotImplementedError
3. Data Flow Patterns
3.1 Write Patterns
3.2 Read Patterns
4. Data Lifecycle Management
4.1 Retention Policies
-- Raw metrics retention: TimescaleDB drops chunks older than 7 days.
SELECT add_retention_policy(
    'raw_metrics',
    INTERVAL '7 days',
    if_not_exists => TRUE
);
-- Aggregated metrics retention: keep rollups for 90 days.
SELECT add_retention_policy(
    'aggregated_metrics',
    INTERVAL '90 days',
    if_not_exists => TRUE
);
-- Document retention.
-- NOTE(review): a row-level-security POLICY does not delete anything by
-- itself -- it only restricts which rows a role is ALLOWED to delete.
-- A scheduled job (e.g. pg_cron) must still issue the DELETE, e.g.:
--   DELETE FROM documents
--   WHERE created_at < NOW() - INTERVAL '1 year' AND status = 'completed';
CREATE POLICY document_retention_policy ON documents
FOR DELETE
USING (
    created_at < NOW() - INTERVAL '1 year'
    AND status = 'completed'
);
4.2 Compression Policies
-- Configure TimescaleDB compression.
-- add_compression_policy requires compression to be enabled on the
-- table first via ALTER TABLE ... SET (timescaledb.compress).
ALTER TABLE raw_metrics SET (timescaledb.compress);
SELECT add_compression_policy(
    'raw_metrics',
    INTERVAL '2 days',
    if_not_exists => TRUE
);
-- aggregated_metrics compression is already configured in section 1.2.C;
-- if_not_exists turns this re-statement into a no-op instead of an error.
SELECT add_compression_policy(
    'aggregated_metrics',
    INTERVAL '7 days',
    if_not_exists => TRUE
);
5. Data Security
5.1 Encryption
-- At-rest encryption (idempotent extension creation).
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Sensitive document metadata, stored only in encrypted form.
CREATE TABLE IF NOT EXISTS encrypted_document_metadata (
    doc_uuid UUID PRIMARY KEY
        REFERENCES documents(doc_uuid) ON DELETE CASCADE,
    encrypted_data bytea NOT NULL,
    -- Identifier of the key used, so rows can be re-keyed later.
    encryption_key_id TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
-- Encrypt a JSONB payload with the symmetric key named by key_id.
-- NOTE(review): get_encryption_key() is assumed to be defined elsewhere
-- (e.g. a KMS lookup wrapper) -- confirm it exists before deploying.
CREATE OR REPLACE FUNCTION encrypt_metadata(
    data JSONB,
    key_id TEXT
) RETURNS bytea AS $$
BEGIN
    RETURN pgp_sym_encrypt(
        data::text,
        get_encryption_key(key_id)
    );
END;
$$ LANGUAGE plpgsql;
5.2 Access Control
-- Role-based access. Group roles (NOLOGIN is the CREATE ROLE default);
-- grant them to concrete login users.
CREATE ROLE readonly NOLOGIN;
CREATE ROLE datawriter NOLOGIN;
CREATE ROLE admin NOLOGIN;
-- Grant permissions. SELECT/INSERT grants are useless without USAGE
-- on the schema itself.
GRANT USAGE ON SCHEMA public TO readonly, datawriter;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly;
-- NOTE(review): 'allowed_tables' is a placeholder -- replace with the
-- concrete list of tables datawriter may modify.
GRANT INSERT, UPDATE ON allowed_tables TO datawriter;
GRANT ALL ON ALL TABLES IN SCHEMA public TO admin;
-- Row-level security: users see only documents they created.
-- NOTE(review): this relies on a documents.created_by column -- make
-- sure the documents DDL in 1.2.A actually defines it.
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
CREATE POLICY document_access_policy ON documents
    USING (created_by = current_user);
6. Data Monitoring
6.1 Performance Metrics
-- Create monitoring views over PostgreSQL's statistics collector.
-- Per-table size and estimated row-count overview.
CREATE OR REPLACE VIEW storage_metrics AS
SELECT
    schemaname,
    relname,
    pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
    pg_size_pretty(pg_indexes_size(relid)) AS index_size,
    -- n_live_tup is an estimate from the stats collector, not exact.
    n_live_tup AS row_count
FROM pg_stat_user_tables;
-- Index usage stats; a persistently low idx_scan on a large table
-- suggests a dead index worth dropping.
CREATE OR REPLACE VIEW index_usage_stats AS
SELECT
    schemaname,
    relname,
    indexrelname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes;
6.2 Health Checks
class DatabaseHealthCheck:
    """Collects database health indicators into a single report."""

    async def check_health(self) -> Dict[str, Any]:
        """Gather every health signal and return them as one dict."""
        report = {
            "connection_pool": await self.check_pool(),
            "replication_lag": await self.check_replication(),
            "blocking_queries": await self.check_blocking(),
            "cache_hit_ratio": await self.check_cache(),
        }
        return report

    async def check_pool(self) -> Dict[str, int]:
        """Report how many connections are currently active."""
        sql = """
        SELECT count(*) as active_connections
        FROM pg_stat_activity
        WHERE state = 'active';
        """
        return await self.db.fetch_one(sql)
Next: Part 5 — Summary and Glossary. Data flow diagrams for sections 3.1 and 3.2 and concrete implementation examples for the repository layer remain to be added.