Skip to main content

Database Role in Lossless Document Processing

1. Core Database Functions

A. Relationship Management

-- Core relationship tables
-- Master record for each ingested document.
CREATE TABLE documents (
    doc_uuid VARCHAR(36) PRIMARY KEY,                -- externally generated UUID string (set by caller)
    filename VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    total_chunks INTEGER,                            -- expected chunk count, written at ingest time
    status VARCHAR(50)                               -- document-level processing status
);

-- One row per document chunk; previous_uuid/next_uuid form a doubly linked
-- list preserving the original chunk order alongside sequence_num.
CREATE TABLE chunks (
    chunk_uuid VARCHAR(36) PRIMARY KEY,
    doc_uuid VARCHAR(36),          -- owning document
    sequence_num INTEGER,          -- position within the document (ordering key used by readers)
    content TEXT,
    previous_uuid VARCHAR(36),     -- NULL for the first chunk
    next_uuid VARCHAR(36),         -- NULL for the last chunk
    processed_at TIMESTAMP,
    FOREIGN KEY (doc_uuid) REFERENCES documents(doc_uuid),
    -- NOTE(review): these self-referential FKs mean a single forward-order
    -- insert pass fails (next_uuid targets a row that does not exist yet)
    -- unless constraint checking is deferred or links are set in a second
    -- pass after all rows exist.
    FOREIGN KEY (previous_uuid) REFERENCES chunks(chunk_uuid),
    FOREIGN KEY (next_uuid) REFERENCES chunks(chunk_uuid)
);

-- Processing status tracking: one status row per chunk (PK = chunk_uuid),
-- upserted on each attempt by the managers below.
CREATE TABLE processing_status (
    chunk_uuid VARCHAR(36) PRIMARY KEY,
    status VARCHAR(50),            -- values used by the code below: 'pending', 'completed', 'failed'
    attempts INTEGER DEFAULT 0,    -- retry counter; queries below cap retries at 3
    last_attempt TIMESTAMP,
    error_message TEXT,
    FOREIGN KEY (chunk_uuid) REFERENCES chunks(chunk_uuid)
);

B. Transaction Management

class DatabaseTransactionManager:
    """Persists a document and all of its chunks atomically."""

    def __init__(self, db_connection):
        # asyncpg-style connection: transaction() context manager and
        # execute() with $n-style positional parameters.
        self.db = db_connection

    async def store_document_with_chunks(self, document: Dict, chunks: List[Dict]):
        """Store document and chunks in a single transaction.

        Args:
            document: mapping with at least 'uuid' and 'filename'.
            chunks: ordered mappings with 'uuid', 'sequence', 'content',
                'previous_uuid', 'next_uuid'.

        Chunks are inserted WITHOUT neighbour links first and linked in a
        second pass: inserting in forward order with next_uuid already set
        points the self-referential foreign key at a row that does not
        exist yet, which fails unless constraint checking is deferred.
        Both passes run inside one transaction, so readers never observe
        half-linked chunks.
        """
        async with self.db.transaction():
            # Store the parent document row.
            await self.db.execute("""
INSERT INTO documents (doc_uuid, filename, total_chunks)
VALUES ($1, $2, $3)
""", document['uuid'], document['filename'], len(chunks))

            # Pass 1: create every chunk row with NULL prev/next links.
            for chunk in chunks:
                await self.db.execute("""
INSERT INTO chunks (
chunk_uuid, doc_uuid, sequence_num, content
) VALUES ($1, $2, $3, $4)
""", chunk['uuid'], document['uuid'], chunk['sequence'],
                    chunk['content'])

            # Pass 2: all neighbour rows now exist, so the FK targets resolve.
            for chunk in chunks:
                await self.db.execute("""
UPDATE chunks
SET previous_uuid = $2, next_uuid = $3
WHERE chunk_uuid = $1
""", chunk['uuid'], chunk['previous_uuid'], chunk['next_uuid'])

Context

(Unfilled ADR template — replace each placeholder before accepting.)

The current situation requires a decision because:

  • [Requirement 1 — state the actual requirement]
  • [Constraint 2 — state the actual constraint]
  • [Need 3 — state the actual need]

Status

Accepted | YYYY-MM-DD  [placeholder — record the actual decision date]

2. Key Database Functions

A. State Management

class ProcessingStateManager:
    """Tracks per-chunk processing state via the processing_status table."""

    def __init__(self, db_connection):
        # asyncpg-style connection: execute()/fetch() with $n parameters.
        self.db = db_connection

    async def track_processing_state(self, chunk_uuid: str):
        """Register a processing attempt for *chunk_uuid*.

        The first call inserts a 'pending' row with attempts = 0;
        subsequent calls hit the ON CONFLICT branch, bumping the attempt
        counter and last_attempt timestamp.
        """
        # Bug fix: the $1 placeholder previously had no bound argument,
        # so every call failed at runtime; chunk_uuid is now passed.
        await self.db.execute("""
INSERT INTO processing_status (chunk_uuid, status, attempts)
VALUES ($1, 'pending', 0)
ON CONFLICT (chunk_uuid)
DO UPDATE SET attempts = processing_status.attempts + 1,
last_attempt = CURRENT_TIMESTAMP
""", chunk_uuid)

    async def get_failed_chunks(self) -> List[Dict]:
        """Return chunks marked 'failed' with fewer than 3 attempts,
        i.e. the set still eligible for reprocessing."""
        return await self.db.fetch("""
SELECT c.*, ps.attempts, ps.error_message
FROM chunks c
JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
WHERE ps.status = 'failed' AND ps.attempts < 3
""")

B. Recovery Management

class RecoveryManager:
    """Finds and reconstructs documents whose processing was interrupted."""

    def __init__(self, db_connection):
        # asyncpg-style connection exposing fetch() with $n parameters.
        self.db = db_connection

    async def find_incomplete_documents(self) -> List[Dict]:
        """Find documents with incomplete processing.

        A document is incomplete when it has more chunk rows than chunks
        with a 'completed' status. The status filter lives in the LEFT
        JOIN condition (not WHERE), so unfinished chunks still count in
        total_chunks while contributing NULL — and thus nothing — to
        processed_chunks; HAVING keeps only documents with a shortfall.
        """
        return await self.db.fetch("""
SELECT d.*,
COUNT(c.chunk_uuid) as total_chunks,
COUNT(ps.chunk_uuid) as processed_chunks
FROM documents d
LEFT JOIN chunks c ON d.doc_uuid = c.doc_uuid
LEFT JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
AND ps.status = 'completed'
GROUP BY d.doc_uuid
HAVING COUNT(c.chunk_uuid) > COUNT(ps.chunk_uuid)
""")

    async def reconstruct_document_state(self, doc_uuid: str) -> Dict:
        """Reconstruct complete document state.

        Fetches every chunk of *doc_uuid* in sequence order, LEFT JOINed
        with its processing status (status/attempts are NULL for chunks
        never attempted), then delegates assembly to a helper.
        """
        chunks = await self.db.fetch("""
SELECT c.*, ps.status, ps.attempts
FROM chunks c
LEFT JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
WHERE c.doc_uuid = $1
ORDER BY c.sequence_num
""", doc_uuid)
        # NOTE(review): _rebuild_document_state is not defined anywhere in
        # this excerpt — confirm it exists elsewhere in the class/module.
        return self._rebuild_document_state(chunks)

3. API Integration Benefits

A. Efficient API Processing

class APIProcessingManager:
    """Selects chunk batches for API processing and records outcomes."""

    def __init__(self, db_connection):
        # asyncpg-style connection: execute()/fetch() with $n parameters.
        self.db = db_connection

    async def get_next_chunk_batch(self, batch_size: int = 5) -> List[Dict]:
        """Get the next batch of chunks for API processing.

        Returns up to *batch_size* chunks, in sequence order, that are
        either untouched (no processing_status row, so ps.status IS NULL
        after the LEFT JOIN) or failed with fewer than 3 attempts.
        """
        return await self.db.fetch("""
SELECT c.*
FROM chunks c
LEFT JOIN processing_status ps ON c.chunk_uuid = ps.chunk_uuid
WHERE ps.status IS NULL
OR (ps.status = 'failed' AND ps.attempts < 3)
ORDER BY c.sequence_num
LIMIT $1
""", batch_size)

    # Fix: `error: str = None` was an implicit-Optional annotation
    # (disallowed by PEP 484); the quoted union keeps it import-free.
    async def update_chunk_status(self, chunk_uuid: str, status: str, error: "str | None" = None):
        """Upsert the processing status for a chunk after an API call.

        Args:
            chunk_uuid: chunk primary key.
            status: new status value (e.g. 'completed' or 'failed').
            error: optional error message stored in error_message.
        """
        await self.db.execute("""
INSERT INTO processing_status (chunk_uuid, status, error_message)
VALUES ($1, $2, $3)
ON CONFLICT (chunk_uuid)
DO UPDATE SET status = $2,
error_message = $3,
last_attempt = CURRENT_TIMESTAMP
""", chunk_uuid, status, error)

B. Response Management

class ResponseManager:
    """Stores API responses per chunk and retrieves them per document."""

    def __init__(self, db_connection):
        # asyncpg-style connection: execute()/fetch() with $n parameters.
        self.db = db_connection

    async def store_api_response(self, chunk_uuid: str, response: Dict):
        """Store API response with chunk relationship.

        NOTE(review): writes to an api_responses table that is NOT defined
        in the schema shown above — confirm it exists. Also relies on the
        json module being imported at file level, and assumes *response*
        has 'content' and 'usage'.'total_tokens' keys (raises KeyError
        otherwise) — verify against the caller's response shape.
        """
        await self.db.execute("""
INSERT INTO api_responses (
chunk_uuid, response_content,
processed_at, tokens_used
) VALUES ($1, $2, CURRENT_TIMESTAMP, $3)
""", chunk_uuid, json.dumps(response['content']),
            response['usage']['total_tokens'])

    async def get_document_responses(self, doc_uuid: str) -> List[Dict]:
        """Retrieve all responses for a document in sequence order.

        Inner JOIN means chunks without a stored response are omitted,
        not returned as NULLs.
        """
        return await self.db.fetch("""
SELECT c.sequence_num, ar.response_content
FROM chunks c
JOIN api_responses ar ON c.chunk_uuid = ar.chunk_uuid
WHERE c.doc_uuid = $1
ORDER BY c.sequence_num
""", doc_uuid)

4. Key Benefits of Database Integration

  1. Transactional Integrity

    • Ensures atomic operations for document and chunk storage
    • Maintains relationship consistency
    • Prevents orphaned chunks or broken relationships
  2. State Management

    • Tracks processing status of each chunk
    • Enables recovery from failures
    • Maintains processing history
  3. Relationship Preservation

    • Maintains chunk order and relationships
    • Enables document reconstruction
    • Preserves metadata associations
  4. Performance Optimization

    • Efficient batch processing
    • Reduced memory usage
    • Optimized query patterns
  5. Error Recovery

    • Automatic retry mechanism
    • Failed chunk identification
    • Processing state restoration

Would you like me to:

  1. Implement any specific database functionality?
  2. Create the schema with indexes for optimal performance?
  3. Develop the state management system?
  4. Design the recovery mechanisms?

The database serves as the source of truth and enables robust processing while maintaining complete data integrity.