Database Role in Lossless Document Processing
1. Core Database Functions
A. Relationship Management
-- Core relationship tables.
-- `documents` is the parent record; `chunks` holds the ordered pieces of a
-- document, doubly linked through previous_uuid / next_uuid.
CREATE TABLE documents (
    doc_uuid VARCHAR(36) PRIMARY KEY,
    filename VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    total_chunks INTEGER,
    status VARCHAR(50)
);

CREATE TABLE chunks (
    chunk_uuid VARCHAR(36) PRIMARY KEY,
    doc_uuid VARCHAR(36),
    sequence_num INTEGER,
    content TEXT,
    previous_uuid VARCHAR(36),
    next_uuid VARCHAR(36),
    processed_at TIMESTAMP,
    FOREIGN KEY (doc_uuid) REFERENCES documents(doc_uuid),
    FOREIGN KEY (previous_uuid) REFERENCES chunks(chunk_uuid),
    FOREIGN KEY (next_uuid) REFERENCES chunks(chunk_uuid)
);

-- Processing status tracking: at most one row per chunk, upserted as
-- processing advances.
CREATE TABLE processing_status (
    chunk_uuid VARCHAR(36) PRIMARY KEY,
    status VARCHAR(50),
    attempts INTEGER DEFAULT 0,
    last_attempt TIMESTAMP,
    error_message TEXT,
    FOREIGN KEY (chunk_uuid) REFERENCES chunks(chunk_uuid)
);

-- Indexes for the access paths used elsewhere in this document: chunks are
-- fetched per document in sequence order, and status rows are filtered by
-- status value. PostgreSQL does not index foreign-key columns
-- automatically, so they are added explicitly here.
CREATE INDEX idx_chunks_doc_seq ON chunks (doc_uuid, sequence_num);
CREATE INDEX idx_processing_status_status ON processing_status (status);
B. Transaction Management
class DatabaseTransactionManager:
    """Persists a document together with all of its chunks atomically."""

    def __init__(self, db_connection):
        # Async DB handle exposing transaction() and execute().
        self.db = db_connection

    async def store_document_with_chunks(self, document: Dict, chunks: List[Dict]):
        """Store document and chunks in a single transaction"""
        doc_sql = """
INSERT INTO documents (doc_uuid, filename, total_chunks)
VALUES ($1, $2, $3)
"""
        chunk_sql = """
INSERT INTO chunks (
chunk_uuid, doc_uuid, sequence_num,
content, previous_uuid, next_uuid
) VALUES ($1, $2, $3, $4, $5, $6)
"""
        async with self.db.transaction():
            # Parent row first so the chunks' doc_uuid FK resolves.
            await self.db.execute(
                doc_sql,
                document['uuid'],
                document['filename'],
                len(chunks),
            )
            # One insert per chunk, preserving the prev/next link fields.
            for piece in chunks:
                await self.db.execute(
                    chunk_sql,
                    piece['uuid'],
                    document['uuid'],
                    piece['sequence'],
                    piece['content'],
                    piece['previous_uuid'],
                    piece['next_uuid'],
                )
Context
This section is an unfilled decision-record template; the drivers below are
placeholders to be replaced with the actual requirements, constraints, and
needs motivating the database design:
- Requirement 1 (placeholder)
- Constraint 2 (placeholder)
- Need 3 (placeholder)
Status
Accepted | YYYY-MM-DD (placeholder — record the actual acceptance date)
2. Key Database Functions
A. State Management
class ProcessingStateManager:
    """Tracks per-chunk processing state in the processing_status table."""

    def __init__(self, db_connection):
        # Async DB handle exposing execute() and fetch().
        self.db = db_connection

    async def track_processing_state(self, chunk_uuid: str):
        """Upsert the processing row for *chunk_uuid*.

        The first call inserts a 'pending' row with 0 attempts; later
        calls bump the attempt counter and refresh the timestamp.
        """
        # BUG FIX: the original never bound chunk_uuid for the $1
        # placeholder, so this statement failed (or mis-bound) at runtime.
        await self.db.execute("""
INSERT INTO processing_status (chunk_uuid, status, attempts)
VALUES ($1, 'pending', 0)
ON CONFLICT (chunk_uuid)
DO UPDATE SET attempts = processing_status.attempts + 1,
last_attempt = CURRENT_TIMESTAMP
""", chunk_uuid)

    async def get_failed_chunks(self) -> List[Dict]:
        """Return chunk rows marked 'failed' with fewer than 3 attempts."""
        return await self.db.fetch("""
SELECT c.*, ps.attempts, ps.error_message
FROM chunks c
JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
WHERE ps.status = 'failed' AND ps.attempts < 3
""")
B. Recovery Management
class RecoveryManager:
    """Finds partially-processed documents and rebuilds their chunk state."""

    def __init__(self, db_connection):
        # Async DB handle exposing fetch().
        self.db = db_connection

    async def find_incomplete_documents(self) -> List[Dict]:
        """Find documents with incomplete processing.

        A document counts as incomplete when it has more chunks than
        chunks with a 'completed' processing_status row.
        """
        incomplete_sql = """
SELECT d.*,
COUNT(c.chunk_uuid) as total_chunks,
COUNT(ps.chunk_uuid) as processed_chunks
FROM documents d
LEFT JOIN chunks c ON d.doc_uuid = c.doc_uuid
LEFT JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
AND ps.status = 'completed'
GROUP BY d.doc_uuid
HAVING COUNT(c.chunk_uuid) > COUNT(ps.chunk_uuid)
"""
        return await self.db.fetch(incomplete_sql)

    async def reconstruct_document_state(self, doc_uuid: str) -> Dict:
        """Reconstruct complete document state from ordered chunk rows."""
        state_sql = """
SELECT c.*, ps.status, ps.attempts
FROM chunks c
LEFT JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
WHERE c.doc_uuid = $1
ORDER BY c.sequence_num
"""
        rows = await self.db.fetch(state_sql, doc_uuid)
        # _rebuild_document_state is defined elsewhere in the project.
        return self._rebuild_document_state(rows)
3. API Integration Benefits
A. Efficient API Processing
class APIProcessingManager:
    """Selects chunk batches for API calls and records their outcomes."""

    def __init__(self, db_connection):
        # Async DB handle exposing fetch() and execute().
        self.db = db_connection

    async def get_next_chunk_batch(self, batch_size: int = 5) -> List[Dict]:
        """Get next batch of chunks for API processing.

        Returns chunks never attempted (no status row) or failed fewer
        than three times, in sequence order, up to *batch_size* rows.
        """
        batch_sql = """
SELECT c.*
FROM chunks c
LEFT JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
WHERE ps.status IS NULL
OR (ps.status = 'failed' AND ps.attempts < 3)
ORDER BY c.sequence_num
LIMIT $1
"""
        return await self.db.fetch(batch_sql, batch_size)

    async def update_chunk_status(self, chunk_uuid: str, status: str, error: str = None):
        """Upsert the processing_status row for a chunk after an API call.

        NOTE(review): the conflict path overwrites status/error but does
        not bump `attempts` — retry counting appears to live elsewhere;
        confirm against ProcessingStateManager.
        """
        upsert_sql = """
INSERT INTO processing_status (chunk_uuid, status, error_message)
VALUES ($1, $2, $3)
ON CONFLICT (chunk_uuid)
DO UPDATE SET status = $2,
error_message = $3,
last_attempt = CURRENT_TIMESTAMP
"""
        await self.db.execute(upsert_sql, chunk_uuid, status, error)
B. Response Management
class ResponseManager:
    """Stores API responses and reads them back in document order."""

    def __init__(self, db_connection):
        # Async DB handle exposing execute() and fetch().
        self.db = db_connection

    async def store_api_response(self, chunk_uuid: str, response: Dict):
        """Store API response with chunk relationship.

        NOTE(review): assumes `response` carries 'content' and
        'usage.total_tokens' keys — confirm against the API client.
        """
        payload = json.dumps(response['content'])
        tokens = response['usage']['total_tokens']
        await self.db.execute("""
INSERT INTO api_responses (
chunk_uuid, response_content,
processed_at, tokens_used
) VALUES ($1, $2, CURRENT_TIMESTAMP, $3)
""", chunk_uuid, payload, tokens)

    async def get_document_responses(self, doc_uuid: str) -> List[Dict]:
        """Return (sequence_num, response_content) rows for a document,
        ordered by chunk sequence."""
        return await self.db.fetch("""
SELECT c.sequence_num, ar.response_content
FROM chunks c
JOIN api_responses ar ON c.chunk_uuid = ar.chunk_uuid
WHERE c.doc_uuid = $1
ORDER BY c.sequence_num
""", doc_uuid)
4. Key Benefits of Database Integration
- Transactional Integrity
  - Ensures atomic operations for document and chunk storage
  - Maintains relationship consistency
  - Prevents orphaned chunks or broken relationships
- State Management
  - Tracks the processing status of each chunk
  - Enables recovery from failures
  - Maintains processing history
- Relationship Preservation
  - Maintains chunk order and relationships
  - Enables document reconstruction
  - Preserves metadata associations
- Performance Optimization
  - Efficient batch processing
  - Reduced memory usage
  - Optimized query patterns
- Error Recovery
  - Automatic retry mechanism
  - Failed chunk identification
  - Processing state restoration
Possible next steps:
- Implement any specific database functionality
- Create the schema with indexes for optimal performance
- Develop the state management system
- Design the recovery mechanisms
The database serves as the source of truth and enables robust processing while maintaining complete data integrity.