Technical Design Document: Video Analysis Pipeline Implementation
Version: 1.0
Date: 2026-01-19
Related: System Design Document v1.0
1. Implementation Overview
1.1 Technology Foundation
# Core dependencies with version pinning, grouped by pipeline concern.
_VIDEO_PROCESSING_DEPS = {
    'yt-dlp': '>=2024.1.0',
    'ffmpeg-python': '>=0.2.0',
    'opencv-python': '>=4.8.0',
    'pillow': '>=10.0.0',
}
_AUDIO_TRANSCRIPTION_DEPS = {
    'openai-whisper': '>=20231117',  # or 'openai' for API
    'pydub': '>=0.25.1',
    'librosa': '>=0.10.0',
}
_OCR_VISION_DEPS = {
    'pytesseract': '>=0.3.10',
    'paddleocr': '>=2.7.0',
    'anthropic': '>=0.40.0',  # Claude Vision API
    'openai': '>=1.0.0',  # GPT-4V API
}
_LLM_ORCHESTRATION_DEPS = {
    'langgraph': '>=0.0.40',
    'langchain': '>=0.1.0',
    'langchain-anthropic': '>=0.1.0',
}
_DATA_MANAGEMENT_DEPS = {
    'sqlalchemy': '>=2.0.0',
    'pydantic': '>=2.0.0',
    'aiosqlite': '>=0.19.0',
}
_UTILITY_DEPS = {
    'tenacity': '>=8.2.0',  # Retry logic
    'prometheus-client': '>=0.19.0',  # Metrics
    'python-json-logger': '>=2.0.0',  # Structured logging
    'rich': '>=13.0.0',  # CLI progress/output
}

dependencies = {
    'video_processing': _VIDEO_PROCESSING_DEPS,
    'audio_transcription': _AUDIO_TRANSCRIPTION_DEPS,
    'ocr_vision': _OCR_VISION_DEPS,
    'llm_orchestration': _LLM_ORCHESTRATION_DEPS,
    'data_management': _DATA_MANAGEMENT_DEPS,
    'utilities': _UTILITY_DEPS,
}
2. Core Data Structures
2.1 Domain Models
from pydantic import BaseModel, Field, HttpUrl
from typing import Optional, List, Dict, Any, Literal
from datetime import datetime
from pathlib import Path
from enum import Enum
class ProcessingStatus(str, Enum):
    """Lifecycle states of a video-analysis job, listed in pipeline order.

    Subclassing ``str`` makes members JSON/database friendly (they compare
    equal to their string values).
    """

    PENDING = "pending"            # queued, not yet started
    DOWNLOADING = "downloading"    # fetching the source video
    EXTRACTING = "extracting"      # pulling audio/frames from the file
    TRANSCRIBING = "transcribing"  # running speech-to-text
    ANALYZING = "analyzing"        # vision/OCR analysis of frames
    SYNTHESIZING = "synthesizing"  # LLM synthesis of insights
    COMPLETE = "complete"          # terminal: success
    FAILED = "failed"              # terminal: error
class VideoMetadata(BaseModel):
    """Core video information captured at download time."""

    video_id: str = Field(..., description="Unique identifier")
    source_url: Optional[HttpUrl] = None  # absent for local files
    title: str
    duration_seconds: float
    resolution: str  # e.g., "1920x1080"
    fps: float
    file_size_bytes: int
    format: str  # e.g., "mp4", "webm"
    upload_date: Optional[datetime] = None
    channel: Optional[str] = None
    description: Optional[str] = None
    # Forward reference: Chapter is defined just below this class.
    chapters: List['Chapter'] = Field(default_factory=list)
    closed_captions: bool = False  # True if the source exposes subtitles/auto-captions
class Chapter(BaseModel):
    """A video chapter/segment as reported by the source platform."""

    title: str
    start_time: float  # seconds from start of video
    end_time: float    # seconds from start of video
class AudioSegment(BaseModel):
    """A transcribed audio segment with timing information."""

    segment_id: int
    start_time: float  # seconds
    end_time: float    # seconds
    text: str
    # Must be a probability in [0, 1]; note that raw Whisper avg_logprob is a
    # (negative) log-probability and must be converted before being stored here.
    confidence: float = Field(ge=0.0, le=1.0)
    language: str = "en"
    speaker_id: Optional[int] = None  # For diarization
    # Populated only when word-level timestamps are enabled.
    words: List['Word'] = Field(default_factory=list)
class Word(BaseModel):
    """Word-level transcription timing (from Whisper word timestamps)."""

    word: str
    start: float        # seconds
    end: float          # seconds
    probability: float  # model-reported probability for this word
class ExtractedFrame(BaseModel):
    """A single video frame saved to disk, with extraction metadata."""

    frame_id: str
    timestamp: float  # seconds from start
    file_path: Path
    width: int
    height: int
    # Which sampling strategy produced this frame.
    extraction_method: Literal['scene_change', 'fixed_interval', 'slide_detection', 'text_density']
    scene_change_score: Optional[float] = None       # set by 'scene_change' only
    content_stability_score: Optional[float] = None  # set by 'slide_detection' only
class FrameAnalysis(BaseModel):
    """Vision-model analysis of a single frame."""

    frame_id: str
    timestamp: float  # seconds from start
    content_type: Literal['slide', 'diagram', 'person', 'text', 'scene', 'mixed']
    # 0.0 is used as a sentinel for a failed analysis (see VisionAnalyzer).
    confidence: float = Field(ge=0.0, le=1.0)
    extracted_text: str = ""  # OCR text read from the frame
    ocr_confidence: Optional[float] = None
    description: str
    detected_objects: List[str] = Field(default_factory=list)
    has_presentation_content: bool  # slide-/educational-looking material?
    slide_number: Optional[int] = None
    key_points: List[str] = Field(default_factory=list)
class TopicSegment(BaseModel):
    """An identified topic with its temporal bounds within the video."""

    topic_id: str
    title: str
    start_time: float  # seconds
    end_time: float    # seconds
    confidence: float
    summary: str
    key_terms: List[str]
    related_frames: List[str] = Field(default_factory=list)  # frame_ids
    transcript_segments: List[int] = Field(default_factory=list)  # segment_ids
class SynthesizedInsight(BaseModel):
    """A cross-modal insight combining audio and visual evidence."""

    insight_id: str
    timestamp: float  # seconds
    insight_type: Literal['key_point', 'definition', 'example', 'transition', 'summary']
    content: str
    confidence: float
    # Supporting evidence keyed by modality; default carries both keys so
    # consumers can index 'audio'/'visual' without checking for presence.
    sources: Dict[str, List[str]] = Field(
        default_factory=lambda: {'audio': [], 'visual': []}
    )
    related_topics: List[str] = Field(default_factory=list)
class ProcessingJob(BaseModel):
    """Complete processing job state, aggregating every stage's outputs."""

    job_id: str
    video_id: str
    status: ProcessingStatus
    # NOTE(review): datetime.utcnow produces naive timestamps and is
    # deprecated since Python 3.12 — consider datetime.now(timezone.utc),
    # which would change values to timezone-aware; confirm downstream usage.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    metadata: VideoMetadata
    audio_segments: List[AudioSegment] = Field(default_factory=list)
    extracted_frames: List[ExtractedFrame] = Field(default_factory=list)
    # Keyed by frame_id.
    frame_analyses: Dict[str, FrameAnalysis] = Field(default_factory=dict)
    topics: List[TopicSegment] = Field(default_factory=list)
    insights: List[SynthesizedInsight] = Field(default_factory=list)
    error_message: Optional[str] = None
    # Forward reference: ProcessingMetrics is defined after this class; the
    # lambda defers construction until instantiation time, when it exists.
    processing_metrics: 'ProcessingMetrics' = Field(default_factory=lambda: ProcessingMetrics())
class ProcessingMetrics(BaseModel):
    """Performance and cost metrics accumulated over one processing run."""

    # Per-stage wall-clock durations.
    download_duration_seconds: float = 0.0
    transcription_duration_seconds: float = 0.0
    frame_extraction_duration_seconds: float = 0.0
    analysis_duration_seconds: float = 0.0
    synthesis_duration_seconds: float = 0.0
    total_duration_seconds: float = 0.0
    # Work counters.
    frames_extracted: int = 0
    frames_analyzed: int = 0
    transcript_word_count: int = 0
    # API usage and cost accounting.
    api_calls_vision: int = 0
    api_calls_llm: int = 0
    tokens_consumed_input: int = 0
    tokens_consumed_output: int = 0
    estimated_cost_usd: float = 0.0
    disk_space_used_mb: float = 0.0
2.2 Configuration Models
class PipelineConfig(BaseModel):
    """Complete pipeline configuration with per-stage defaults."""
    # Video download settings
    download_format: str = "mp4"
    max_resolution: str = "1080p"  # expected form "<height>p"; parsed by VideoDownloader
    download_subtitles: bool = True
    # Audio extraction settings
    audio_format: str = "wav"
    audio_sample_rate: int = 16000  # Hz, required for Whisper
    # Frame extraction settings
    frame_sampling_strategies: List[str] = Field(
        default_factory=lambda: ['scene_change', 'fixed_interval', 'slide_detection']
    )
    fixed_interval_seconds: float = 5.0
    scene_change_threshold: float = 0.4  # ffmpeg scene-score cutoff
    slide_stability_duration: float = 2.0  # seconds of static content to count as a slide
    max_frames_per_video: int = 500
    # Transcription settings
    transcription_model: str = "whisper-large-v3"  # or "openai-api"
    transcription_language: Optional[str] = None  # Auto-detect if None
    enable_word_timestamps: bool = True
    enable_speaker_diarization: bool = False
    # OCR settings
    ocr_engine: Literal['tesseract', 'paddle', 'cloud'] = 'paddle'
    ocr_languages: List[str] = Field(default_factory=lambda: ['en'])
    # Vision analysis settings
    vision_model: Literal['gpt-4v', 'claude-vision', 'gemini-vision'] = 'claude-vision'
    vision_batch_size: int = 5  # Frames per API call
    vision_detail_level: Literal['low', 'high', 'auto'] = 'auto'
    # LLM synthesis settings
    synthesis_model: str = "claude-sonnet-4-5"
    synthesis_temperature: float = 0.3
    synthesis_max_tokens: int = 4000
    # Output settings
    output_format: List[str] = Field(
        default_factory=lambda: ['markdown', 'json', 'timeline']
    )
    include_timestamps: bool = True
    include_extracted_images: bool = True
    # Performance settings
    max_concurrent_tasks: int = 3
    enable_checkpointing: bool = True
    checkpoint_interval_seconds: int = 300
    # Cost management
    max_cost_per_video_usd: float = 5.0
    enable_cost_estimation: bool = True
    # Storage settings
    temp_storage_path: Path = Path("/tmp/video-analysis")
    output_storage_path: Path = Path("./output")
    cleanup_temp_files: bool = True
    retention_days: int = 7
3. Core Components Implementation
3.1 Video Downloader
import yt_dlp
from tenacity import retry, stop_after_attempt, wait_exponential
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class VideoDownloader:
    """Download and validate videos using yt-dlp."""

    # Reject videos longer than this (seconds) before downloading.
    MAX_DURATION_SECONDS = 7200

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.output_path = config.temp_storage_path / "downloads"
        self.output_path.mkdir(parents=True, exist_ok=True)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def download(self, url: str, video_id: str) -> tuple[Path, VideoMetadata]:
        """Download a video and build its metadata record.

        Args:
            url: Source video URL (anything yt-dlp supports).
            video_id: Identifier used to name the downloaded file.

        Returns:
            (video_path, metadata): the downloaded file path and the
            populated VideoMetadata model.

        Raises:
            ValueError: if the video exceeds MAX_DURATION_SECONDS.
            FileNotFoundError: if the file is missing after download.
        """
        output_template = str(self.output_path / f"{video_id}.%(ext)s")
        # NOTE(review): assumes max_resolution looks like "1080p" (strips the 'p').
        ydl_opts = {
            'format': f'best[height<={self.config.max_resolution[:-1]}]',
            'outtmpl': output_template,
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'writesubtitles': self.config.download_subtitles,
            'writeautomaticsub': self.config.download_subtitles,
            'subtitleslangs': ['en'],
            'postprocessors': [{
                'key': 'FFmpegVideoConvertor',
                'preferedformat': self.config.download_format,  # (sic) yt-dlp's spelling
            }]
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Probe metadata first so oversized videos are rejected
                # before paying for the download.
                info = ydl.extract_info(url, download=False)
                # duration can be None (e.g. live streams); treat as 0.
                duration = info.get('duration') or 0
                if duration > self.MAX_DURATION_SECONDS:
                    raise ValueError(f"Video too long: {duration}s (max {self.MAX_DURATION_SECONDS}s)")

                logger.info(f"Downloading video: {info.get('title', 'Unknown')}")
                info = ydl.extract_info(url, download=True)

                # Locate the downloaded file; the FFmpeg postprocessor may
                # have changed the extension away from what yt-dlp reports.
                video_path = Path(ydl.prepare_filename(info))
                if not video_path.exists():
                    video_path = video_path.with_suffix(f'.{self.config.download_format}')
                if not video_path.exists():
                    raise FileNotFoundError(f"Downloaded file not found: {video_path}")

                metadata = VideoMetadata(
                    video_id=video_id,
                    source_url=url,
                    title=info.get('title', 'Unknown'),
                    duration_seconds=info.get('duration') or 0,
                    resolution=f"{info.get('width', 0)}x{info.get('height', 0)}",
                    fps=info.get('fps') or 30,  # fps may be None in info dict
                    file_size_bytes=video_path.stat().st_size,
                    format=info.get('ext', self.config.download_format),
                    upload_date=self._parse_upload_date(info.get('upload_date')),
                    channel=info.get('channel'),
                    description=info.get('description'),
                    chapters=self._extract_chapters(info.get('chapters') or []),
                    closed_captions=bool(info.get('subtitles') or info.get('automatic_captions'))
                )
                logger.info(f"Downloaded successfully: {video_path} ({metadata.file_size_bytes / 1e6:.1f} MB)")
                return video_path, metadata
        except Exception as e:
            logger.error(f"Download failed: {e}")
            raise

    def _parse_upload_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse yt-dlp's YYYYMMDD date string; None if absent or invalid."""
        if not date_str:
            return None
        try:
            return datetime.strptime(date_str, '%Y%m%d')
        except ValueError:
            return None

    def _extract_chapters(self, chapters: List[Dict]) -> List[Chapter]:
        """Convert yt-dlp chapter dicts into Chapter models."""
        return [
            Chapter(
                title=ch.get('title', f"Chapter {i+1}"),
                start_time=ch.get('start_time', 0),
                end_time=ch.get('end_time', 0)
            )
            for i, ch in enumerate(chapters)
        ]
3.2 Audio Extraction & Transcription
import ffmpeg
import whisper
from typing import List
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AudioProcessor:
    """Extract and transcribe the audio track of a video."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.whisper_model = None  # loaded lazily on first transcription
        # Single worker so Whisper jobs are serialized (GPU serialization).
        self.executor = ThreadPoolExecutor(max_workers=1)

    def _load_whisper_model(self):
        """Lazily load the Whisper model named in the config."""
        if self.whisper_model is None:
            # e.g. "whisper-large-v3" -> "large-v3"
            model_name = self.config.transcription_model.replace('whisper-', '')
            logger.info(f"Loading Whisper model: {model_name}")
            self.whisper_model = whisper.load_model(model_name)

    async def extract_audio(self, video_path: Path, video_id: str) -> Path:
        """Extract the audio track to mono 16-bit PCM WAV via ffmpeg.

        Returns:
            Path to the extracted WAV file.

        Raises:
            ffmpeg.Error: if the ffmpeg invocation fails.
        """
        audio_path = self.config.temp_storage_path / "audio" / f"{video_id}.wav"
        audio_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            logger.info(f"Extracting audio from {video_path}")
            stream = ffmpeg.input(str(video_path))
            stream = ffmpeg.output(
                stream,
                str(audio_path),
                acodec='pcm_s16le',  # PCM 16-bit
                ac=1,                # mono
                ar=self.config.audio_sample_rate  # 16 kHz expected by Whisper
            )
            # ffmpeg.run blocks, so push it onto the executor.
            await asyncio.get_event_loop().run_in_executor(
                self.executor,
                lambda: ffmpeg.run(stream, overwrite_output=True, quiet=True)
            )
            logger.info(f"Audio extracted: {audio_path}")
            return audio_path
        except ffmpeg.Error as e:
            logger.error(f"FFmpeg error: {e.stderr.decode()}")
            raise

    async def transcribe(self, audio_path: Path, metadata: VideoMetadata) -> List[AudioSegment]:
        """Transcribe audio with Whisper into timed AudioSegment models.

        Returns:
            List of transcribed segments with timing, words, and a
            confidence converted into [0, 1].
        """
        import math  # local: used only for log-prob -> probability conversion

        self._load_whisper_model()
        logger.info(f"Transcribing audio: {audio_path}")
        # Whisper inference is CPU/GPU intensive; run it in the executor.
        result = await asyncio.get_event_loop().run_in_executor(
            self.executor,
            lambda: self.whisper_model.transcribe(
                str(audio_path),
                language=self.config.transcription_language,
                word_timestamps=self.config.enable_word_timestamps,
                verbose=False
            )
        )
        segments: List[AudioSegment] = []
        for i, seg in enumerate(result['segments']):
            words: List[Word] = []
            if self.config.enable_word_timestamps and 'words' in seg:
                words = [
                    Word(
                        word=w['word'].strip(),
                        start=w['start'],
                        end=w['end'],
                        probability=w.get('probability', 1.0)
                    )
                    for w in seg['words']
                ]
            # BUGFIX: avg_logprob is a (negative) log-probability, but
            # AudioSegment.confidence requires a probability in [0, 1];
            # exponentiate to convert (the old code stored it raw, which
            # fails the model's ge=0 validation).
            avg_logprob = seg.get('avg_logprob')
            confidence = min(1.0, math.exp(avg_logprob)) if avg_logprob is not None else 0.0
            segments.append(AudioSegment(
                segment_id=i,
                start_time=seg['start'],
                end_time=seg['end'],
                text=seg['text'].strip(),
                confidence=confidence,
                language=result.get('language', 'en'),
                words=words
            ))
        logger.info(f"Transcribed {len(segments)} segments ({sum(len(s.text.split()) for s in segments)} words)")
        return segments
3.3 Frame Extraction
import cv2
import numpy as np
from collections import deque
class FrameExtractor:
    """Extract video frames using multiple sampling strategies.

    Each configured strategy (scene change, fixed interval, slide
    detection, text density) contributes candidate frames; results are
    merged, deduplicated by timestamp, and capped at
    ``config.max_frames_per_video``.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.output_path = config.temp_storage_path / "frames"
        self.output_path.mkdir(parents=True, exist_ok=True)

    async def extract_frames(
        self,
        video_path: Path,
        video_id: str,
        metadata: VideoMetadata
    ) -> List[ExtractedFrame]:
        """Run every configured strategy and merge the results.

        Returns:
            Deduplicated list of extracted frames, capped at the
            configured maximum.
        """
        all_frames: List[ExtractedFrame] = []
        for strategy in self.config.frame_sampling_strategies:
            if strategy == 'scene_change':
                frames = await self._extract_scene_changes(video_path, video_id, metadata)
            elif strategy == 'fixed_interval':
                frames = await self._extract_fixed_interval(video_path, video_id, metadata)
            elif strategy == 'slide_detection':
                frames = await self._extract_slide_content(video_path, video_id, metadata)
            elif strategy == 'text_density':
                frames = await self._extract_text_dense(video_path, video_id, metadata)
            else:
                logger.warning(f"Unknown strategy: {strategy}")
                continue
            all_frames.extend(frames)
        # Merge the strategies' output: drop near-duplicate timestamps.
        all_frames = self._deduplicate_frames(all_frames)
        if len(all_frames) > self.config.max_frames_per_video:
            logger.warning(f"Truncating frames: {len(all_frames)} -> {self.config.max_frames_per_video}")
            all_frames = all_frames[:self.config.max_frames_per_video]
        logger.info(f"Extracted {len(all_frames)} frames total")
        return all_frames

    async def _extract_scene_changes(
        self,
        video_path: Path,
        video_id: str,
        metadata: VideoMetadata
    ) -> List[ExtractedFrame]:
        """Extract frames at scene transitions using ffmpeg's select filter.

        Returns [] (and logs) on any failure so other strategies still run.
        """
        frames: List[ExtractedFrame] = []
        try:
            output_pattern = str(self.output_path / video_id / "scene_%04d.jpg")
            Path(output_pattern).parent.mkdir(parents=True, exist_ok=True)
            # BUGFIX: pass only the expression to the 'select' filter; the
            # old code wrapped it in "select='...'", duplicating the filter
            # name inside its own argument.
            scene_expr = f"gt(scene,{self.config.scene_change_threshold})"
            stream = ffmpeg.input(str(video_path))
            stream = ffmpeg.filter(stream, 'select', scene_expr)
            stream = ffmpeg.output(
                stream,
                output_pattern,
                vsync='vfr',  # variable frame rate: one image per selected frame
                q=2           # JPEG quality (2 = high)
            )
            await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: ffmpeg.run(stream, overwrite_output=True, quiet=True)
            )
            frame_files = sorted(Path(output_pattern).parent.glob("scene_*.jpg"))
            for i, frame_file in enumerate(frame_files):
                # Approximate timestamp: ffmpeg does not report one per image,
                # so assume scene changes are spread evenly over the video.
                timestamp = (i / len(frame_files)) * metadata.duration_seconds
                img = cv2.imread(str(frame_file))
                if img is None:  # unreadable/corrupt output; skip it
                    continue
                height, width = img.shape[:2]
                frames.append(ExtractedFrame(
                    frame_id=f"{video_id}_scene_{i:04d}",
                    timestamp=timestamp,
                    file_path=frame_file,
                    width=width,
                    height=height,
                    extraction_method='scene_change',
                    scene_change_score=self.config.scene_change_threshold
                ))
            logger.info(f"Extracted {len(frames)} scene change frames")
            return frames
        except Exception as e:
            logger.error(f"Scene detection failed: {e}")
            return []

    async def _extract_fixed_interval(
        self,
        video_path: Path,
        video_id: str,
        metadata: VideoMetadata
    ) -> List[ExtractedFrame]:
        """Extract one frame every ``config.fixed_interval_seconds``."""
        frames: List[ExtractedFrame] = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            # Fall back to metadata fps if OpenCV can't read it (returns 0).
            fps = cap.get(cv2.CAP_PROP_FPS) or metadata.fps or 30.0
            # Guard against a zero interval for very low fps values.
            frame_interval = max(1, int(fps * self.config.fixed_interval_seconds))
            frame_number = 0
            extracted_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_number % frame_interval == 0:
                    timestamp = frame_number / fps
                    frame_path = self.output_path / video_id / f"fixed_{extracted_count:04d}.jpg"
                    frame_path.parent.mkdir(parents=True, exist_ok=True)
                    cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
                    height, width = frame.shape[:2]
                    frames.append(ExtractedFrame(
                        frame_id=f"{video_id}_fixed_{extracted_count:04d}",
                        timestamp=timestamp,
                        file_path=frame_path,
                        width=width,
                        height=height,
                        extraction_method='fixed_interval'
                    ))
                    extracted_count += 1
                frame_number += 1
            logger.info(f"Extracted {len(frames)} fixed-interval frames")
            return frames
        finally:
            cap.release()

    async def _extract_slide_content(
        self,
        video_path: Path,
        video_id: str,
        metadata: VideoMetadata
    ) -> List[ExtractedFrame]:
        """Extract frames with stable content (likely slides).

        Tracks a sliding window of downscaled grayscale frames; when the
        pixel variance across the window is low, the content has been
        static for ``config.slide_stability_duration`` seconds and the
        current frame is saved.
        """
        frames: List[ExtractedFrame] = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or metadata.fps or 30.0
            stability_frames = max(1, int(fps * self.config.slide_stability_duration))
            frame_history = deque(maxlen=stability_frames)
            frame_number = 0
            last_extracted_frame = -1000  # sentinel: far before frame 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Compare cheap 64x64 grayscale thumbnails, not full frames.
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                small = cv2.resize(gray, (64, 64))
                frame_history.append(small)
                if len(frame_history) == stability_frames:
                    # Mean per-pixel std-dev across the window: low = static.
                    frame_array = np.array(frame_history)
                    variance = np.std(frame_array, axis=0).mean()
                    if variance < 10.0:  # stability threshold
                        # Enforce a 3-second gap between extracted slides.
                        if frame_number - last_extracted_frame > fps * 3:
                            timestamp = frame_number / fps
                            frame_path = self.output_path / video_id / f"slide_{len(frames):04d}.jpg"
                            frame_path.parent.mkdir(parents=True, exist_ok=True)
                            cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
                            height, width = frame.shape[:2]
                            frames.append(ExtractedFrame(
                                frame_id=f"{video_id}_slide_{len(frames):04d}",
                                timestamp=timestamp,
                                file_path=frame_path,
                                width=width,
                                height=height,
                                extraction_method='slide_detection',
                                content_stability_score=float(variance)
                            ))
                            last_extracted_frame = frame_number
                            frame_history.clear()
                frame_number += 1
            logger.info(f"Extracted {len(frames)} slide frames")
            return frames
        finally:
            cap.release()

    async def _extract_text_dense(
        self,
        video_path: Path,
        video_id: str,
        metadata: VideoMetadata
    ) -> List[ExtractedFrame]:
        """Extract frames with high edge density (a proxy for on-screen text)."""
        frames: List[ExtractedFrame] = []
        cap = cv2.VideoCapture(str(video_path))
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or metadata.fps or 30.0
            sample_interval = max(1, int(fps * 2))  # sample every 2 seconds
            frame_number = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_number % sample_interval == 0:
                    # Text and diagrams typically produce a high Canny
                    # edge density.
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    edges = cv2.Canny(gray, 50, 150)
                    edge_density = np.count_nonzero(edges) / edges.size
                    if edge_density > 0.05:  # >5% of pixels are edges
                        timestamp = frame_number / fps
                        frame_path = self.output_path / video_id / f"text_{len(frames):04d}.jpg"
                        frame_path.parent.mkdir(parents=True, exist_ok=True)
                        cv2.imwrite(str(frame_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
                        height, width = frame.shape[:2]
                        frames.append(ExtractedFrame(
                            frame_id=f"{video_id}_text_{len(frames):04d}",
                            timestamp=timestamp,
                            file_path=frame_path,
                            width=width,
                            height=height,
                            extraction_method='text_density'
                        ))
                frame_number += 1
            logger.info(f"Extracted {len(frames)} text-dense frames")
            return frames
        finally:
            cap.release()

    def _deduplicate_frames(self, frames: List[ExtractedFrame]) -> List[ExtractedFrame]:
        """Drop frames whose timestamp is within 0.5s of the previous kept one."""
        if not frames:
            return frames
        frames = sorted(frames, key=lambda f: f.timestamp)
        deduplicated = [frames[0]]
        for frame in frames[1:]:
            if abs(frame.timestamp - deduplicated[-1].timestamp) > 0.5:
                deduplicated.append(frame)
        return deduplicated
3.4 Vision Analysis with LLM
import base64
import json
import os
from typing import Dict, List

from anthropic import AsyncAnthropic
class VisionAnalyzer:
    """Analyze extracted frames with a vision-capable LLM.

    Frames are sent to the Claude Messages API in batches; the model's
    JSON reply is parsed back into FrameAnalysis records keyed by frame_id.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        # Requires ANTHROPIC_API_KEY in the environment.
        self.client = AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

    async def analyze_frames(
        self,
        frames: List[ExtractedFrame],
        video_metadata: VideoMetadata
    ) -> Dict[str, FrameAnalysis]:
        """Analyze frames in batches of ``config.vision_batch_size``.

        Returns:
            Dict mapping frame_id to its FrameAnalysis.
        """
        analyses: Dict[str, FrameAnalysis] = {}
        batch_size = self.config.vision_batch_size
        total_batches = (len(frames) + batch_size - 1) // batch_size  # ceil division
        for i in range(0, len(frames), batch_size):
            batch = frames[i:i + batch_size]
            logger.info(f"Analyzing frame batch {i // batch_size + 1}/{total_batches}")
            batch_analyses = await self._analyze_batch(batch, video_metadata)
            analyses.update(batch_analyses)
        return analyses

    async def _analyze_batch(
        self,
        frames: List[ExtractedFrame],
        video_metadata: VideoMetadata
    ) -> Dict[str, FrameAnalysis]:
        """Analyze one batch of frames in a single API call.

        On API failure, returns placeholder analyses (confidence 0.0 and
        the error text as description) so the pipeline can continue.
        """
        # Attach each frame as a base64-encoded JPEG image block.
        image_contents = []
        for frame in frames:
            with open(frame.file_path, 'rb') as f:
                image_data = base64.standard_b64encode(f.read()).decode('utf-8')
            image_contents.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/jpeg",
                    "data": image_data
                }
            })
        prompt = self._build_analysis_prompt(frames, video_metadata)
        try:
            message = await self.client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=4000,
                temperature=0.3,
                messages=[{
                    "role": "user",
                    "content": image_contents + [{
                        "type": "text",
                        "text": prompt
                    }]
                }]
            )
            response_text = message.content[0].text
            return self._parse_vision_response(response_text, frames)
        except Exception as e:
            logger.error(f"Vision analysis failed: {e}")
            # Placeholder analyses, flagged by confidence=0.0.
            return {
                frame.frame_id: FrameAnalysis(
                    frame_id=frame.frame_id,
                    timestamp=frame.timestamp,
                    content_type='scene',
                    confidence=0.0,
                    description=f"Analysis failed: {str(e)}",
                    has_presentation_content=False
                )
                for frame in frames
            }

    def _build_analysis_prompt(
        self,
        frames: List[ExtractedFrame],
        video_metadata: VideoMetadata
    ) -> str:
        """Build the per-batch instruction prompt (images are attached separately)."""
        return f"""You are analyzing frames from a video titled: "{video_metadata.title}"
I've provided {len(frames)} frames extracted at different timestamps. For each frame, analyze:
1. **Content Type**: Categorize as one of: slide, diagram, person, text, scene, or mixed
2. **Text Content**: Extract all visible text (OCR)
3. **Description**: Describe what's shown in 1-2 sentences
4. **Presentation Content**: Boolean - does this look like presentation/educational material?
5. **Key Points**: If this is a slide or diagram, list the main points/concepts
Return your analysis as a JSON array with one object per frame:
```json
[
{{
"frame_number": 1,
"timestamp": {frames[0].timestamp},
"content_type": "slide|diagram|person|text|scene|mixed",
"confidence": 0.0-1.0,
"extracted_text": "All visible text...",
"description": "Brief description",
"has_presentation_content": true|false,
"slide_number": null or integer,
"key_points": ["point 1", "point 2"]
}}
]
Be thorough in text extraction. If you see bullet points, numbered lists, or formatted text, preserve the structure. """

    def _parse_vision_response(
        self,
        response_text: str,
        frames: List[ExtractedFrame]
    ) -> Dict[str, FrameAnalysis]:
        """Parse Claude's JSON reply into FrameAnalysis objects.

        Returns {} when the reply cannot be parsed as JSON.
        """
        import json
        import re
        # Extract the JSON array from a ```json fenced block if present.
        # BUGFIX: greedy capture — the old non-greedy (\[.*?\]) stopped at
        # the first ']' which can occur inside nested arrays (key_points),
        # truncating the JSON.
        json_match = re.search(r'```json\s*(\[.*\])\s*```', response_text, re.DOTALL)
        json_text = json_match.group(1) if json_match else response_text
        try:
            analyses_data = json.loads(json_text)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse vision response: {e}")
            return {}
        analyses: Dict[str, FrameAnalysis] = {}
        # zip() silently drops trailing frames if the model returned fewer
        # objects than frames in the batch.
        for frame, data in zip(frames, analyses_data):
            analyses[frame.frame_id] = FrameAnalysis(
                frame_id=frame.frame_id,
                timestamp=frame.timestamp,
                content_type=data.get('content_type', 'scene'),
                confidence=data.get('confidence', 0.8),
                extracted_text=data.get('extracted_text', ''),
                description=data.get('description', ''),
                has_presentation_content=data.get('has_presentation_content', False),
                slide_number=data.get('slide_number'),
                key_points=data.get('key_points', [])
            )
        return analyses
---
## 4. Multi-Agent Synthesis Orchestrator
```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class AnalysisState(TypedDict):
    """Shared state threaded through the multi-agent LangGraph workflow."""

    # Inputs, set once by the orchestrator before the graph runs.
    video_metadata: VideoMetadata
    audio_segments: List[AudioSegment]
    frame_analyses: Dict[str, FrameAnalysis]
    # Annotated with operator.add so each node's returned list is appended
    # to (rather than replacing) the accumulated state.
    topics: Annotated[List[TopicSegment], operator.add]
    insights: Annotated[List[SynthesizedInsight], operator.add]
    current_task: str
    errors: Annotated[List[str], operator.add]
class SynthesisOrchestrator:
"""Coordinate multi-agent analysis and synthesis"""
def __init__(self, config: PipelineConfig):
    """Store config, create the Anthropic client, and compile the agent graph."""
    self.config = config
    # Requires ANTHROPIC_API_KEY in the environment.
    self.client = AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
    self.graph = self._build_graph()
def _build_graph(self) -> StateGraph:
    """Compile the linear LangGraph pipeline of analysis agents."""
    # The agents run strictly in sequence, so express the graph as an
    # ordered chain of nodes.
    stages = (
        ("identify_topics", self._identify_topics),
        ("extract_key_moments", self._extract_key_moments),
        ("correlate_modalities", self._correlate_modalities),
        ("generate_insights", self._generate_insights),
    )
    workflow = StateGraph(AnalysisState)
    for name, handler in stages:
        workflow.add_node(name, handler)
    names = [name for name, _ in stages]
    workflow.set_entry_point(names[0])
    for src, dst in zip(names, names[1:]):
        workflow.add_edge(src, dst)
    workflow.add_edge(names[-1], END)
    return workflow.compile()
async def synthesize(
    self,
    metadata: VideoMetadata,
    audio_segments: List[AudioSegment],
    frame_analyses: Dict[str, FrameAnalysis]
) -> tuple[List[TopicSegment], List[SynthesizedInsight]]:
    """Execute the compiled agent workflow over the pipeline outputs.

    Returns:
        (topics, insights) accumulated by the workflow's agents.
    """
    seed_state: AnalysisState = {
        "video_metadata": metadata,
        "audio_segments": audio_segments,
        "frame_analyses": frame_analyses,
        "topics": [],
        "insights": [],
        "current_task": "synthesis",
        "errors": [],
    }
    final_state = await self.graph.ainvoke(seed_state)
    return final_state["topics"], final_state["insights"]
async def _identify_topics(self, state: AnalysisState) -> Dict:
    """Agent 1: segment the transcript into distinct topics.

    Returns a partial state update: {"topics": [TopicSegment, ...]}.
    """
    logger.info("Identifying topics from transcript")
    # One line per segment, prefixed with its start time.
    transcript_text = "\n".join(
        f"[{seg.start_time:.1f}s] {seg.text}"
        for seg in state["audio_segments"]
    )
    # Truncate the transcript to keep the prompt within context limits.
    # BUGFIX: the old code had the comment "# Truncate if too long" inside
    # the f-string, so it was sent to the model as prompt text.
    prompt = f"""Analyze this video transcript and identify distinct topics/sections.
Video: "{state['video_metadata'].title}"
Duration: {state['video_metadata'].duration_seconds}s
Transcript:
{transcript_text[:10000]}
Identify 3-10 distinct topics. For each topic:
1. Give it a clear, descriptive title
2. Note the start and end timestamps
3. Provide a 1-sentence summary
4. List 3-5 key terms
Return as JSON:
```json
[
{{
"title": "Topic Title",
"start_time": 0.0,
"end_time": 120.5,
"summary": "One sentence summary",
"key_terms": ["term1", "term2", "term3"]
}}
]
"""
    message = await self.client.messages.create(
        model=self.config.synthesis_model,
        max_tokens=self.config.synthesis_max_tokens,
        temperature=self.config.synthesis_temperature,
        messages=[{"role": "user", "content": prompt}]
    )
    topics = self._parse_topics_response(message.content[0].text, state)
    return {"topics": topics}
async def _extract_key_moments(self, state: AnalysisState) -> Dict:
    """Agent 2: identify key moments from presentation-bearing frames.

    Returns a partial state update: {"insights": [SynthesizedInsight, ...]}.
    """
    logger.info("Extracting key moments")
    # Frames the vision pass flagged as presentation material.
    presentation_frames = [
        (frame_id, analysis)
        for frame_id, analysis in state["frame_analyses"].items()
        if analysis.has_presentation_content
    ]
    key_frames_summary = "\n".join(
        f"[{analysis.timestamp:.1f}s] {analysis.content_type}: {analysis.description}"
        for _, analysis in presentation_frames[:20]  # limit to top 20
    )
    # BUGFIX: state["topics"] holds TopicSegment models, so use attribute
    # access; the old t["title"] indexing raises TypeError on pydantic models.
    topic_titles = [t.title for t in state["topics"]]
    prompt = f"""Identify KEY MOMENTS in this video that represent important concepts, definitions, examples, or transitions.
Video: "{state['video_metadata'].title}"
Presentation Frames Found: {key_frames_summary}
Topics Identified: {topic_titles}
For each key moment, specify:
- Type: key_point, definition, example, transition, or summary
- Timestamp (from frame analysis)
- Content: What makes this moment important
- Related topics
Return as JSON array of key moments. """
    message = await self.client.messages.create(
        model=self.config.synthesis_model,
        max_tokens=self.config.synthesis_max_tokens,
        temperature=self.config.synthesis_temperature,
        messages=[{"role": "user", "content": prompt}]
    )
    insights = self._parse_insights_response(message.content[0].text)
    return {"insights": insights}
async def _correlate_modalities(self, state: AnalysisState) -> Dict:
    """Agent 3: correlate spoken audio with on-screen visual content.

    Returns a partial state update: {"insights": [...]}; empty when no
    audio/visual overlaps are found.
    """
    import json  # BUGFIX: json was used here without being imported

    logger.info("Correlating audio and visual content")
    # Pair each presentation frame with the transcript segment being
    # spoken at that frame's timestamp.
    correlations = []
    for frame_id, analysis in state["frame_analyses"].items():
        if not analysis.has_presentation_content:
            continue
        overlapping = [
            seg for seg in state["audio_segments"]
            if seg.start_time <= analysis.timestamp <= seg.end_time
        ]
        if overlapping and analysis.extracted_text:
            correlations.append({
                "timestamp": analysis.timestamp,
                "visual_text": analysis.extracted_text,
                "audio_text": overlapping[0].text,
                "frame_description": analysis.description
            })
    # Guard clause: nothing to correlate, skip the LLM call.
    if not correlations:
        return {"insights": []}
    prompt = f"""Analyze these audio-visual correlations to find meaningful connections:
{json.dumps(correlations[:10], indent=2)}
Identify cases where:
- Speaker is referencing what's shown on screen
- Visual reinforces/contradicts audio
- New concept introduced (visual + audio together)
Return insights as JSON array. """
    message = await self.client.messages.create(
        model=self.config.synthesis_model,
        max_tokens=self.config.synthesis_max_tokens,
        temperature=self.config.synthesis_temperature,
        messages=[{"role": "user", "content": prompt}]
    )
    correlation_insights = self._parse_insights_response(message.content[0].text)
    return {"insights": correlation_insights}
async def _generate_insights(self, state: AnalysisState) -> Dict:
    """Agent 4: Generate final synthesized insights.

    Summarizes counts of topics, key moments, and presentation frames
    into a prompt and asks the LLM for 5-10 high-level insights that
    cover the whole video.

    Args:
        state: Shared analysis state; reads "video_metadata", "topics",
            "insights", and "frame_analyses" populated by earlier agents.

    Returns:
        Dict with an "insights" list of parsed insight objects.
    """
    logger.info("Generating synthesized insights")
    # Synthesize everything into coherent insights
    presentation_frames = sum(
        1 for a in state["frame_analyses"].values() if a.has_presentation_content
    )
    prompt = f"""Create a comprehensive analysis of this video by synthesizing all findings.
Video: "{state['video_metadata'].title}"
Topics: {len(state['topics'])} Key Moments: {len(state['insights'])} Presentation Frames: {presentation_frames}
Generate 5-10 high-level insights that capture:
- Main themes and concepts
- How topics flow and connect
- Notable teaching/presentation techniques
- Key takeaways
Return as JSON array of insight objects. """
    message = await self.client.messages.create(
        model=self.config.synthesis_model,
        max_tokens=self.config.synthesis_max_tokens,
        temperature=self.config.synthesis_temperature,
        messages=[{"role": "user", "content": prompt}]
    )
    final_insights = self._parse_insights_response(message.content[0].text)
    return {"insights": final_insights}
def _parse_topics_response(self, response_text: str, state: AnalysisState) -> List[TopicSegment]:
    """Parse topic segments from an LLM response.

    Prefers a fenced ```json block; falls back to treating the whole
    response as JSON. A malformed response yields an empty list so the
    pipeline degrades gracefully instead of aborting.

    Args:
        response_text: Raw LLM response expected to contain a JSON array.
        state: Analysis state (currently unused; kept for interface
            parity with the other agent callbacks).

    Returns:
        List of TopicSegment models, or [] on parse failure.
    """
    import json
    import re
    json_match = re.search(r'```json\s*(\[.*?\])\s*```', response_text, re.DOTALL)
    json_text = json_match.group(1) if json_match else response_text
    try:
        topics_data = json.loads(json_text)
        return [
            TopicSegment(
                topic_id=f"topic_{i}",
                title=t["title"],
                start_time=t["start_time"],
                end_time=t["end_time"],
                confidence=0.85,  # fixed heuristic confidence for LLM-derived topics
                summary=t["summary"],
                key_terms=t["key_terms"]
            )
            for i, t in enumerate(topics_data)
        ]
    except (ValueError, KeyError, TypeError) as e:
        # ValueError covers json.JSONDecodeError; KeyError/TypeError cover
        # well-formed JSON with an unexpected shape.
        logger.error(f"Failed to parse topics: {e}")
        return []
def _parse_insights_response(self, response_text: str) -> List[SynthesizedInsight]:
    """Parse synthesized insights from an LLM response.

    Same extraction strategy as ``_parse_topics_response``: prefer a
    fenced ```json block, otherwise treat the whole response as JSON.
    Missing per-item fields fall back to safe defaults.

    Args:
        response_text: Raw LLM response expected to contain a JSON array.

    Returns:
        List of SynthesizedInsight models, or [] on parse failure.
    """
    import json
    import re
    json_match = re.search(r'```json\s*(\[.*?\])\s*```', response_text, re.DOTALL)
    json_text = json_match.group(1) if json_match else response_text
    try:
        insights_data = json.loads(json_text)
        return [
            SynthesizedInsight(
                insight_id=f"insight_{i}",
                timestamp=ins.get("timestamp", 0.0),
                insight_type=ins.get("type", "key_point"),
                content=ins.get("content", ""),
                confidence=0.8  # fixed heuristic confidence for LLM-derived insights
            )
            for i, ins in enumerate(insights_data)
        ]
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers json.JSONDecodeError; AttributeError guards
        # against array items that are not dicts (no .get).
        logger.error(f"Failed to parse insights: {e}")
        return []
---
## 5. Output Generation
```python
from jinja2 import Template
import json
from pathlib import Path
class OutputGenerator:
    """Generate structured markdown and other output formats.

    Renders a per-video report (markdown + copied slide images), a JSON
    export of the full ProcessingJob, and (eventually) an HTML timeline,
    all rooted under ``config.output_storage_path / video_id``.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.output_path = config.output_storage_path
        # Ensure the output root exists before any job writes into it
        self.output_path.mkdir(parents=True, exist_ok=True)

    async def generate_outputs(self, job: ProcessingJob) -> Dict[str, Path]:
        """Generate all configured output formats.

        Args:
            job: Completed processing job with analyses attached.

        Returns:
            Mapping of format name ("markdown"/"json"/"timeline") to the
            generated artifact's path.
        """
        outputs = {}
        if 'markdown' in self.config.output_format:
            outputs['markdown'] = await self.generate_markdown(job)
        if 'json' in self.config.output_format:
            outputs['json'] = await self.generate_json(job)
        if 'timeline' in self.config.output_format:
            outputs['timeline'] = await self.generate_timeline(job)
        return outputs

    async def generate_markdown(self, job: ProcessingJob) -> Path:
        """Generate structured markdown documentation.

        Renders the report template, writes README.md under the video's
        output directory, and copies presentation frames into a
        ``slides/`` subdirectory referenced by the report.

        Returns:
            Path to the generated README.md.
        """
        template_str = """# {{ metadata.title }}
**Video Analysis Report**
Generated: {{ timestamp }}
Duration: {{ metadata.duration_seconds | int }} seconds ({{ (metadata.duration_seconds / 60) | round(1) }} minutes)
Processing Time: {{ metrics.total_duration_seconds | round(1) }}s
Cost: ${{ metrics.estimated_cost_usd | round(2) }}
---
## 📊 Summary
{{ generate_summary(job) }}
---
## 📑 Table of Contents
{% for topic in topics %}
- [{{ topic.title }}](#{{ topic.title | lower | replace(' ', '-') }}) ({{ topic.start_time | format_time }} - {{ topic.end_time | format_time }})
{% endfor %}
---
## 🎯 Topics
{% for topic in topics %}
### {{ topic.title }}
**Time Range**: {{ topic.start_time | format_time }} - {{ topic.end_time | format_time }}
**Key Terms**: {{ topic.key_terms | join(', ') }}
{{ topic.summary }}
{% if topic.related_frames %}
**Related Visuals**:
{% for frame_id in topic.related_frames[:3] %}
{% set frame_analysis = frame_analyses.get(frame_id) %}
{% if frame_analysis %}
- ![Frame at {{ frame_analysis.timestamp | format_time }}](slides/{{ frame_id }}.jpg)
- {{ frame_analysis.description }}
{% endif %}
{% endfor %}
{% endif %}
{% endfor %}
---
## 💡 Key Insights
{% for insight in insights %}
### {{ insight.insight_type | title }}: {{ insight.timestamp | format_time }}
{{ insight.content }}
*Confidence: {{ (insight.confidence * 100) | round(0) }}%*
{% endfor %}
---
## 📝 Full Transcript
{% for segment in audio_segments %}
**[{{ segment.start_time | format_time }}]** {{ segment.text }}
{% endfor %}
---
## 🖼️ Extracted Slides
{% for frame_id, analysis in frame_analyses.items() %}
{% if analysis.has_presentation_content %}
### Slide at {{ analysis.timestamp | format_time }}
![Slide]({{ frame_id }}.jpg)
{{ analysis.description }}
{% if analysis.extracted_text %}
**Extracted Text**:
{{ analysis.extracted_text }}
{% endif %}
{% if analysis.key_points %}
**Key Points**:
{% for point in analysis.key_points %}
- {{ point }}
{% endfor %}
{% endif %}
---
{% endif %}
{% endfor %}
## 📈 Processing Metrics
- **Frames Extracted**: {{ metrics.frames_extracted }}
- **Frames Analyzed**: {{ metrics.frames_analyzed }}
- **Transcript Words**: {{ metrics.transcript_word_count }}
- **API Calls (Vision)**: {{ metrics.api_calls_vision }}
- **API Calls (LLM)**: {{ metrics.api_calls_llm }}
- **Tokens Consumed**: {{ metrics.tokens_consumed_input | int }} input + {{ metrics.tokens_consumed_output | int }} output
- **Estimated Cost**: ${{ metrics.estimated_cost_usd | round(2) }}
- **Disk Space Used**: {{ metrics.disk_space_used_mb | round(1) }} MB
---
*Generated by Video Analysis Pipeline v1.0*
"""

        # Custom filters
        def format_time(seconds):
            """Format seconds as MM:SS."""
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes:02d}:{secs:02d}"

        def generate_summary(job):
            """Generate a short executive summary paragraph."""
            return f"""This {job.metadata.duration_seconds / 60:.1f}-minute video covers {len(job.topics)} main topics.
Key Topics:
{chr(10).join(f'- {t.title}' for t in job.topics[:5])}
The video includes {sum(1 for a in job.frame_analyses.values() if a.has_presentation_content)} slides/diagrams and {len(job.audio_segments)} transcript segments.
"""

        # Render template. The custom filter must be registered on the
        # template's environment: jinja2 Template objects have no
        # ``.filters`` mapping of their own, so ``template.filters[...]``
        # would raise AttributeError.
        template = Template(template_str)
        template.environment.filters['format_time'] = format_time
        markdown = template.render(
            metadata=job.metadata,
            topics=job.topics,
            insights=job.insights,
            audio_segments=job.audio_segments,
            frame_analyses=job.frame_analyses,
            metrics=job.processing_metrics,
            timestamp=datetime.utcnow().isoformat(),
            generate_summary=generate_summary,
            job=job
        )

        # Save markdown
        output_path = self.output_path / job.video_id / "README.md"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(markdown)

        # Copy presentation frames next to the report so image links resolve
        slides_path = output_path.parent / "slides"
        slides_path.mkdir(exist_ok=True)
        for frame_id, analysis in job.frame_analyses.items():
            if analysis.has_presentation_content:
                source_frame = next(
                    (f for f in job.extracted_frames if f.frame_id == frame_id),
                    None
                )
                if source_frame:
                    dest_path = slides_path / f"{frame_id}.jpg"
                    # shutil is imported at module level (orchestrator section)
                    shutil.copy2(source_frame.file_path, dest_path)
        logger.info(f"Generated markdown: {output_path}")
        return output_path

    async def generate_json(self, job: ProcessingJob) -> Path:
        """Generate a JSON export of the full job.

        Returns:
            Path to the written analysis.json.
        """
        output_path = self.output_path / job.video_id / "analysis.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Pydantic v2 serialization; mode='json' makes datetimes/paths
        # JSON-safe primitives
        data = job.model_dump(mode='json')
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        logger.info(f"Generated JSON: {output_path}")
        return output_path

    async def generate_timeline(self, job: ProcessingJob) -> Path:
        """Generate interactive timeline (HTML).

        NOTE(review): not yet implemented — currently returns None despite
        the Path annotation; callers store the value unused.
        """
        # TODO: Implement HTML timeline visualization
        # Could use libraries like: vis-timeline, timeline.js, or custom D3.js
        pass
6. Main Pipeline Orchestrator
import asyncio
from datetime import datetime
import shutil
class VideoPipeline:
    """Main pipeline orchestrator.

    Drives a video through download -> extraction -> transcription ->
    frame analysis -> synthesis -> output generation, recording stage
    timings and a cost estimate on the job's processing metrics.
    """

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.downloader = VideoDownloader(config)
        self.audio_processor = AudioProcessor(config)
        self.frame_extractor = FrameExtractor(config)
        self.vision_analyzer = VisionAnalyzer(config)
        self.synthesizer = SynthesisOrchestrator(config)
        self.output_generator = OutputGenerator(config)

    async def process_video(self, video_url: str) -> ProcessingJob:
        """Main entry point: process a video end-to-end.

        Args:
            video_url: YouTube URL or local file path.

        Returns:
            Completed ProcessingJob with all analyses attached.

        Raises:
            Exception: re-raises any stage failure after marking the job
                FAILED and recording the error message.
        """
        start_time = datetime.utcnow()
        video_id = self._generate_video_id(video_url)
        # Placeholder metadata; replaced by real values after download
        job = ProcessingJob(
            job_id=f"job_{video_id}_{int(start_time.timestamp())}",
            video_id=video_id,
            status=ProcessingStatus.PENDING,
            metadata=VideoMetadata(
                video_id=video_id,
                title="Unknown",
                duration_seconds=0,
                resolution="0x0",
                fps=0,
                file_size_bytes=0,
                format="unknown"
            )
        )
        try:
            # Stage 1: Download
            job.status = ProcessingStatus.DOWNLOADING
            video_path, metadata = await self.downloader.download(video_url, video_id)
            job.metadata = metadata
            job.processing_metrics.download_duration_seconds = (
                datetime.utcnow() - start_time
            ).total_seconds()

            # Stage 2: Extract audio and frames (independent -> concurrent)
            job.status = ProcessingStatus.EXTRACTING
            extract_start = datetime.utcnow()
            audio_path, frames = await asyncio.gather(
                self.audio_processor.extract_audio(video_path, video_id),
                self.frame_extractor.extract_frames(video_path, video_id, metadata)
            )
            job.extracted_frames = frames
            job.processing_metrics.frame_extraction_duration_seconds = (
                datetime.utcnow() - extract_start
            ).total_seconds()
            job.processing_metrics.frames_extracted = len(frames)

            # Stage 3: Transcribe audio
            job.status = ProcessingStatus.TRANSCRIBING
            transcribe_start = datetime.utcnow()
            audio_segments = await self.audio_processor.transcribe(audio_path, metadata)
            job.audio_segments = audio_segments
            job.processing_metrics.transcription_duration_seconds = (
                datetime.utcnow() - transcribe_start
            ).total_seconds()
            job.processing_metrics.transcript_word_count = sum(
                len(seg.text.split()) for seg in audio_segments
            )

            # Stage 4: Analyze frames
            job.status = ProcessingStatus.ANALYZING
            analyze_start = datetime.utcnow()
            frame_analyses = await self.vision_analyzer.analyze_frames(frames, metadata)
            job.frame_analyses = frame_analyses
            job.processing_metrics.analysis_duration_seconds = (
                datetime.utcnow() - analyze_start
            ).total_seconds()
            job.processing_metrics.frames_analyzed = len(frame_analyses)
            # Ceiling division: one vision API call per batch of frames
            job.processing_metrics.api_calls_vision = (
                len(frames) + self.config.vision_batch_size - 1
            ) // self.config.vision_batch_size

            # Stage 5: Synthesize insights
            job.status = ProcessingStatus.SYNTHESIZING
            synth_start = datetime.utcnow()
            topics, insights = await self.synthesizer.synthesize(
                metadata, audio_segments, frame_analyses
            )
            job.topics = topics
            job.insights = insights
            job.processing_metrics.synthesis_duration_seconds = (
                datetime.utcnow() - synth_start
            ).total_seconds()

            # Calculate total metrics and cost estimate
            job.processing_metrics.total_duration_seconds = (
                datetime.utcnow() - start_time
            ).total_seconds()
            job.processing_metrics.estimated_cost_usd = self._estimate_cost(job)

            # Stage 6: Generate outputs
            await self.output_generator.generate_outputs(job)

            # Cleanup if configured
            if self.config.cleanup_temp_files:
                self._cleanup_temp_files(video_id)

            job.status = ProcessingStatus.COMPLETE
            job.updated_at = datetime.utcnow()
            logger.info(f"Pipeline complete: {job.job_id} ({job.processing_metrics.total_duration_seconds:.1f}s)")
            return job
        except Exception as e:
            # Record the failure on the job, then re-raise for the caller
            job.status = ProcessingStatus.FAILED
            job.error_message = str(e)
            job.updated_at = datetime.utcnow()
            logger.error(f"Pipeline failed: {e}")
            raise

    def _generate_video_id(self, video_url: str) -> str:
        """Generate a stable 12-char video ID from the URL (non-cryptographic use)."""
        import hashlib
        return hashlib.md5(video_url.encode()).hexdigest()[:12]

    def _estimate_cost(self, job: ProcessingJob) -> float:
        """Estimate processing cost in USD from rough per-unit rates."""
        cost = 0.0
        # Transcription cost (if using API)
        if 'api' in self.config.transcription_model:
            cost += (job.metadata.duration_seconds / 60) * 0.006  # $0.006/min
        # Vision cost: per-frame rate depends on provider
        if self.config.vision_model == 'gpt-4v':
            cost += job.processing_metrics.frames_analyzed * 0.005
        elif self.config.vision_model == 'claude-vision':
            cost += job.processing_metrics.frames_analyzed * 0.004
        elif self.config.vision_model == 'gemini-vision':
            cost += job.processing_metrics.frames_analyzed * 0.002
        # LLM synthesis cost (rough estimate)
        tokens_estimate = job.processing_metrics.transcript_word_count * 1.3  # words -> tokens
        tokens_estimate += job.processing_metrics.frames_analyzed * 100  # visual descriptions
        if 'claude' in self.config.synthesis_model:
            cost += (tokens_estimate / 1_000_000) * 3  # $3/M tokens
        elif 'gpt-4' in self.config.synthesis_model:
            cost += (tokens_estimate / 1_000_000) * 10  # $10/M tokens
        return cost

    def _cleanup_temp_files(self, video_id: str):
        """Remove this video's temporary download/audio/frame directories."""
        temp_paths = [
            self.config.temp_storage_path / "downloads" / video_id,
            self.config.temp_storage_path / "audio" / video_id,
            self.config.temp_storage_path / "frames" / video_id
        ]
        for path in temp_paths:
            if path.exists():
                shutil.rmtree(path)
                logger.debug(f"Cleaned up: {path}")
    
# CLI Entry Point
async def main():
    """CLI interface for video processing.

    Parses arguments, loads an optional JSON config file, runs the
    pipeline, and prints a short result summary.

    Returns:
        0 on success, 1 on failure (shell-style exit code).
    """
    import argparse
    parser = argparse.ArgumentParser(description="AI-Powered Video Analysis Pipeline")
    parser.add_argument('video_url', help='YouTube URL or local video path')
    parser.add_argument('--config', help='Config file path (JSON)', default=None)
    parser.add_argument('--output', help='Output directory', default='./output')
    args = parser.parse_args()

    # Load config: explicit file wins, otherwise defaults + output dir
    if args.config:
        with open(args.config) as f:
            config_data = json.load(f)
        config = PipelineConfig(**config_data)
    else:
        config = PipelineConfig(output_storage_path=Path(args.output))

    # Run pipeline
    pipeline = VideoPipeline(config)
    try:
        job = await pipeline.process_video(args.video_url)
        print(f"\n✅ Processing complete!")
        print(f"Output: {config.output_storage_path / job.video_id / 'README.md'}")
        print(f"Cost: ${job.processing_metrics.estimated_cost_usd:.2f}")
        print(f"Time: {job.processing_metrics.total_duration_seconds:.1f}s")
    except Exception as e:
        print(f"\n❌ Processing failed: {e}")
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell; a bare asyncio.run(main())
    # discards the returned 0/1 and always exits 0.
    raise SystemExit(asyncio.run(main()))
7. Testing Strategy
import pytest
from pathlib import Path
@pytest.fixture
def sample_video():
    """Path to the bundled sample lecture used by extraction tests."""
    fixtures_dir = Path("tests") / "fixtures"
    return fixtures_dir / "sample_lecture.mp4"
@pytest.fixture
def pipeline_config():
    """Test configuration: isolated temp/output dirs, small frame budget."""
    return PipelineConfig(
        temp_storage_path=Path("/tmp/test-video-analysis"),
        output_storage_path=Path("./test-output"),
        max_frames_per_video=50,
        cleanup_temp_files=True
    )
@pytest.mark.asyncio
async def test_video_download(pipeline_config):
    """Test video download from YouTube populates file and metadata."""
    downloader = VideoDownloader(pipeline_config)
    # Use a short test video
    test_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"  # Example
    video_id = "test_video"
    video_path, metadata = await downloader.download(test_url, video_id)
    assert video_path.exists()
    assert metadata.duration_seconds > 0
    assert metadata.title != "Unknown"
@pytest.mark.asyncio
async def test_frame_extraction(sample_video, pipeline_config):
    """Test frame extraction produces on-disk frames within the video's duration."""
    extractor = FrameExtractor(pipeline_config)
    metadata = VideoMetadata(
        video_id="test",
        title="Test Video",
        duration_seconds=60,
        resolution="1920x1080",
        fps=30,
        file_size_bytes=1000000,
        format="mp4"
    )
    frames = await extractor.extract_frames(sample_video, "test", metadata)
    assert len(frames) > 0
    assert all(f.file_path.exists() for f in frames)
    assert all(0 <= f.timestamp <= metadata.duration_seconds for f in frames)
@pytest.mark.asyncio
async def test_transcription(sample_video, pipeline_config):
    """Test audio extraction and transcription yield well-formed segments."""
    processor = AudioProcessor(pipeline_config)
    audio_path = await processor.extract_audio(sample_video, "test")
    assert audio_path.exists()
    metadata = VideoMetadata(
        video_id="test",
        title="Test",
        duration_seconds=60,
        resolution="1920x1080",
        fps=30,
        file_size_bytes=0,
        format="mp4"
    )
    segments = await processor.transcribe(audio_path, metadata)
    assert len(segments) > 0
    assert all(seg.text for seg in segments)
    assert all(seg.start_time <= seg.end_time for seg in segments)
@pytest.mark.asyncio
async def test_end_to_end_pipeline(sample_video, pipeline_config):
    """Test the complete pipeline: job completes and writes both outputs."""
    pipeline = VideoPipeline(pipeline_config)
    job = await pipeline.process_video(str(sample_video))
    assert job.status == ProcessingStatus.COMPLETE
    assert len(job.audio_segments) > 0
    assert len(job.extracted_frames) > 0
    assert len(job.frame_analyses) > 0
    assert job.processing_metrics.total_duration_seconds > 0
    # Check output files
    output_dir = pipeline_config.output_storage_path / job.video_id
    assert (output_dir / "README.md").exists()
    assert (output_dir / "analysis.json").exists()
8. Deployment Configuration
# docker-compose.yml
# Single-shot pipeline container plus a Redis-backed Celery worker pool.
version: '3.8'

services:
  video-pipeline:
    build: .
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/data
      - ./output:/output
    command: python -m video_pipeline.main

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    command: celery -A video_pipeline.tasks worker --loglevel=info
    deploy:
      # NOTE: replicas under `deploy` takes effect in swarm mode /
      # `docker compose up --scale`; plain docker-compose v1 ignores it.
      replicas: 3
# Dockerfile
FROM python:3.11-slim
# Install system dependencies
# ffmpeg: audio/frame extraction; tesseract-ocr: pytesseract OCR backend
RUN apt-get update && apt-get install -y \
ffmpeg \
tesseract-ocr \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install Python dependencies
# requirements.txt copied first so this layer caches across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
CMD ["python", "-m", "video_pipeline.main"]
End of Technical Design Document