Technical Design Document (TDD)

Video-to-Knowledge Pipeline System

Version: 1.0
Date: 2026-02-05
Status: Draft


1. Technical Overview

This document details the technical implementation of the Video-to-Knowledge Pipeline, including technology stack, algorithms, data structures, APIs, and integration patterns.


2. Technology Stack

2.1 Core Technologies

| Layer | Technology | Version | Purpose |
|---|---|---|---|
| Language | Python | 3.11+ | Primary implementation |
| Video Processing | FFmpeg | 6.0+ | Audio/video manipulation |
| Download | yt-dlp | 2024+ | Video acquisition |
| Computer Vision | OpenCV, PIL | 4.9+, 10+ | Image processing |
| ML/AI | OpenAI API, Anthropic API | Latest | Transcription, vision, LLM |
| Data Storage | SQLite/PostgreSQL | - | Metadata and relationships |
| Documentation | Markdown, Mermaid | - | Artifact format |
| Web Search | Serper API | - | Knowledge expansion |

2.2 Python Dependencies

```text
yt-dlp>=2024.1.0
ffmpeg-python>=0.2.0
opencv-python>=4.9.0
Pillow>=10.0.0
imagehash>=4.3.1
scikit-image>=0.22.0
openai>=1.0.0
anthropic>=0.18.0
numpy>=1.24.0
pydantic>=2.0.0
typer>=0.9.0
rich>=13.0.0
aiohttp>=3.9.0
aiosqlite>=0.19.0
```

The local transcription path in section 3.3 additionally depends on `openai-whisper`, `torch`, and `psutil`.

3. Algorithms & Data Structures

3.1 Perceptual Image Deduplication

Algorithm: Perceptual Hash (pHash) + Hamming Distance

```python
# Pseudocode: `Frame` is the pipeline's frame record (path, is_duplicate,
# similarity_score); imagehash and Pillow are in the dependency list
from typing import List

import imagehash
from PIL import Image


def deduplicate_frames(frames: List[Frame], threshold: int = 10) -> List[Frame]:
    """
    Remove visually similar frames using perceptual hashing.

    Args:
        frames: List of frame objects with image data
        threshold: Hamming distance threshold (0-64 for a 64-bit hash).
            Lower = more strict, higher = more permissive.

    Returns:
        List of unique frames
    """
    unique_frames = []
    last_hash = None

    for frame in frames:
        # Compute the perceptual hash of the frame image
        image = Image.open(frame.path)
        current_hash = imagehash.phash(image)

        if last_hash is None:
            # The first frame is always kept
            unique_frames.append(frame)
            last_hash = current_hash
        else:
            # Hamming distance between 64-bit hashes (imagehash overloads "-")
            distance = current_hash - last_hash

            if distance > threshold:
                # Content changed significantly
                unique_frames.append(frame)
                last_hash = current_hash
            else:
                # Similar to the last kept frame; mark as duplicate
                frame.is_duplicate = True
                frame.similarity_score = 1 - (distance / 64)

    return unique_frames
```

Complexity:

  • Time: O(n), where n = number of frames (one hash per frame)
  • Space: O(1) additional state beyond the output list (only the last kept hash is retained)

Alternative Algorithms Considered:

  • SSIM (Structural Similarity): More accurate but slower
  • CNN embeddings: Best accuracy but requires GPU
  • aHash/dHash: Faster but less robust (compared in the sketch below)
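
For a quick feel for these variants, a comparison sketch using the imagehash package from the dependency list (`frame_a.png` / `frame_b.png` are illustrative file names):

```python
import imagehash
from PIL import Image

a = Image.open("frame_a.png")
b = Image.open("frame_b.png")

for name, fn in [("aHash", imagehash.average_hash),
                 ("dHash", imagehash.dhash),
                 ("pHash", imagehash.phash)]:
    # Hamming distance between 64-bit hashes; smaller = more similar
    print(f"{name}: {fn(a) - fn(b)}")
```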

3.2 Timestamp Alignment

Algorithm: Binary search over timestamp-sorted frames

```python
import bisect
from typing import List

# Padding in seconds around each segment's window (tunable; 2.0s assumed here)
WINDOW_PADDING = 2.0


def binary_search_frames(
    frame_analyses: List[ImageAnalysis],
    start_time: float,
    end_time: float,
) -> List[ImageAnalysis]:
    """Return frames whose timestamps fall in [start_time, end_time].

    Assumes frame_analyses is sorted by timestamp.
    """
    timestamps = [f.timestamp for f in frame_analyses]
    lo = bisect.bisect_left(timestamps, start_time)
    hi = bisect.bisect_right(timestamps, end_time)
    return frame_analyses[lo:hi]


def align_content(
    transcript_segments: List[Segment],
    frame_analyses: List[ImageAnalysis],
) -> List[AlignedChunk]:
    """
    Align transcript segments with visual frames by timestamp.

    Creates chunks where transcript content and visual content
    overlap temporally.
    """
    aligned = []

    for segment in transcript_segments:
        # Time window covered by this segment
        start_time = segment.start_time
        end_time = segment.end_time

        # Binary search for frames inside the padded window
        relevant_frames = binary_search_frames(
            frame_analyses,
            start_time - WINDOW_PADDING,
            end_time + WINDOW_PADDING,
        )

        # Create the aligned chunk
        chunk = AlignedChunk(
            transcript=segment,
            frames=relevant_frames,
            start_time=start_time,
            end_time=end_time,
        )
        aligned.append(chunk)

    return aligned
```

3.3 Whisper Model Auto-Selection

Algorithm: Hardware-aware model selection with priority-based fallback

```python
import logging

import psutil
import torch
import whisper  # openai-whisper package

logger = logging.getLogger(__name__)


def select_whisper_model(
    available_memory_gb: float,
    priority: str = "balanced",
    has_gpu: bool = False
) -> str:
    """
    Select the optimal Whisper model for the available hardware.

    Args:
        available_memory_gb: Available system RAM/VRAM in GB
        priority: Optimization priority - "speed", "quality", or "balanced"
        has_gpu: Whether a CUDA GPU is available (reserved for VRAM-aware
            selection; sizing below is RAM-based)

    Returns:
        Model size: "tiny", "base", "small", "medium", or "large-v3"

    Model specifications:
        - tiny: 39M params, ~1GB RAM, 32x realtime, 58% accuracy
        - base: 74M params, ~1GB RAM, 16x realtime, 68% accuracy
        - small: 244M params, ~2GB RAM, 6x realtime, 76% accuracy
        - medium: 769M params, ~5GB RAM, 2x realtime, 82% accuracy
        - large-v3: 1550M params, ~10GB RAM, 1x realtime, 87% accuracy
    """
    models = {
        "tiny": {"ram": 0.5, "speed": 32, "accuracy": 58},
        "base": {"ram": 1.0, "speed": 16, "accuracy": 68},
        "small": {"ram": 2.0, "speed": 6, "accuracy": 76},
        "medium": {"ram": 5.0, "speed": 2, "accuracy": 82},
        "large-v3": {"ram": 10.0, "speed": 1, "accuracy": 87},
    }

    # Filter by available memory (80% safety margin)
    viable = {
        name: specs for name, specs in models.items()
        if specs["ram"] <= available_memory_gb * 0.8
    }

    if not viable:
        return "tiny"  # Fall back to the smallest model

    # Select by priority
    if priority == "speed":
        return max(viable, key=lambda m: viable[m]["speed"])
    elif priority == "quality":
        return max(viable, key=lambda m: viable[m]["accuracy"])
    else:
        # Balanced: the most accurate viable model that still runs at
        # >= 2x realtime, falling back to the most accurate viable model
        fast_enough = [m for m in viable if viable[m]["speed"] >= 2]
        pool = fast_enough or list(viable)
        return max(pool, key=lambda m: viable[m]["accuracy"])


class AdaptiveTranscriptionEngine:
    """
    Transcription engine with automatic model selection.
    """

    def __init__(
        self,
        model_size: str = "auto",
        priority: str = "balanced",
        device: str = "auto"
    ):
        if model_size == "auto":
            memory_gb = psutil.virtual_memory().available / (1024 ** 3)
            has_gpu = torch.cuda.is_available()
            model_size = select_whisper_model(memory_gb, priority, has_gpu)
            logger.info(
                f"Auto-selected model: {model_size} ({memory_gb:.1f}GB available)"
            )

        # whisper.load_model picks the default device when device is None
        self.model = whisper.load_model(
            model_size, device=None if device == "auto" else device
        )
        self.model_size = model_size
```

Complexity:

  • Time: O(1) - constant time lookup
  • Space: O(1) - minimal memory overhead

Configuration Options:

| Setting | Values | Default | Description |
|---|---|---|---|
| model_size | tiny / base / small / medium / large-v3 / auto | auto | Specific model or auto-select |
| priority | speed / quality / balanced | balanced | Optimization target |
| device | cpu / cuda / auto | auto | Compute device |

Environment Overrides:

```bash
export WHISPER_MODEL_SIZE=medium  # Force specific model
export WHISPER_PRIORITY=quality   # Force quality priority
export WHISPER_DEVICE=cuda        # Force GPU usage
```
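
A minimal sketch of honoring these overrides ahead of auto-selection; the variable names follow the table above, while wiring them into `AdaptiveTranscriptionEngine` this way is an assumption of the sketch:

```python
import os


def resolve_whisper_config() -> dict:
    # Environment variables take precedence over auto-selection defaults
    return {
        "model_size": os.getenv("WHISPER_MODEL_SIZE", "auto"),
        "priority": os.getenv("WHISPER_PRIORITY", "balanced"),
        "device": os.getenv("WHISPER_DEVICE", "auto"),
    }


engine = AdaptiveTranscriptionEngine(**resolve_whisper_config())
```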

3.4 Semantic Chunking

Algorithm: Hierarchical Agglomerative Clustering with Embeddings

```python
def semantic_chunk(
    content: SynthesizedContent,
    max_chunk_size: int = 4000,
    overlap: int = 200
) -> List[ContentChunk]:
    """
    Split content into semantically coherent chunks.

    Uses sentence embeddings + clustering to find natural
    boundaries between topics. `overlap` is measured in characters
    carried over between consecutive chunks.
    """
    # Step 1: Generate embeddings for sentences
    sentences = segment_into_sentences(content)
    embeddings = embed_sentences(sentences)

    # Step 2: Cluster sentences by similarity
    clusters = hierarchical_cluster(embeddings, threshold=0.7)

    # Step 3: Merge clusters into chunks respecting max size
    chunks = []
    current_chunk = []
    current_size = 0

    for cluster in clusters:
        cluster_text = ' '.join(sentences[i] for i in cluster)
        cluster_size = len(cluster_text)

        if current_chunk and current_size + cluster_size > max_chunk_size:
            # Finalize the current chunk
            chunks.append(create_chunk(current_chunk, overlap))
            # Carry over trailing pieces up to `overlap` characters
            carried, carried_size = [], 0
            for text in reversed(current_chunk):
                if carried_size + len(text) > overlap:
                    break
                carried.insert(0, text)
                carried_size += len(text)
            current_chunk, current_size = carried, carried_size

        current_chunk.append(cluster_text)
        current_size += cluster_size

    # Add the final chunk
    if current_chunk:
        chunks.append(create_chunk(current_chunk))

    return chunks
```
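
The helpers above (`segment_into_sentences`, `embed_sentences`, `hierarchical_cluster`) are left abstract by this design. A minimal sketch under assumed choices (regex sentence splitting, OpenAI `text-embedding-3-small` embeddings, SciPy average-linkage clustering; SciPy is not in the dependency list, and `content.text` is an assumed field):

```python
import re
from typing import List

import numpy as np
from openai import OpenAI
from scipy.cluster.hierarchy import fcluster, linkage

client = OpenAI()


def segment_into_sentences(content) -> List[str]:
    # Naive splitter; a production pipeline would use spaCy or similar
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", content.text) if s.strip()]


def embed_sentences(sentences: List[str]) -> np.ndarray:
    resp = client.embeddings.create(model="text-embedding-3-small", input=sentences)
    return np.array([d.embedding for d in resp.data])


def hierarchical_cluster(embeddings: np.ndarray, threshold: float) -> List[List[int]]:
    # Average-linkage clustering on cosine distance; cut the dendrogram
    # where similarity drops below `threshold`
    labels = fcluster(
        linkage(embeddings, method="average", metric="cosine"),
        t=1 - threshold, criterion="distance",
    )
    groups: dict = {}
    for i, label in enumerate(labels):
        groups.setdefault(label, []).append(i)
    # Keep sentence order within and across clusters
    return [sorted(g) for g in sorted(groups.values(), key=min)]
```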

4. API Specifications

4.1 Internal APIs

VideoDownloader

```python
class VideoDownloader:
    """
    Downloads videos from various platforms.
    """

    async def download(
        self,
        url: str,
        output_dir: Path,
        quality: str = "best",
        extract_audio: bool = True
    ) -> DownloadResult:
        """
        Download video from URL.

        Args:
            url: Video URL (YouTube, Vimeo, etc.)
            output_dir: Where to save files
            quality: Video quality selector
            extract_audio: Whether to extract audio stream

        Returns:
            DownloadResult with paths and metadata
        """
        ...

    def get_info(self, url: str) -> VideoInfo:
        """Get video metadata without downloading."""
        ...
```
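
As a sketch of how `download()` could wrap yt-dlp: the option keys (`format`, `outtmpl`, `postprocessors`) follow yt-dlp's documented Python API, while the thread-offload pattern and the mapping to `DownloadResult` are assumptions of this sketch:

```python
import asyncio
from pathlib import Path

import yt_dlp


def _download_sync(url: str, output_dir: Path, quality: str, extract_audio: bool) -> dict:
    opts = {
        "format": quality,  # e.g. "best"
        "outtmpl": str(output_dir / "%(id)s.%(ext)s"),
    }
    if extract_audio:
        # Post-process with FFmpeg to also produce an mp3 audio stream
        opts["postprocessors"] = [
            {"key": "FFmpegExtractAudio", "preferredcodec": "mp3"}
        ]
    with yt_dlp.YoutubeDL(opts) as ydl:
        return ydl.extract_info(url, download=True)


async def download(url: str, output_dir: Path,
                   quality: str = "best", extract_audio: bool = True):
    # yt-dlp is blocking, so run it in a worker thread
    info = await asyncio.to_thread(_download_sync, url, output_dir,
                                   quality, extract_audio)
    return info  # mapped into DownloadResult in the real implementation
```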

TranscriptionEngine

```python
class TranscriptionEngine:
    """
    Converts audio to text with timestamps.
    """

    SUPPORTED_MODELS = ["whisper-1", "whisper-large-v3"]

    async def transcribe(
        self,
        audio_path: Path,
        model: str = "whisper-large-v3",
        language: Optional[str] = None,
        response_format: str = "verbose_json"
    ) -> Transcript:
        """
        Transcribe audio file.

        Args:
            audio_path: Path to audio file (mp3, wav, etc.)
            model: Whisper model to use
            language: ISO-639-1 code or None for auto-detection
            response_format: Output format

        Returns:
            Transcript with segments and metadata
        """
        ...

    def transcribe_with_diarization(
        self,
        audio_path: Path
    ) -> Transcript:
        """Transcribe with speaker identification."""
        ...
```

VisionAnalyzer

```python
class VisionAnalyzer:
    """
    Analyzes images using multimodal LLMs.
    """

    SYSTEM_PROMPT = """
    Analyze this video frame. Describe:
    1. Main visual elements (UI components, diagrams, text)
    2. Any code or technical content visible
    3. The overall context/purpose shown
    4. Relationship to surrounding content if known
    """

    async def analyze_frame(
        self,
        frame_path: Path,
        context: Optional[str] = None,
        model: str = "gpt-4-vision-preview"
    ) -> ImageAnalysis:
        """
        Analyze a video frame.

        Args:
            frame_path: Path to image file
            context: Optional context from transcript
            model: Vision model to use

        Returns:
            Structured analysis of the frame
        """
        ...

    async def batch_analyze(
        self,
        frames: List[Path],
        max_concurrent: int = 5
    ) -> List[ImageAnalysis]:
        """Analyze multiple frames with rate limiting."""
        ...
```
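
A sketch of the concurrency control behind `batch_analyze`, written as a free function over the declared `analyze_frame`; the semaphore caps in-flight vision requests at `max_concurrent`:

```python
import asyncio
from pathlib import Path
from typing import List


async def batch_analyze(analyzer: "VisionAnalyzer", frames: List[Path],
                        max_concurrent: int = 5) -> List["ImageAnalysis"]:
    sem = asyncio.Semaphore(max_concurrent)

    async def _one(path: Path):
        # At most max_concurrent analyze_frame calls are in flight at once
        async with sem:
            return await analyzer.analyze_frame(path)

    # gather preserves input order, so results line up with `frames`
    return await asyncio.gather(*(_one(p) for p in frames))
```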

ArtifactGenerator

```python
import asyncio


class ArtifactGenerator:
    """
    Generates structured documentation artifacts.
    """

    TEMPLATES = {
        "sdd": "sdd_template.j2",
        "tdd": "tdd_template.j2",
        "adr": "adr_template.j2",
        "c4": "c4_template.j2",
        "summary": "summary_template.j2",
        "outline": "outline_template.j2",
        "glossary": "glossary_template.j2"
    }

    async def generate(
        self,
        artifact_type: str,
        content: SynthesizedContent,
        context: Dict[str, Any]
    ) -> Artifact:
        """
        Generate a specific artifact type.

        Args:
            artifact_type: Type of artifact to generate
            content: Synthesized content from pipeline
            context: Additional context for generation

        Returns:
            Generated artifact with metadata
        """
        template = self.load_template(artifact_type)
        prompt = template.render(content=content, context=context)

        response = await self.llm.complete(prompt)
        return self.parse_artifact(response, artifact_type)

    async def generate_all(
        self,
        content: SynthesizedContent
    ) -> ArtifactCollection:
        """Generate all artifact types."""
        tasks = [
            self.generate(t, content, {})
            for t in self.TEMPLATES
        ]
        artifacts = await asyncio.gather(*tasks)
        return ArtifactCollection(artifacts)
```
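
A plausible `load_template`, assuming Jinja2 (implied by the `.j2` template names, though not in the dependency list); the `templates/` directory is an illustrative location:

```python
from jinja2 import Environment, FileSystemLoader


def load_template(self, artifact_type: str):
    # A shared Environment would normally live on the instance; created
    # inline here to keep the sketch self-contained
    env = Environment(loader=FileSystemLoader("templates"), autoescape=False)
    return env.get_template(self.TEMPLATES[artifact_type])
```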

5. Database Schema

5.1 SQLite Schema

```sql
-- Core tables for pipeline metadata

CREATE TABLE videos (
    id TEXT PRIMARY KEY,
    source_url TEXT NOT NULL,
    title TEXT,
    description TEXT,
    duration_seconds INTEGER,
    local_path TEXT NOT NULL,
    downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE audio (
    id TEXT PRIMARY KEY,
    video_id TEXT REFERENCES videos(id),
    format TEXT,
    local_path TEXT NOT NULL,
    bitrate INTEGER,
    sample_rate INTEGER,
    extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE transcripts (
    id TEXT PRIMARY KEY,
    audio_id TEXT REFERENCES audio(id),
    full_text TEXT NOT NULL,
    segments JSON, -- Array of {start, end, text, confidence}
    language TEXT,
    model TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE frames (
    id TEXT PRIMARY KEY,
    video_id TEXT REFERENCES videos(id),
    sequence_number INTEGER,
    timestamp REAL,
    local_path TEXT NOT NULL,
    perceptual_hash TEXT,
    is_unique BOOLEAN DEFAULT TRUE,
    similarity_score REAL,
    extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE frame_analyses (
    id TEXT PRIMARY KEY,
    frame_id TEXT REFERENCES frames(id),
    description TEXT,
    detected_elements JSON,
    ocr_text TEXT,
    model TEXT,
    analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE content_chunks (
    id TEXT PRIMARY KEY,
    transcript_id TEXT REFERENCES transcripts(id),
    chunk_index INTEGER,
    start_time REAL,
    end_time REAL,
    transcript_text TEXT,
    frame_ids JSON, -- Array of frame IDs
    summary TEXT,
    embedding JSON -- Vector embedding for similarity search
);

CREATE TABLE artifacts (
    id TEXT PRIMARY KEY,
    chunk_id TEXT REFERENCES content_chunks(id),
    artifact_type TEXT NOT NULL, -- sdd, tdd, adr, etc.
    title TEXT,
    local_path TEXT NOT NULL,
    metadata JSON,
    generated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for performance
CREATE INDEX idx_frames_video ON frames(video_id);
CREATE INDEX idx_frames_unique ON frames(video_id, is_unique);
CREATE INDEX idx_analyses_frame ON frame_analyses(frame_id);
CREATE INDEX idx_artifacts_type ON artifacts(artifact_type);
CREATE INDEX idx_chunks_transcript ON content_chunks(transcript_id);
```
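
For reference, a minimal write path against this schema using aiosqlite (already in the dependency list); the `frame` attribute names are illustrative, not fixed by this design:

```python
import aiosqlite


async def insert_frame(db_path: str, frame) -> None:
    async with aiosqlite.connect(db_path) as db:
        await db.execute(
            """INSERT INTO frames
               (id, video_id, sequence_number, timestamp, local_path,
                perceptual_hash, is_unique, similarity_score)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (frame.id, frame.video_id, frame.sequence, frame.timestamp,
             str(frame.path), str(frame.phash), not frame.is_duplicate,
             frame.similarity_score),
        )
        await db.commit()
```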

6. Integration Patterns

6.1 External APIs

OpenAI API (Whisper)

```python
# Rate limiting: 50 RPM for Whisper
import asyncio
from pathlib import Path

from openai import AsyncOpenAI, RateLimitError

client = AsyncOpenAI()


async def transcribe_with_retry(
    audio_path: Path,
    max_retries: int = 3
) -> Transcript:
    for attempt in range(max_retries):
        try:
            with open(audio_path, "rb") as f:
                response = await client.audio.transcriptions.create(
                    model="whisper-1",  # OpenAI's hosted Whisper model name
                    file=f,
                    response_format="verbose_json",
                    timestamp_granularities=["segment"]
                )
            return Transcript.from_openai(response)
        except RateLimitError:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except Exception:
            if attempt == max_retries - 1:
                raise
    raise RuntimeError("Transcription failed after all retries")
```

Anthropic API (Claude Vision)

```python
# Rate limiting: 40,000 TPM for Claude 3.5 Sonnet
import base64
from pathlib import Path

from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def analyze_with_claude(
    image_path: Path,
    context: str
) -> str:
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode()

    response = await client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": context},
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/png",
                        "data": image_data
                    }
                }
            ]
        }]
    )
    return response.content[0].text
```

Web Search (Serper)

```python
import os
from typing import List

import aiohttp

SERPER_API_KEY = os.environ["SERPER_API_KEY"]


async def expand_with_web_search(
    query: str,
    num_results: int = 5
) -> List[SearchResult]:
    """Expand content with web search results."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://google.serper.dev/search",
            headers={"X-API-KEY": SERPER_API_KEY},
            json={"q": query, "num": num_results}
        ) as response:
            data = await response.json()
    return [
        SearchResult(
            title=r["title"],
            snippet=r["snippet"],
            url=r["link"]
        )
        for r in data.get("organic", [])
    ]
```

7. Performance Considerations

7.1 Optimization Strategies

| Bottleneck | Optimization |
|---|---|
| Video download | Parallel downloads, CDN selection |
| Frame extraction | Hardware acceleration (GPU) |
| Deduplication | Streaming algorithm, O(n) time |
| Vision analysis | Batch processing, caching |
| LLM generation | Parallel requests, response streaming |

7.2 Caching Strategy

```python
# API-response cache: `cache` is assumed to be a Flask-Caching-style
# object (Redis/Memcached backend) exposing a memoize decorator
@cache.memoize(timeout=3600)
async def analyze_frame_cached(frame_hash: str) -> ImageAnalysis:
    """Cache vision analysis keyed by perceptual hash."""
    ...


# Disk cache for downloads, via the diskcache library's Cache.memoize()
import diskcache

download_cache = diskcache.Cache("downloads")

@download_cache.memoize()
def download_video(url: str) -> Path:
    ...
```

7.3 Resource Limits

| Resource | Limit | Action on Exceed |
|---|---|---|
| Memory | 8 GB | Stream processing |
| Disk | 100 GB | LRU cleanup |
| API calls | Rate limits | Queue + backoff |
| Concurrency | 10 workers | Semaphore |

8. Testing Strategy

8.1 Unit Tests

```python
def test_deduplication():
    """Test the perceptual deduplication algorithm."""
    frames = load_test_frames()
    unique = deduplicate_frames(frames, threshold=10)
    assert len(unique) < len(frames)


def test_timestamp_alignment():
    """Test transcript-frame alignment."""
    segments = [Segment(start_time=0.0, end_time=5.0, text="Hello")]
    frames = [ImageAnalysis(timestamp=2.5)]
    aligned = align_content(segments, frames)
    assert len(aligned) == 1
    assert len(aligned[0].frames) == 1
```

8.2 Integration Tests

  • End-to-end pipeline with short video
  • API failure recovery
  • Rate limiting handling
  • Concurrent processing

8.3 Performance Tests

  • Benchmark deduplication with 1000+ frames
  • Measure transcription accuracy
  • LLM token usage optimization

9. Deployment

9.1 Docker Configuration

```dockerfile
FROM python:3.11-slim

# Install FFmpeg for audio/video processing
RUN apt-get update \
    && apt-get install -y --no-install-recommends ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application
COPY src/ /app/src/
COPY config/ /app/config/

WORKDIR /app
CMD ["python", "-m", "src.pipeline"]
```

9.2 Environment Variables

```bash
# APIs
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
SERPER_API_KEY=...

# Paths
OUTPUT_DIR=/data/outputs
TEMP_DIR=/tmp/videos

# Pipeline
MAX_WORKERS=10
DEDUP_THRESHOLD=10
FRAME_RATE=0.5

# Database
DATABASE_URL=sqlite:///data/pipeline.db
```

10. References

  1. Whisper API Documentation
  2. Claude Vision Guide
  3. FFmpeg Documentation
  4. Perceptual Hashing
  5. C4 Model