Technical Design Document (TDD)
Video-to-Knowledge Pipeline System
Version: 1.0
Date: 2026-02-05
Status: Draft
1. Technical Overview
This document details the technical implementation of the Video-to-Knowledge Pipeline, including technology stack, algorithms, data structures, APIs, and integration patterns.
2. Technology Stack
2.1 Core Technologies
| Layer | Technology | Version | Purpose |
|---|---|---|---|
| Language | Python | 3.11+ | Primary implementation |
| Video Processing | FFmpeg | 6.0+ | Audio/video manipulation |
| Download | yt-dlp | 2024+ | Video acquisition |
| Computer Vision | OpenCV, PIL | 4.9+, 10+ | Image processing |
| ML/AI | OpenAI API, Anthropic API | Latest | Transcription, Vision, LLM |
| Data Storage | SQLite/PostgreSQL | - | Metadata and relationships |
| Documentation | Markdown, Mermaid | - | Artifact format |
| Web Search | Serper API | - | Knowledge expansion |
2.2 Python Dependencies
yt-dlp>=2024.1.0
ffmpeg-python>=0.2.0
opencv-python>=4.9.0
Pillow>=10.0.0
imagehash>=4.3.1
scikit-image>=0.22.0
openai>=1.0.0
anthropic>=0.18.0
numpy>=1.24.0
pydantic>=2.0.0
typer>=0.9.0
rich>=13.0.0
aiohttp>=3.9.0
aiosqlite>=0.19.0
3. Algorithms & Data Structures
3.1 Perceptual Image Deduplication
Algorithm: Perceptual Hash (pHash) + Hamming Distance
# Pseudocode
def deduplicate_frames(frames: List[Frame], threshold: int = 10) -> List[Frame]:
    """
    Remove visually similar frames using perceptual hashing.

    Args:
        frames: List of frame objects with image data
        threshold: Hamming distance threshold (0-64 for 64-bit hash)
                   Lower = more strict, Higher = more permissive

    Returns:
        List of unique frames
    """
    unique_frames = []
    last_hash = None
    for frame in frames:
        # Compute perceptual hash
        image = Image.open(frame.path)
        current_hash = imagehash.phash(image)
        if last_hash is None:
            # First frame always kept
            unique_frames.append(frame)
            last_hash = current_hash
        else:
            # Calculate Hamming distance
            distance = current_hash - last_hash
            if distance > threshold:
                # Content changed significantly
                unique_frames.append(frame)
                last_hash = current_hash
            else:
                # Similar to previous, mark as duplicate
                frame.is_duplicate = True
                frame.similarity_score = 1 - (distance / 64)
    return unique_frames
Complexity:
- Time: O(n) where n = number of frames
- Space: O(1) additional (streaming)
Alternative Algorithms Considered:
- SSIM (Structural Similarity): More accurate but slower
- CNN embeddings: Best accuracy but requires GPU
- aHash/dHash: Faster but less robust
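The comparison at the core of the algorithm is just XOR plus a popcount on 64-bit hashes, and can be illustrated without the imaging libraries. A minimal stdlib sketch, assuming hashes are already computed as plain integers:

```python
def hamming_distance(h1: int, h2: int) -> int:
    """Count differing bits between two 64-bit perceptual hashes."""
    return bin(h1 ^ h2).count("1")

def dedup_hashes(hashes: list[int], threshold: int = 10) -> list[int]:
    """Indices kept by the sequential comparison scheme above:
    a hash survives if it differs from the last kept hash by more
    than `threshold` bits."""
    kept = []
    last = None
    for i, h in enumerate(hashes):
        if last is None or hamming_distance(h, last) > threshold:
            kept.append(i)
            last = h
    return kept
```

Note that comparing only against the last kept hash (rather than all kept hashes) is what keeps the algorithm O(n) and streaming-friendly.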
3.2 Timestamp Alignment
Algorithm: Binary Search over timestamp-sorted frames
def align_content(
    transcript_segments: List[Segment],
    frame_analyses: List[ImageAnalysis]
) -> List[AlignedChunk]:
    """
    Align transcript segments with visual frames by timestamp.

    Creates chunks where transcript content and visual content
    overlap temporally.
    """
    aligned = []
    for segment in transcript_segments:
        # Find frames within time window
        start_time = segment.start_time
        end_time = segment.end_time
        # Binary search for relevant frames
        relevant_frames = binary_search_frames(
            frame_analyses,
            start_time - WINDOW_PADDING,
            end_time + WINDOW_PADDING
        )
        # Create aligned chunk
        chunk = AlignedChunk(
            transcript=segment,
            frames=relevant_frames,
            start_time=start_time,
            end_time=end_time
        )
        aligned.append(chunk)
    return aligned
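The `binary_search_frames` helper is not defined above. One minimal sketch uses the standard library's `bisect` module, assuming frames are pre-sorted by `timestamp` (the `FrameStub` class is a stand-in for the real frame/analysis type):

```python
import bisect
from dataclasses import dataclass

@dataclass
class FrameStub:
    """Stand-in for the frame analysis type; only the timestamp matters here."""
    timestamp: float

def binary_search_frames(frames, start_time, end_time):
    """Return frames with start_time <= timestamp <= end_time.

    Assumes `frames` is sorted by timestamp, so each lookup is
    O(log n) instead of a linear scan.
    """
    timestamps = [f.timestamp for f in frames]
    lo = bisect.bisect_left(timestamps, start_time)
    hi = bisect.bisect_right(timestamps, end_time)
    return frames[lo:hi]
```

In practice the `timestamps` key list should be built once per video rather than per call, since rebuilding it makes each lookup O(n) again.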
3.3 Whisper Model Auto-Selection
Algorithm: Hardware-aware model selection with priority-based fallback
def select_whisper_model(
    available_memory_gb: float,
    priority: str = "balanced",
    has_gpu: bool = False
) -> str:
    """
    Select optimal Whisper model based on hardware capabilities.

    Args:
        available_memory_gb: Available system RAM/VRAM in GB
        priority: Optimization priority - "speed", "quality", or "balanced"
        has_gpu: Whether CUDA GPU is available

    Returns:
        Model size: "tiny", "base", "small", "medium", or "large-v3"

    Model Specifications:
    - tiny: 39M params, ~1GB RAM, 32x realtime, 58% accuracy
    - base: 74M params, ~1GB RAM, 16x realtime, 68% accuracy
    - small: 244M params, ~2GB RAM, 6x realtime, 76% accuracy
    - medium: 769M params, ~5GB RAM, 2x realtime, 82% accuracy
    - large-v3: 1550M params, ~10GB RAM, 1x realtime, 87% accuracy
    """
    models = {
        "tiny": {"ram": 0.5, "speed": 32, "accuracy": 58},
        "base": {"ram": 1.0, "speed": 16, "accuracy": 68},
        "small": {"ram": 2.0, "speed": 6, "accuracy": 76},
        "medium": {"ram": 5.0, "speed": 2, "accuracy": 82},
        "large-v3": {"ram": 10.0, "speed": 1, "accuracy": 87},
    }
    # has_gpu is reserved for future VRAM-aware selection; available
    # memory is the binding constraint today.
    # Filter by available memory
    viable = {
        name: specs for name, specs in models.items()
        if specs["ram"] <= available_memory_gb * 0.8  # 80% safety margin
    }
    if not viable:
        return "tiny"  # Fallback to smallest
    # Select by priority
    if priority == "speed":
        return max(viable, key=lambda m: viable[m]["speed"])
    elif priority == "quality":
        return max(viable, key=lambda m: viable[m]["accuracy"])
    else:  # balanced - most accurate model that still runs at >=4x realtime
        fast_enough = {m: s for m, s in viable.items() if s["speed"] >= 4}
        candidates = fast_enough or viable  # fall back if nothing is fast
        return max(candidates, key=lambda m: candidates[m]["accuracy"])
import logging

import psutil
import torch
import whisper

logger = logging.getLogger(__name__)

class AdaptiveTranscriptionEngine:
    """
    Transcription engine with automatic model selection.
    """
    def __init__(
        self,
        model_size: str = "auto",
        priority: str = "balanced",
        device: str = "auto"
    ):
        if model_size == "auto":
            memory_gb = psutil.virtual_memory().available / (1024**3)
            has_gpu = torch.cuda.is_available()
            model_size = select_whisper_model(memory_gb, priority, has_gpu)
            logger.info(f"Auto-selected model: {model_size} ({memory_gb:.1f}GB available)")
        self.model = whisper.load_model(model_size)
        self.model_size = model_size
Complexity:
- Time: O(1) - constant time lookup
- Space: O(1) - minimal memory overhead
Configuration Options:
| Setting | Values | Default | Description |
|---|---|---|---|
| model_size | tiny/base/small/medium/large-v3/auto | auto | Specific model or auto-select |
| priority | speed/quality/balanced | balanced | Optimization target |
| device | cpu/cuda/auto | auto | Compute device |
Environment Overrides:
export WHISPER_MODEL_SIZE=medium # Force specific model
export WHISPER_PRIORITY=quality # Force quality priority
export WHISPER_DEVICE=cuda # Force GPU usage
3.4 Semantic Chunking
Algorithm: Hierarchical Agglomerative Clustering with Embeddings
def semantic_chunk(
    content: SynthesizedContent,
    max_chunk_size: int = 4000,
    overlap: int = 200
) -> List[ContentChunk]:
    """
    Split content into semantically coherent chunks.

    Uses sentence embeddings + clustering to find natural
    boundaries between topics.
    """
    # Step 1: Generate embeddings for sentences
    sentences = segment_into_sentences(content)
    embeddings = embed_sentences(sentences)
    # Step 2: Cluster by similarity
    clusters = hierarchical_cluster(embeddings, threshold=0.7)
    # Step 3: Merge clusters into chunks respecting max size
    chunks = []
    current_chunk = []
    current_size = 0
    for cluster in clusters:
        cluster_text = ' '.join([sentences[i] for i in cluster])
        cluster_size = len(cluster_text)
        if current_size + cluster_size > max_chunk_size:
            # Finalize current chunk
            chunks.append(create_chunk(current_chunk))
            # Carry the tail of the chunk forward as overlap context
            # (overlap is measured in characters of the joined text)
            tail = ' '.join(current_chunk)[-overlap:] if overlap else ''
            current_chunk = [tail] if tail else []
            current_size = len(tail)
        current_chunk.append(cluster_text)
        current_size += cluster_size
    # Add final chunk
    if current_chunk:
        chunks.append(create_chunk(current_chunk))
    return chunks
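The boundary-finding idea can be illustrated without an embedding model: given any sentence vectors, a drop in cosine similarity between neighbours marks a candidate topic boundary. A stdlib sketch (the toy vectors in the test are assumptions, not real embeddings):

```python
import math
from typing import List

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm

def topic_boundaries(embeddings: List[List[float]], threshold: float = 0.7) -> List[int]:
    """Indices where similarity to the previous sentence falls below threshold."""
    return [
        i for i in range(1, len(embeddings))
        if cosine_similarity(embeddings[i - 1], embeddings[i]) < threshold
    ]
```

Hierarchical agglomerative clustering generalizes this pairwise check by also merging non-adjacent sentences that discuss the same topic.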
4. API Specifications
4.1 Internal APIs
VideoDownloader
class VideoDownloader:
    """
    Downloads videos from various platforms.
    """
    async def download(
        self,
        url: str,
        output_dir: Path,
        quality: str = "best",
        extract_audio: bool = True
    ) -> DownloadResult:
        """
        Download video from URL.

        Args:
            url: Video URL (YouTube, Vimeo, etc.)
            output_dir: Where to save files
            quality: Video quality selector
            extract_audio: Whether to extract audio stream

        Returns:
            DownloadResult with paths and metadata
        """
        ...

    def get_info(self, url: str) -> VideoInfo:
        """Get video metadata without downloading."""
        ...
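`DownloadResult` is referenced but never defined in this document. One plausible shape, offered as an assumption, is a small dataclass carrying the paths and metadata the later pipeline stages need:

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

@dataclass
class DownloadResult:
    """Hypothetical return type for VideoDownloader.download()."""
    video_path: Path
    audio_path: Optional[Path]       # None when extract_audio=False
    title: str
    duration_seconds: int
    metadata: dict = field(default_factory=dict)  # raw extractor metadata
```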
TranscriptionEngine
class TranscriptionEngine:
    """
    Converts audio to text with timestamps.
    """
    # "whisper-1" targets the hosted OpenAI API; "whisper-large-v3"
    # the locally run open-source model.
    SUPPORTED_MODELS = ["whisper-1", "whisper-large-v3"]

    async def transcribe(
        self,
        audio_path: Path,
        model: str = "whisper-large-v3",
        language: Optional[str] = None,
        response_format: str = "verbose_json"
    ) -> Transcript:
        """
        Transcribe audio file.

        Args:
            audio_path: Path to audio file (mp3, wav, etc.)
            model: Whisper model to use
            language: ISO-639-1 code or None for auto
            response_format: Output format

        Returns:
            Transcript with segments and metadata
        """
        ...

    def transcribe_with_diarization(
        self,
        audio_path: Path
    ) -> Transcript:
        """Transcribe with speaker identification."""
        ...
VisionAnalyzer
class VisionAnalyzer:
    """
    Analyzes images using multimodal LLMs.
    """
    SYSTEM_PROMPT = """
    Analyze this video frame. Describe:
    1. Main visual elements (UI components, diagrams, text)
    2. Any code or technical content visible
    3. The overall context/purpose shown
    4. Relationship to surrounding content if known
    """

    async def analyze_frame(
        self,
        frame_path: Path,
        context: Optional[str] = None,
        model: str = "gpt-4-vision-preview"
    ) -> ImageAnalysis:
        """
        Analyze a video frame.

        Args:
            frame_path: Path to image file
            context: Optional context from transcript
            model: Vision model to use

        Returns:
            Structured analysis of the frame
        """
        ...

    async def batch_analyze(
        self,
        frames: List[Path],
        max_concurrent: int = 5
    ) -> List[ImageAnalysis]:
        """Analyze multiple frames with rate limiting."""
        ...
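The concurrency limit in `batch_analyze` can be implemented with an `asyncio.Semaphore`. A generic, self-contained sketch (names are illustrative, not part of the API above):

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def gather_with_limit(
    items: List[T],
    worker: Callable[[T], Awaitable[R]],
    max_concurrent: int = 5,
) -> List[R]:
    """Run `worker` over `items`, at most `max_concurrent` at a time."""
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(item: T) -> R:
        async with sem:  # blocks when the concurrency limit is reached
            return await worker(item)

    # Results come back in input order, like asyncio.gather
    return await asyncio.gather(*(bounded(i) for i in items))
```

Note this bounds concurrency, not request rate; sustained-rate limits (requests per minute) still need the backoff handling shown in section 6.1.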
ArtifactGenerator
class ArtifactGenerator:
    """
    Generates structured documentation artifacts.
    """
    TEMPLATES = {
        "sdd": "sdd_template.j2",
        "tdd": "tdd_template.j2",
        "adr": "adr_template.j2",
        "c4": "c4_template.j2",
        "summary": "summary_template.j2",
        "outline": "outline_template.j2",
        "glossary": "glossary_template.j2"
    }

    async def generate(
        self,
        artifact_type: str,
        content: SynthesizedContent,
        context: Dict[str, Any]
    ) -> Artifact:
        """
        Generate a specific artifact type.

        Args:
            artifact_type: Type of artifact to generate
            content: Synthesized content from pipeline
            context: Additional context for generation

        Returns:
            Generated artifact with metadata
        """
        template = self.load_template(artifact_type)
        prompt = template.render(content=content, context=context)
        response = await self.llm.complete(prompt)
        return self.parse_artifact(response, artifact_type)

    async def generate_all(
        self,
        content: SynthesizedContent
    ) -> ArtifactCollection:
        """Generate all artifact types."""
        tasks = [
            self.generate(t, content, {})
            for t in self.TEMPLATES.keys()
        ]
        artifacts = await asyncio.gather(*tasks)
        return ArtifactCollection(artifacts)
5. Database Schema
5.1 SQLite Schema
-- Core tables for pipeline metadata
CREATE TABLE videos (
    id TEXT PRIMARY KEY,
    source_url TEXT NOT NULL,
    title TEXT,
    description TEXT,
    duration_seconds INTEGER,
    local_path TEXT NOT NULL,
    downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE audio (
    id TEXT PRIMARY KEY,
    video_id TEXT REFERENCES videos(id),
    format TEXT,
    local_path TEXT NOT NULL,
    bitrate INTEGER,
    sample_rate INTEGER,
    extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE transcripts (
    id TEXT PRIMARY KEY,
    audio_id TEXT REFERENCES audio(id),
    full_text TEXT NOT NULL,
    segments JSON,   -- Array of {start, end, text, confidence}
    language TEXT,
    model TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE frames (
    id TEXT PRIMARY KEY,
    video_id TEXT REFERENCES videos(id),
    sequence_number INTEGER,
    timestamp REAL,
    local_path TEXT NOT NULL,
    perceptual_hash TEXT,
    is_unique BOOLEAN DEFAULT TRUE,
    similarity_score REAL,
    extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE frame_analyses (
    id TEXT PRIMARY KEY,
    frame_id TEXT REFERENCES frames(id),
    description TEXT,
    detected_elements JSON,
    ocr_text TEXT,
    model TEXT,
    analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE content_chunks (
    id TEXT PRIMARY KEY,
    transcript_id TEXT REFERENCES transcripts(id),
    chunk_index INTEGER,
    start_time REAL,
    end_time REAL,
    transcript_text TEXT,
    frame_ids JSON,  -- Array of frame IDs
    summary TEXT,
    embedding JSON   -- Vector embedding for similarity search
);

CREATE TABLE artifacts (
    id TEXT PRIMARY KEY,
    chunk_id TEXT REFERENCES content_chunks(id),
    artifact_type TEXT NOT NULL,  -- sdd, tdd, adr, etc.
    title TEXT,
    local_path TEXT NOT NULL,
    metadata JSON,
    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for performance
CREATE INDEX idx_frames_video ON frames(video_id);
CREATE INDEX idx_frames_unique ON frames(video_id, is_unique);
CREATE INDEX idx_analyses_frame ON frame_analyses(frame_id);
CREATE INDEX idx_artifacts_type ON artifacts(artifact_type);
CREATE INDEX idx_chunks_transcript ON content_chunks(transcript_id);
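The schema can be exercised directly with the standard library's `sqlite3` module. A sanity-check sketch against an abridged `frames` table (column list trimmed for brevity):

```python
import sqlite3

# Abridged frames table and composite index from the schema above
SCHEMA = """
CREATE TABLE frames (
    id TEXT PRIMARY KEY,
    video_id TEXT,
    timestamp REAL,
    perceptual_hash TEXT,
    is_unique BOOLEAN DEFAULT TRUE
);
CREATE INDEX idx_frames_unique ON frames(video_id, is_unique);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute(
    "INSERT INTO frames (id, video_id, timestamp, perceptual_hash) VALUES (?, ?, ?, ?)",
    ("f1", "v1", 2.5, "c3a1f0"),
)
# The common access pattern: unique frames for one video, served by the
# composite index idx_frames_unique
unique_ids = [row[0] for row in conn.execute(
    "SELECT id FROM frames WHERE video_id = ? AND is_unique", ("v1",)
)]
```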
6. Integration Patterns
6.1 External APIs
OpenAI API (Whisper)
# Rate limiting: 50 RPM for Whisper
async def transcribe_with_retry(
    audio_path: Path,
    max_retries: int = 3
) -> Transcript:
    for attempt in range(max_retries):
        try:
            with open(audio_path, "rb") as f:
                response = await openai.audio.transcriptions.create(
                    model="whisper-1",  # hosted API model name
                    file=f,
                    response_format="verbose_json",
                    timestamp_granularities=["segment"]
                )
            return Transcript.from_openai(response)
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
Anthropic API (Claude Vision)
# Rate limiting: 40000 TPM for Claude 3.5 Sonnet
async def analyze_with_claude(
    image_path: Path,
    context: str
) -> str:
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode()
    response = await anthropic.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": context},
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/png",
                        "data": image_data
                    }
                }
            ]
        }]
    )
    return response.content[0].text
Web Search (Serper)
async def expand_with_web_search(
    query: str,
    num_results: int = 5
) -> List[SearchResult]:
    """Expand content with web search results."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://google.serper.dev/search",
            headers={"X-API-KEY": SERPER_API_KEY},
            json={"q": query, "num": num_results}
        ) as response:
            data = await response.json()
    return [
        SearchResult(
            title=r["title"],
            snippet=r["snippet"],
            url=r["link"]
        )
        for r in data["organic"]
    ]
7. Performance Considerations
7.1 Optimization Strategies
| Bottleneck | Optimization |
|---|---|
| Video download | Parallel downloads, CDN selection |
| Frame extraction | Hardware acceleration (GPU) |
| Deduplication | Streaming algorithm, O(n) time |
| Vision analysis | Batch processing, caching |
| LLM generation | Parallel requests, response streaming |
7.2 Caching Strategy
# Redis/Memcached for API responses
@cache.memoize(timeout=3600)
async def analyze_frame_cached(frame_hash: str) -> ImageAnalysis:
    """Cache vision analysis by perceptual hash."""
    ...

# Disk cache for downloads (diskcache exposes memoize() on a Cache object)
download_cache = diskcache.Cache("downloads")

@download_cache.memoize()
def download_video(url: str) -> Path:
    ...
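When no external cache is available, a process-local fallback with the standard library's `functools.lru_cache` keeps repeat analyses of the same perceptual hash free. A sketch where the analysis body is a stand-in for the real vision call:

```python
from functools import lru_cache

CALLS = 0  # instrumentation for this sketch only

@lru_cache(maxsize=1024)
def analyze_frame_local(frame_hash: str) -> str:
    """In-process fallback cache keyed by perceptual hash."""
    global CALLS
    CALLS += 1
    return f"analysis-of-{frame_hash}"  # stand-in for the vision API call

analyze_frame_local("c3a1f0")
analyze_frame_local("c3a1f0")  # second call is served from the cache
```

Note that `lru_cache` does not work directly on async functions (it would cache the coroutine object, not the result), so the async `analyze_frame_cached` above still needs an async-aware cache layer.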
7.3 Resource Limits
| Resource | Limit | Action on Exceed |
|---|---|---|
| Memory | 8GB | Stream processing |
| Disk | 100GB | LRU cleanup |
| API calls | Rate limits | Queue + backoff |
| Concurrent | 10 workers | Semaphore |
8. Testing Strategy
8.1 Unit Tests
def test_deduplication():
    """Test perceptual deduplication algorithm."""
    frames = load_test_frames()
    unique = deduplicate_frames(frames, threshold=10)
    assert len(unique) < len(frames)

def test_timestamp_alignment():
    """Test transcript-frame alignment."""
    segments = [Segment(start_time=0, end_time=5, text="Hello")]
    frames = [Frame(timestamp=2.5)]
    aligned = align_content(segments, frames)
    assert len(aligned) == 1
    assert len(aligned[0].frames) == 1
8.2 Integration Tests
- End-to-end pipeline with short video
- API failure recovery
- Rate limiting handling
- Concurrent processing
8.3 Performance Tests
- Benchmark deduplication with 1000+ frames
- Measure transcription accuracy
- LLM token usage optimization
9. Deployment
9.1 Docker Configuration
FROM python:3.11-slim

# Install FFmpeg (keep the image small: no recommended extras, no apt lists)
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# Install Python deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY src/ /app/src/
COPY config/ /app/config/

WORKDIR /app
CMD ["python", "-m", "src.pipeline"]
9.2 Environment Variables
# APIs
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
SERPER_API_KEY=...
# Paths
OUTPUT_DIR=/data/outputs
TEMP_DIR=/tmp/videos
# Pipeline
MAX_WORKERS=10
DEDUP_THRESHOLD=10
FRAME_RATE=0.5
# Database
DATABASE_URL=sqlite:///data/pipeline.db
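A minimal loader that resolves these variables with the documented values as defaults could look like the sketch below (the class name and structure are assumptions; the pipeline itself could equally use pydantic's settings support):

```python
import os
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    """Pipeline settings resolved from the environment variables above."""
    output_dir: str
    temp_dir: str
    max_workers: int
    dedup_threshold: int
    frame_rate: float
    database_url: str

    @classmethod
    def from_env(cls) -> "PipelineConfig":
        env = os.environ.get
        return cls(
            output_dir=env("OUTPUT_DIR", "/data/outputs"),
            temp_dir=env("TEMP_DIR", "/tmp/videos"),
            max_workers=int(env("MAX_WORKERS", "10")),
            dedup_threshold=int(env("DEDUP_THRESHOLD", "10")),
            frame_rate=float(env("FRAME_RATE", "0.5")),
            database_url=env("DATABASE_URL", "sqlite:///data/pipeline.db"),
        )
```

Parsing the numeric variables with `int()`/`float()` up front means a malformed value fails loudly at startup rather than deep inside the pipeline.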