
Video Miner V3 - Detailed Code Walkthrough

A comprehensive guide to the architecture and code flow of Video Miner V3, a high-performance video mining pipeline for generating large-scale computer vision datasets from YouTube videos.

Pipeline Architecture


Table of Contents

  1. Project Overview
  2. Directory Structure
  3. Architecture Diagram
  4. Entry Points
  5. Configuration System
  6. Pipeline Orchestration
  7. Processing Modules
  8. ML Model Wrappers
  9. Video Registry System
  10. Utility Functions
  11. Data Flow

Project Overview

Video Miner V3 is designed to:

- Search YouTube for videos by keyword
- Download highest quality videos
- Extract frames with configurable sampling
- Filter frames using SigLIP2 semantic similarity
- Deduplicate frames using DINOv2/v3 or SigLIP2 embeddings
- Detect objects using open-set detection models


Directory Structure

video_miner_v3/
├── cli.py                 # Click CLI interface (509 lines)
├── config.py              # Pydantic configuration models (203 lines)
├── config_loader.py       # OmegaConf YAML loading (232 lines)
├── constants.py           # Centralized model IDs & defaults (91 lines)
├── pipeline.py            # Main pipeline orchestrator (688 lines)
├── registry.py            # Video tracking registry (420 lines)
├── search.py              # YouTube search via yt-dlp (281 lines)
├── models/
│   ├── base.py            # BaseModel class, shared utilities
│   ├── siglip_model.py    # SigLIP2 wrapper for filtering
│   ├── dinov3_model.py    # DINOv2/v3 wrapper for dedup
│   └── detector_models.py # Florence2, GroundingDINO, Moondream
├── modules/
│   ├── downloader.py      # YouTube video download
│   ├── frame_extractor.py # Frame extraction with PyAV
│   ├── frame_filter.py    # SigLIP-based filtering
│   ├── deduplicator.py    # Embedding-based dedup with FAISS
│   └── detector.py        # Object detection orchestrator
└── utils/
    ├── device.py          # CUDA/CPU device management
    ├── io.py              # File I/O, video ID extraction
    ├── validators.py      # Input validation
    └── query_generator.py # Query generation utilities

Entry Points

CLI (cli.py)

The main entry point is the video-miner CLI built with Click:

@click.group()
@click.option('--verbose', '-v', is_flag=True)
@click.pass_context
def main(ctx: click.Context, verbose: bool):
    """Video Miner v3 - High-performance video mining pipeline."""
    setup_logging(verbose)

Key Commands

| Command | Function | Description |
| --- | --- | --- |
| run-config | run_config() | Run pipeline from YAML config file(s) |
| validate-config | validate_config_cmd() | Validate YAML config without running |
| registry status | registry_status() | Show registry statistics |
| registry list | registry_list() | List videos in registry |
| registry export | registry_export() | Export URLs to file |

Pipeline Execution Flow

flowchart TD
    A[run-config command] --> B[Load YAML with OmegaConf]
    B --> C[validate_config]
    C --> D{Valid?}
    D -->|No| E[Print errors & exit]
    D -->|Yes| F[_execute_pipeline]
    F --> G[gather_input_urls]
    G --> H[Create PipelineConfig]
    H --> I[VideoPipeline.run]

The _execute_pipeline() function:

1. Loads/creates the video registry
2. Executes the optional search stage
3. Gathers URLs from config, files, or registry
4. Builds PipelineConfig with all stage configurations
5. Instantiates VideoPipeline and calls run()


Configuration System

Pydantic Models (config.py)

All configuration uses Pydantic for validation:

classDiagram
    PipelineConfig *-- DownloadConfig
    PipelineConfig *-- ExtractionConfig
    PipelineConfig *-- FilterConfig
    PipelineConfig *-- DeduplicationConfig
    PipelineConfig *-- DetectionConfig

    class PipelineConfig {
        +urls: list~str~
        +classes: list~str~
        +stages: list~str~
        +device: str
        +use_fp16: bool
        +output_dir: Path
        +get_urls() list~str~
    }

    class DownloadConfig {
        +force: bool
        +output_dir: Path
        +max_concurrent: int
        +timeout: int
    }

    class ExtractionConfig {
        +force: bool
        +output_dir: Path
        +strategy: SamplingStrategy
        +interval: int
        +max_frames: int
        +image_format: str
        +quality: int
    }

    class FilterConfig {
        +force: bool
        +threshold: float
        +model: FilterModel
        +batch_size: int
        +output_dir: Path
        +model_id() str
    }

    class DeduplicationConfig {
        +force: bool
        +threshold: float
        +use_siglip: bool
        +batch_size: int
        +output_dir: Path
        +dino_model_id: str
        +model_type() str
    }

    class DetectionConfig {
        +force: bool
        +detector: DetectorType
        +confidence_threshold: float
        +save_visualizations: bool
        +output_dir: Path
        +batch_size: int
    }

Enums

| Enum | Values | Location |
| --- | --- | --- |
| DetectorType | dino-x, moondream3, florence2, grounding-dino | config.py:35-40 |
| SamplingStrategy | interval, time, keyframe | config.py:43-47 |
| FilterModel | siglip2-so400m, siglip2-giant | config.py:50-53 |

OmegaConf Loader (config_loader.py)

Supports layered configuration merging:

def load_config(user_config, overrides, resolve=True):
    """
    Merge order (later overrides earlier):
    1. config/default.yaml (base defaults)
    2. user_config (user overrides)
    3. overrides dict (CLI/programmatic overrides)
    """

Key functions:

- load_config() - Load and merge YAML configs
- validate_config() - Validate required fields
- print_config() - Pretty print configuration
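The merge order can be illustrated with a plain-dict sketch (the real loader delegates to OmegaConf.merge; deep_merge below is a stand-in, not part of the codebase):

```python
# Plain-dict illustration of layered config merging: later layers
# override earlier ones, recursing into nested sections.
def deep_merge(*layers):
    out = {}
    for layer in layers:
        for key, value in layer.items():
            if isinstance(value, dict) and isinstance(out.get(key), dict):
                out[key] = deep_merge(out[key], value)  # merge nested section
            else:
                out[key] = value  # later layer wins
    return out

defaults = {"filter": {"threshold": 0.25, "batch_size": 16}}  # config/default.yaml
user_cfg = {"filter": {"threshold": 0.30}}                    # user overrides
cli_over = {"filter": {"batch_size": 32}}                     # CLI overrides

print(deep_merge(defaults, user_cfg, cli_over))
# {'filter': {'threshold': 0.3, 'batch_size': 32}}
```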

Constants (constants.py)

Centralized source of truth for all model IDs and defaults:

| Category | Models | Default |
| --- | --- | --- |
| SigLIP2 | siglip2-so400m, siglip2-giant | siglip2-so400m |
| DINO | dinov3-small/base/large/huge/giant, dinov2-base/large | dinov2-base |
| Detectors | dino-x, moondream3, florence2, grounding-dino | moondream3 |

Default thresholds:

- Filter: 0.25
- Dedup: 0.90
- Detection: 0.3
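A minimal user config exercising these defaults might look like the following (section and field names are inferred from the Pydantic models above, so treat this as illustrative rather than the exact schema):

```yaml
# Hypothetical user config; field names are inferred from the Pydantic
# models above - check config/default.yaml for the real schema.
stages: [download, extract, filter, dedup, detect]
classes: [dog, cat]
filter:
  model: siglip2-so400m
  threshold: 0.25
dedup:
  threshold: 0.90
detection:
  detector: moondream3
  confidence_threshold: 0.3
```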


Pipeline Orchestration

VideoPipeline (pipeline.py)

The main orchestrator coordinates all processing stages:

flowchart LR
    subgraph Stage 1
        A[Download] --> B[DownloadResult]
    end
    subgraph Stage 2
        B --> C[Extract Frames]
        C --> D[ExtractionResult]
    end
    subgraph Stage 3
        D --> E[Filter Frames]
        E --> F[FilterResult]
    end
    subgraph Stage 4
        F --> G[Deduplicate]
        G --> H[DeduplicationResult]
    end
    subgraph Stage 5
        H --> I[Detection]
        I --> J[DetectionBatchResult]
    end
    J --> K[PipelineResult]

Constructor

class VideoPipeline:
    def __init__(self, config: PipelineConfig, registry: Optional["VideoRegistry"] = None):
        self.config = config
        self.registry = registry
        # Lazy-loaded modules
        self._downloader = None
        self._extractor = None
        self._frame_filter = None
        self._deduplicator = None
        self._detector = None

Main Run Method

VideoPipeline.run():

def run(self, show_progress: bool = True) -> PipelineResult:
    stages = self.config.stages  # e.g., ["download", "extract", "filter", "dedup", "detect"]
    # Stages not listed are skipped; their results stay None
    download_results = extraction_results = filter_results = dedup_result = None

    if "download" in stages:
        download_results = self._run_download(show_progress)
        self._update_registry_downloads(download_results)

    if "extract" in stages:
        extraction_results = self._run_extraction(download_results, show_progress)
        self._update_registry_extractions(extraction_results)

    if "filter" in stages:
        filter_results = self._run_filter(extraction_results, show_progress)
        self._update_registry_filters(filter_results)

    if "dedup" in stages:
        dedup_result = self._run_deduplication(filter_results, show_progress)
        self._update_registry_deduplication(dedup_result)

    if "detect" in stages:
        detection_result = self._run_detection(dedup_result, show_progress)
        self._update_registry_detections(detection_result)

    return PipelineResult(...)
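Stripped of the registry updates, the stage-gating pattern in run() reduces to a small, self-contained sketch (run_stages and the toy handlers are illustrations, not the real API):

```python
# Toy reduction of VideoPipeline.run(): each stage runs only when named
# in `stages`, consuming the previous stage's output.
def run_stages(stages, handlers, initial):
    result = initial
    for name in ["download", "extract", "filter", "dedup", "detect"]:
        if name in stages:
            result = handlers[name](result)
    return result

# Each toy handler just tags the payload with its stage name
handlers = {s: (lambda stage: (lambda payload: payload + [stage]))(s)
            for s in ["download", "extract", "filter", "dedup", "detect"]}

print(run_stages(["download", "filter"], handlers, []))  # ['download', 'filter']
```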

Registry-Aware Filtering

The pipeline uses _filter_by_registry() to skip already-processed videos:

def _filter_by_registry(self, items, get_video_id, stage_name, force=False):
    """Skip items that have already completed the stage in registry."""
    if force or self.registry is None:
        return items

    filtered = []
    for item in items:
        video_id = get_video_id(item)
        if not self.registry.is_stage_complete(video_id, stage_name):
            filtered.append(item)
    return filtered

Processing Modules

1. Downloader (downloader.py)

Downloads YouTube videos using yt-dlp with concurrent processing.

flowchart TD
    A[URL List] --> B[ThreadPoolExecutor]
    B --> C1[download_single]
    B --> C2[download_single]
    B --> C3[download_single]
    C1 --> D[yt-dlp]
    C2 --> D
    C3 --> D
    D --> E[DownloadResult]

Key Classes:

- DownloadResult - Dataclass with url, video_id, success, output_path, title, duration
- YouTubeDownloader - Main downloader class

Key Methods:

- download_single() - Download one video
- download_batch() - Concurrent batch download
- gather_input_urls() - Collect URLs from config sources


2. Frame Extractor (frame_extractor.py)

Extracts frames from videos using PyAV with configurable sampling strategies.

Sampling Strategies:

| Strategy | Description |
| --- | --- |
| interval | Every N frames (default: 30) |
| time | Every N seconds |
| keyframe | Scene change detection |
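The first two strategies amount to a simple index-selection rule, sketched here in isolation (these helper names are hypothetical; the real FrameExtractor applies the rule while decoding with PyAV):

```python
# Index-selection rules behind the "interval" and "time" strategies.
def select_interval(n_frames, interval=30, max_frames=None):
    """Keep every `interval`-th frame index."""
    picked = list(range(0, n_frames, interval))
    return picked[:max_frames] if max_frames is not None else picked

def select_time(timestamps, every_s=2.0):
    """Keep the first frame at or after each time step."""
    picked, next_t = [], 0.0
    for i, t in enumerate(timestamps):
        if t >= next_t:
            picked.append(i)
            next_t = t + every_s
    return picked

print(select_interval(100, interval=30))                          # [0, 30, 60, 90]
print(select_time([0.0, 0.5, 1.0, 1.5, 2.0, 2.5], every_s=2.0))  # [0, 4]
```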

Key Classes:

- FrameInfo - Dataclass with video_path, video_id, frame_number, timestamp, image
- ExtractionResult - Dataclass with frame_count, output_paths, output_dir
- FrameExtractor - Main extractor class

Key Methods:

- iterate_frames() - Generator yielding FrameInfo
- extract_video() - Extract and save frames
- extract_batch() - Concurrent batch extraction


3. Frame Filter (frame_filter.py)

Filters frames using SigLIP2 text-image similarity.

flowchart LR
    A[Frame Images] --> B[SigLIPModel]
    C[Text Classes] --> B
    B --> D{Score > Threshold?}
    D -->|Yes| E[Keep Frame]
    D -->|No| F[Discard]

Key Classes:

- FilteredFrame - Dataclass with source_path, best_class, score, all_scores
- FilterResult - Dataclass with total_frames, passed_frames, filtered_frames
- FrameFilter - Main filter class

Key Methods:

- filter_frames() - Filter single video frames
- filter_batch() - Filter multiple videos
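The keep/discard decision can be shown with a toy similarity matrix (filter_frames below is a stand-in for the real FrameFilter logic; the matrix shape matches a frames-by-classes score grid):

```python
import numpy as np

def filter_frames(scores: np.ndarray, classes, threshold=0.25):
    """scores: (n_frames, n_classes) sigmoid similarities.
    Keep a frame when its best class score clears the threshold."""
    kept = []
    for row in scores:
        best = int(row.argmax())
        if row[best] >= threshold:
            kept.append((classes[best], float(row[best])))
    return kept

scores = np.array([[0.40, 0.10],   # best score clears the threshold -> kept
                   [0.05, 0.12]])  # best score 0.12 < 0.25 -> discarded
print(filter_frames(scores, ["dog", "cat"]))  # [('dog', 0.4)]
```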


4. Deduplicator (deduplicator.py)

Removes duplicate frames using embedding-based similarity with FAISS.

Two-Phase Deduplication:

flowchart TD
    subgraph Phase1["Phase 1: Per-Video"]
        A[All Frames] --> B[Compute Embeddings]
        B --> C[Cosine Similarity]
        C --> D[Greedy Selection]
    end

    subgraph Phase2["Phase 2: Cross-Video"]
        D --> E[FAISS Index]
        E --> F[KNN Search]
        F --> G[Merge Duplicates]
    end

    G --> H[Unique Frames]

Algorithm Flow:

1. Phase 1 - Per-Video: Remove temporal duplicates within each video using greedy selection
2. Phase 2 - Cross-Video: Use FAISS ANN search to find and remove duplicates across all videos

Supported Models:

- DINOv2/v3 (default) - Best quality embeddings
- SigLIP2 - Memory-efficient, reuses filter model

Key Methods:

- deduplicate() - Single batch deduplication
- deduplicate_cross_video() - Two-phase cross-video dedup
- _faiss_dedup() - FAISS-based O(N log N) dedup
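The greedy selection in Phase 1 can be sketched with a brute-force scan (the real deduplicator swaps the O(N²) inner loop for a FAISS KNN search; this toy version only shows the selection rule):

```python
import numpy as np

def greedy_dedup(embeddings: np.ndarray, threshold=0.90):
    """Keep frame i only if its cosine similarity to every already-kept
    frame is below the threshold (embeddings must be L2-normalized)."""
    kept = []
    for i in range(len(embeddings)):
        if all(float(embeddings[i] @ embeddings[j]) < threshold for j in kept):
            kept.append(i)
    return kept

# Three unit vectors: a near-duplicate pair plus one distinct frame
a = np.array([1.0, 0.0, 0.0])
a2 = np.array([0.999, 0.04, 0.0]); a2 = a2 / np.linalg.norm(a2)
b = np.array([0.0, 1.0, 0.0])

print(greedy_dedup(np.stack([a, a2, b])))  # [0, 2] - the near-duplicate is dropped
```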


5. Object Detector (detector.py)

Runs open-set object detection on frames.

Supported Detectors:

| Detector | Model ID | Notes |
| --- | --- | --- |
| Moondream3 | vikhyatk/moondream2 | VQA + detection |
| Florence-2 | microsoft/Florence-2-large | Multi-task |
| Grounding DINO | IDEA-Research/grounding-dino-base | Stable |

Key Methods:

- detect_single() - Detect in one image
- detect_batch() - Batch detection with progress

Output:

- annotations.json - COCO-format annotations
- visualizations/ - Images with bounding boxes


ML Model Wrappers

Base Model (base.py)

Abstract base class providing common functionality:

class BaseModel(ABC):
    def __init__(self):
        self.model = None
        self.processor = None
        self._loaded = False

    @abstractmethod
    def load(self) -> None: pass

    def unload(self) -> None:
        self.model = None   # drop references so tensors can be freed
        self.processor = None
        self._loaded = False
        clear_gpu_cache()

    def __enter__(self): self.load(); return self
    def __exit__(self, *args): self.unload()

Utilities:

- load_image() - Convert Path/np.array/PIL to RGB Image
- create_batch_iterator() - Batch iterator with tqdm
- load_model_with_fallback() - Try multiple model IDs


SigLIP Model (siglip_model.py)

Wrapper for Google's SigLIP2 for image-text similarity:

class SigLIPModel(BaseModel):
    def compute_similarity(self, images, texts, batch_size=16):
        """Returns an (N_images, N_texts) similarity matrix."""
        # Precompute text features once; they are reused for every image batch
        text_features = self.model.get_text_features(...)

        # Process images in batches, collecting per-batch scores
        all_scores = []
        for batch in create_batch_iterator(images, batch_size):
            image_features = self.model.get_image_features(...)
            logits = (image_features @ text_features.T) * self.model.logit_scale.exp()
            all_scores.append(torch.sigmoid(logits).cpu().numpy())  # SigLIP uses sigmoid

        return np.vstack(all_scores)

DINOv3 Model (dinov3_model.py)

Wrapper for Meta's DINOv2/v3 for image embeddings:

class DINOv3Model(BaseModel):
    def get_embeddings(self, images, batch_size=32, normalize=True):
        """Returns an (N_images, embedding_dim) array."""
        # Collect per-batch embeddings, then stack into one array
        all_embeddings = []
        for batch in create_batch_iterator(images, batch_size):
            outputs = self.model(**inputs)
            all_embeddings.append(outputs.pooler_output.cpu().numpy())  # or last_hidden_state[:, 0]

        embeddings = np.vstack(all_embeddings)
        if normalize:  # unit-norm rows so dot product equals cosine similarity
            embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings

Detector Models (detector_models.py)

Unified interface for multiple detection backends:

classDiagram
    BaseDetector <|-- Florence2Detector
    BaseDetector <|-- GroundingDINODetector
    BaseDetector <|-- MoondreamDetector

    class BaseDetector {
        <<abstract>>
        +device_map: str
        +model: Any
        +processor: Any
        +load()*
        +unload()*
        +detect(image, prompt, threshold)* DetectionResult
        -_load_image(image) Image
    }

    class Florence2Detector {
        +model_id: str
        +load()
        +detect(image, prompt, threshold) DetectionResult
    }

    class GroundingDINODetector {
        +model_id: str
        +load()
        +detect(image, prompt, threshold) DetectionResult
    }

    class MoondreamDetector {
        +model_id: str
        +_actual_model_id: str
        +load()
        +detect(image, prompt, threshold) DetectionResult
    }

Factory function:

def get_detector(detector_type: DetectorType, model_id: str, device_map: str):
    if detector_type == DetectorType.FLORENCE2:
        return Florence2Detector(model_id, device_map)
    elif detector_type == DetectorType.GROUNDING_DINO:
        return GroundingDINODetector(model_id, device_map)
    elif detector_type == DetectorType.MOONDREAM3:
        return MoondreamDetector(model_id, device_map)
    raise ValueError(f"Unsupported detector type: {detector_type}")


Video Registry System

VideoRegistry (registry.py)

Pydantic-based YAML registry for tracking video processing status:

classDiagram
    VideoRegistry *-- RegistryMetadata
    VideoRegistry "1" *-- "*" VideoEntry : videos
    VideoEntry *-- PipelineStages
    PipelineStages *-- DownloadStage
    PipelineStages *-- ExtractionStage
    PipelineStages *-- FilterStage
    PipelineStages *-- DeduplicationStage
    PipelineStages *-- DetectionStage

    class VideoRegistry {
        +metadata: RegistryMetadata
        +videos: dict~str VideoEntry~
        +_lock: threading.Lock
        +add_video(video_id, url, title) bool
        +get_pending() list~VideoEntry~
        +get_by_status(status) list~VideoEntry~
        +update_stage(video_id, stage, data)
        +is_stage_complete(video_id, stage) bool
        +save(file_path)
        +load(file_path)$ VideoRegistry
        +load_or_create(file_path)$ VideoRegistry
    }

    class RegistryMetadata {
        +created: str
        +updated: str
        +total_videos: int
        +keywords_searched: list~str~
        +version: str
    }

    class VideoEntry {
        +video_id: str
        +url: str
        +title: str
        +channel: str
        +duration_seconds: int
        +source_keyword: str
        +status: VideoStatus
        +added: str
        +stages: PipelineStages
        +notes: str
        +is_processed() bool
        +get_summary() dict
    }

    class PipelineStages {
        +download: DownloadStage
        +extraction: ExtractionStage
        +filter: FilterStage
        +deduplication: DeduplicationStage
        +detection: DetectionStage
    }

    class DownloadStage {
        +completed: bool
        +path: str
        +size_mb: float
        +duration_seconds: float
        +error: str
    }

    class FilterStage {
        +completed: bool
        +input_frames: int
        +passed_frames: int
        +output_dir: str
        +pass_rate() float
    }

VideoStatus Enum:

class VideoStatus(str, Enum):
    PENDING = "pending"
    DOWNLOADING = "downloading"
    DOWNLOADED = "downloaded"
    EXTRACTING = "extracting"
    EXTRACTED = "extracted"
    FILTERING = "filtering"
    FILTERED = "filtered"
    DEDUPLICATING = "deduplicating"
    DEDUPLICATED = "deduplicated"
    DETECTING = "detecting"
    DETECTED = "detected"
    COMPLETE = "complete"
    FAILED = "failed"
    SKIPPED = "skipped"

Key Methods:

- add_video() - Add video to registry
- update_stage() - Update stage completion
- get_pending() - Get unprocessed videos
- save() - Thread-safe YAML save
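The resume behaviour enabled by is_stage_complete() can be shown with a dict-based stand-in (MiniRegistry is a toy, not the Pydantic-backed, YAML-persisted VideoRegistry):

```python
# Dict-based stand-in for the registry's resume behaviour. It only
# mirrors the update_stage()/is_stage_complete() contract.
class MiniRegistry:
    def __init__(self):
        self.videos = {}  # video_id -> set of completed stage names

    def update_stage(self, video_id, stage):
        self.videos.setdefault(video_id, set()).add(stage)

    def is_stage_complete(self, video_id, stage):
        return stage in self.videos.get(video_id, set())

reg = MiniRegistry()
reg.update_stage("abc123", "download")

# Same pattern as the pipeline's _filter_by_registry(): skip completed videos
todo = [v for v in ["abc123", "xyz789"]
        if not reg.is_stage_complete(v, "download")]
print(todo)  # ['xyz789']
```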


Utility Functions

Device Management (device.py)

def resolve_device(device_map: str = "auto") -> str:
    """Resolve device_map for HuggingFace model loading."""
    if device_map != "auto":
        return device_map  # caller asked for an explicit device
    num_gpus = torch.cuda.device_count()
    if num_gpus > 1:
        return "auto"  # let HF shard across GPUs
    elif num_gpus == 1:
        return "cuda"
    return "cpu"

def clear_gpu_cache():
    """Clear GPU cache to free memory."""
    if torch.cuda.is_available():  # synchronize() raises without CUDA
        torch.cuda.empty_cache()
        torch.cuda.synchronize()

I/O Utilities (io.py)

| Function | Description |
| --- | --- |
| ensure_dir() | Create directory if it does not exist |
| save_json() / load_json() | JSON file operations |
| get_video_id() | Extract YouTube video ID from URL |
| get_safe_filename() | Sanitize a string for filesystem use |
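Extracting the 11-character YouTube ID can be sketched with a regex (a hedged stand-in for get_video_id(); the real implementation may cover more URL shapes):

```python
import re

def get_video_id(url: str):
    """Match the 11-character YouTube ID in common watch/short-link forms."""
    m = re.search(r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", url)
    return m.group(1) if m else None

print(get_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ"))  # dQw4w9WgXcQ
print(get_video_id("https://youtu.be/dQw4w9WgXcQ"))                 # dQw4w9WgXcQ
```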

Data Flow

Complete Pipeline Flow

flowchart TB
    subgraph Input
        A[YAML Config] --> B[load_config]
        C[URL File/List] --> D[gather_input_urls]
        E[Registry] --> D
    end

    subgraph Pipeline
        D --> F[YouTubeDownloader.download_batch]
        F --> G[FrameExtractor.extract_batch]
        G --> H[FrameFilter.filter_batch]
        H --> I[Deduplicator.deduplicate_cross_video]
        I --> J[ObjectDetector.detect_batch]
    end

    subgraph Models
        K[SigLIPModel] --> H
        L[DINOv3Model] --> I
        M[Detector Models] --> J
    end

    subgraph Output
        F --> N[videos/]
        G --> O[frames_raw/]
        H --> P[frames_filtered/]
        I --> Q[frames_deduplicated/]
        J --> R[detections/annotations.json]
        J --> S[detections/visualizations/]
    end

    subgraph Registry
        F --> T[Update download stage]
        G --> T
        H --> T
        I --> T
        J --> T
        T --> U[video_registry.yaml]
    end

Output Directory Structure

output/
├── video_registry.yaml        # Video tracking
├── videos/                    # Downloaded videos
│   └── {video_id}.mp4
├── frames_raw/{video_id}/     # All extracted frames
│   └── frame_00001.jpg
├── frames_filtered/{video_id}/ # Frames passing filter
│   └── frame_00042.jpg
├── frames_deduplicated/       # Unique frames
│   └── frame_00042.jpg
├── detections/
│   ├── annotations.json       # COCO-format
│   └── visualizations/        # Bounding box images
└── pipeline_result.json       # Summary statistics

Summary

Video Miner V3 is a well-architected pipeline with:

  • Clean separation of concerns - Each module handles one stage
  • Pydantic configuration - Type-safe, validated configs
  • Registry tracking - Resume capability, no reprocessing
  • Lazy model loading - Models loaded only when needed
  • FAISS deduplication - O(N log N) scalable dedup
  • Multiple detector backends - Florence2, GroundingDINO, Moondream
  • Multi-GPU support - HuggingFace device_map="auto"