Skip to content

Parallel Pipeline Design Options

Two architectural approaches for parallelizing download → extract → filter stages.


Option 1: Per-Video Streaming Pipeline

Concept: Each video flows through download → extract → filter as a single unit, and multiple videos are processed concurrently.

flowchart LR
    subgraph Parallel Workers
        A1[Video 1] --> B1[Download] --> C1[Extract] --> D1[Filter]
        A2[Video 2] --> B2[Download] --> C2[Extract] --> D2[Filter]
        A3[Video 3] --> B3[Download] --> C3[Extract] --> D3[Filter]
    end

    D1 --> E[Collect Filtered]
    D2 --> E
    D3 --> E
    E --> F[Cross-Video Dedup]
    F --> G[Detection]

Implementation Sketch

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path

@dataclass
class VideoProcessResult:
    """Summary of one video after it has been downloaded, extracted and filtered."""

    # Identifier of the video this result belongs to.
    video_id: str
    # Directory associated with this video's filtered frames.
    filtered_dir: Path
    # Count of frames the filter stage reported as passing.
    frame_count: int

class StreamingPipeline:
    """Run download → extract → filter per video, several videos at a time.

    Each URL is handled end to end by one worker thread. Once every video has
    been processed, the filtered frames of all successful videos go through a
    single cross-video deduplication pass.
    """

    def __init__(self, config, registry):
        """Create the stage objects shared by all worker threads.

        Args:
            config: Pipeline configuration. The ``download``, ``extraction``,
                ``filter``, ``deduplication``, ``classes`` and
                ``cleanup_raw_frames`` attributes are read.
            registry: Object whose ``update_video`` is called once for every
                video that was filtered successfully.
        """
        self.config = config
        self.registry = registry
        self.downloader = YouTubeDownloader(config.download)
        self.extractor = FrameExtractor(config.extraction)
        # A single filter instance is shared by every worker thread.
        # NOTE(review): this assumes FrameFilter.filter_frames may be called
        # concurrently — confirm, or serialize calls with a lock.
        self.filter = FrameFilter(config.filter)
        # Used by run() for the final cross-video pass.
        self.deduplicator = Deduplicator(config.deduplication)

    def process_single_video(self, url: str) -> "VideoProcessResult | None":
        """Process one video through download → extract → filter.

        Args:
            url: URL of the video to process.

        Returns:
            A ``VideoProcessResult`` for the video, or ``None`` when the
            download did not succeed.
        """
        video_id = get_video_id(url)

        # Stage 1: Download
        download_result = self.downloader.download_single(url)
        if not download_result.success:
            return None

        # Stage 2: Extract frames
        extraction_result = self.extractor.extract_video(
            download_result.output_path, video_id
        )

        # Stage 3: Filter frames
        filter_result = self.filter.filter_frames(
            frame_paths=extraction_result.output_paths,
            classes=self.config.classes,
            video_id=video_id,
        )

        # Optional: Delete raw frames to save disk
        if self.config.cleanup_raw_frames:
            import shutil  # local import keeps this sketch self-contained

            shutil.rmtree(extraction_result.output_dir)

        return VideoProcessResult(
            video_id=video_id,
            filtered_dir=self.config.filter.output_dir / video_id,
            frame_count=filter_result.passed_frames,
        )

    def _collect_filtered_frames(self, results):
        """Map each video id to the sorted ``*.jpg`` paths in its filtered dir.

        NOTE(review): the ``{video_id: [paths]}`` shape is assumed to be what
        ``deduplicate_cross_video`` expects — confirm against the deduplicator.
        """
        return {
            result.video_id: sorted(result.filtered_dir.glob("*.jpg"))
            for result in results
        }

    def run(self, urls: list[str], max_workers: int = 3):
        """Process all videos in parallel, then deduplicate across videos.

        A video whose processing raises, or whose download fails, is logged
        and skipped so that one bad URL does not discard the rest of the batch.

        Args:
            urls: Video URLs to process.
            max_workers: Maximum number of videos processed concurrently.

        Returns:
            Whatever ``deduplicate_cross_video`` returns for the frames of the
            successfully filtered videos.
        """
        import logging

        logger = logging.getLogger(__name__)
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.process_single_video, url): url
                for url in urls
            }

            for future in as_completed(futures):
                url = futures[future]
                try:
                    result = future.result()
                except Exception:
                    # future.result() re-raises whatever the worker raised;
                    # isolate the failure to this one video.
                    logger.exception("Processing failed for %s", url)
                    continue

                if result is None:
                    logger.warning("Download failed for %s; skipping", url)
                    continue

                results.append(result)
                # Update registry per-video (done here, on the calling thread)
                self.registry.update_video(result.video_id, status="filtered")

        # Cross-video deduplication (after all videos filtered)
        all_filtered_frames = self._collect_filtered_frames(results)
        return self.deduplicator.deduplicate_cross_video(all_filtered_frames)

Pros/Cons

| Pros | Cons |
| --- | --- |
| Simple mental model | Limited by slowest stage |
| Easy error handling per-video | Filter model loaded N times if not shared |
| Good for small batches | Thread contention on GPU |

Option 2: Stage-Level Async Queues (Folder-Based)

Concept: Each stage has its own workers, which communicate by passing folder paths through queues, so every stage runs independently of the others.

flowchart TB
    subgraph Download Workers
        DW1[Worker 1]
        DW2[Worker 2]
    end

    subgraph Extract Workers
        EW1[Worker 1]
        EW2[Worker 2]
    end

    subgraph Filter Workers
        FW1[Worker 1 - GPU]
    end

    Q1[(Download Queue)]
    Q2[(Extract Queue)]
    Q3[(Filter Queue)]
    Q4[(Dedup Queue)]

    URLs --> Q1
    Q1 --> DW1 & DW2
    DW1 & DW2 --> Q2
    Q2 --> EW1 & EW2
    EW1 & EW2 --> Q3
    Q3 --> FW1
    FW1 --> Q4
    Q4 --> Dedup[Cross-Video Dedup]

Implementation Sketch

import asyncio
from asyncio import Queue
from dataclasses import dataclass
from pathlib import Path

@dataclass
class StageMessage:
    """Hand-off record placed on the queue between two pipeline stages."""

    # Identifier of the video the message refers to.
    video_id: str
    # Location of the stage's output on disk; a path is passed rather than
    # the frames themselves.
    folder_path: Path
    # Label of the stage that produced the message.
    stage: str

class AsyncQueuePipeline:
    """Stage-level pipeline: independent workers per stage, linked by queues.

    Download, extract and filter workers run concurrently. Each worker takes
    an item from its input queue, runs the blocking stage call in the default
    executor, and puts a ``StageMessage`` carrying a path on the next queue.
    """

    def __init__(self, config):
        """Create the inter-stage queues and the shutdown flag.

        NOTE: on Python < 3.10, ``asyncio.Queue`` and ``asyncio.Event`` bind to
        the event loop that is current at construction time, so build the
        pipeline inside the loop that will execute ``run()``.
        """
        self.config = config
        # Async queues between stages
        self.download_queue = Queue()   # Input: URLs
        self.extract_queue = Queue()    # Input: video paths
        self.filter_queue = Queue()     # Input: frame folders
        self.dedup_queue = Queue()      # Input: filtered folders

        self.shutdown = asyncio.Event()

    async def _consume(self, queue, handle, worker_name):
        """Feed items from ``queue`` to ``handle`` until shutdown is signalled.

        ``task_done()`` is called for every item, even when ``handle`` raises;
        otherwise ``queue.join()`` in ``run()`` would wait forever. A failing
        item is logged and dropped so the worker keeps serving the queue.
        """
        import logging

        logger = logging.getLogger(__name__)

        while not self.shutdown.is_set():
            try:
                # Poll with a timeout so the shutdown flag is re-checked.
                item = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            try:
                await handle(item)
            except Exception:
                logger.exception("%s failed on %r; item dropped", worker_name, item)
            finally:
                queue.task_done()

    async def _drain_queues(self):
        """Wait for the download, extract and filter queues to drain, in order."""
        await self.download_queue.join()
        await self.extract_queue.join()
        await self.filter_queue.join()

    # =========== DOWNLOAD WORKER ===========
    async def download_worker(self, worker_id: int):
        """Download videos, output video file path to extract queue."""
        downloader = YouTubeDownloader(self.config.download)

        async def handle(url):
            # Run blocking download in thread pool
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, downloader.download_single, url
            )

            if result.success:
                # Pass the downloaded video's path to the next stage
                await self.extract_queue.put(StageMessage(
                    video_id=result.video_id,
                    folder_path=result.output_path,
                    stage="downloaded",
                ))

        await self._consume(self.download_queue, handle, f"download-{worker_id}")

    # =========== EXTRACT WORKER ===========
    async def extract_worker(self, worker_id: int):
        """Extract frames, output frames folder to filter queue."""
        extractor = FrameExtractor(self.config.extraction)

        async def handle(msg):
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                extractor.extract_video,
                msg.folder_path,
                msg.video_id,
            )

            # Pass FRAMES FOLDER to next stage
            await self.filter_queue.put(StageMessage(
                video_id=msg.video_id,
                folder_path=result.output_dir,  # Folder of frames
                stage="extracted",
            ))

        await self._consume(self.extract_queue, handle, f"extract-{worker_id}")

    # =========== FILTER WORKER (GPU) ===========
    async def filter_worker(self):
        """Filter frames using GPU. Single worker to avoid GPU contention."""
        filter_model = FrameFilter(self.config.filter)

        async def handle(msg):
            import shutil  # local import keeps this sketch self-contained

            loop = asyncio.get_running_loop()

            # Get all frame paths from folder
            frame_paths = sorted(msg.folder_path.glob("*.jpg"))

            # run_in_executor forwards positional arguments only.
            await loop.run_in_executor(
                None,
                filter_model.filter_frames,
                frame_paths,
                self.config.classes,
                msg.video_id,
            )

            # Pass FILTERED FOLDER to dedup
            await self.dedup_queue.put(StageMessage(
                video_id=msg.video_id,
                folder_path=self.config.filter.output_dir / msg.video_id,
                stage="filtered",
            ))

            # Optional: cleanup raw frames (off the event loop; rmtree blocks)
            if self.config.cleanup_raw_frames:
                await loop.run_in_executor(None, shutil.rmtree, msg.folder_path)

        await self._consume(self.filter_queue, handle, "filter")

    # =========== MAIN RUNNER ===========
    async def run(
        self,
        urls: list[str],
        num_download_workers: int = 2,
        num_extract_workers: int = 2,
    ):
        """Run the full async pipeline.

        Args:
            urls: Video URLs to process.
            num_download_workers: Number of concurrent download workers.
            num_extract_workers: Number of concurrent extract workers.

        Returns:
            Whatever ``deduplicate_cross_video`` returns for the filtered
            frames, keyed by video id.

        Raises:
            ValueError: If a worker count is below 1 (the queues could never
                drain).
            RuntimeError: If a worker exits before the queues are drained,
                e.g. because its stage object could not be constructed.
        """
        if num_download_workers < 1 or num_extract_workers < 1:
            raise ValueError("each stage needs at least one worker")

        # Seed the download queue
        for url in urls:
            await self.download_queue.put(url)

        # Start workers
        workers = [
            *(
                asyncio.create_task(self.download_worker(i))
                for i in range(1, num_download_workers + 1)
            ),
            *(
                asyncio.create_task(self.extract_worker(i))
                for i in range(1, num_extract_workers + 1)
            ),
            asyncio.create_task(self.filter_worker()),  # Single GPU worker
        ]

        # Wait for all queues to drain, but stop waiting if a worker dies:
        # with a dead stage its queue would never be joined.
        drained = asyncio.create_task(self._drain_queues())
        try:
            await asyncio.wait(
                {drained, *workers}, return_when=asyncio.FIRST_COMPLETED
            )
            if not drained.done():
                crashed = next(w for w in workers if w.done())
                cause = None if crashed.cancelled() else crashed.exception()
                raise RuntimeError("pipeline worker exited early") from cause
        finally:
            # Signal shutdown and wait for every task to finish.
            drained.cancel()
            self.shutdown.set()
            await asyncio.gather(drained, *workers, return_exceptions=True)

        # Collect all filtered folders for dedup
        filtered_folders = {}
        while not self.dedup_queue.empty():
            msg = self.dedup_queue.get_nowait()
            filtered_folders[msg.video_id] = sorted(msg.folder_path.glob("*.jpg"))

        # Final cross-video dedup
        deduplicator = Deduplicator(self.config.deduplication)
        return deduplicator.deduplicate_cross_video(filtered_folders)

Pros/Cons

| Pros | Cons |
| --- | --- |
| Maximum parallelism | More complex error handling |
| Each stage scales independently | Debugging harder |
| Natural backpressure via queues (when created with a `maxsize`) | Need careful queue sizing |
| GPU worker isolation | asyncio learning curve |

Comparison Summary

| Aspect | Option 1 (Per-Video) | Option 2 (Async Queues) |
| --- | --- | --- |
| Complexity | Low | High |
| Parallelism | Per-video | Per-stage |
| GPU Utilization | Moderate | High (dedicated worker) |
| Memory | Higher (multiple videos in flight) | Controlled via queue size |
| Error Recovery | Easy (per-video retry) | Complex (stage isolation) |
| Best For | < 50 videos | 100+ videos |

Recommendation

  • Start with Option 1 for simplicity
  • Migrate to Option 2 if you need to process 100+ videos or want better GPU utilization
  • Key insight: Pass folder paths between stages, not frame data, to minimize memory usage