Data Miner - Architecture & Code Walkthrough¶
A comprehensive guide to the architecture, design decisions, and code flow of Data Miner, a production-grade video mining pipeline for generating large-scale computer vision datasets from YouTube.
Table of Contents¶
- Executive Summary
- Architecture Overview
- Project Structure
- Database Architecture
- Worker System
- Processing Modules
- ML Model Wrappers
- Configuration System
- CLI Interface
- Data Flow
- Comparison with Earlier Versions
- Appendix A: Earlier Versions
Executive Summary¶
Data Miner is a PostgreSQL-backed, supervisor-managed video processing pipeline designed to:
- Search YouTube for videos by keywords/hashtags
- Download highest-quality videos with rate limiting
- Extract frames using configurable sampling strategies
- Filter frames using SigLIP2 image-text similarity
- Deduplicate frames using DINOv3/SigLIP2 embeddings with FAISS
- Detect objects using open-set detection models (GroundingDINO, OWLv2)
Key Design Decisions¶
| Aspect | Current Design | Rationale |
|---|---|---|
| State Management | PostgreSQL with SQLModel | ACID guarantees, concurrent access, query flexibility |
| Worker Model | Long-running supervisor processes | Automatic restart, centralized logging, process isolation |
| Concurrency | Row-level locking with heartbeats | Prevents stale locks, supports worker crashes |
| Processing Scope | Central (Video) + Per-Project (ProjectVideo) | Reuse downloads across projects, project-specific filtering |
| Configuration | OmegaConf + Pydantic | YAML merging, validation, type safety |
Architecture Overview¶
flowchart TB
subgraph CLI["CLI (cli.py)"]
POPULATE[populate]
STATUS[status]
WORKERS[workers]
end
subgraph Database["PostgreSQL"]
PROJ[(projects)]
VID[(videos)]
PV[(project_videos)]
end
subgraph Workers["Supervisor-Managed Workers"]
DW[Download Worker]
EW[Extract Worker]
FW[Filter Worker]
CDW[Cross-Dedup Worker]
DTW[Detect Worker]
MON[Monitor Worker]
BKP[Backup Worker]
end
subgraph Modules["Processing Modules"]
DOWN[downloader.py]
EXT[frame_extractor.py]
FILT[frame_filter.py]
DEDUP[deduplicator.py]
DET[detector.py]
end
subgraph Models["ML Models"]
SIGLIP[SigLIPModel]
DINO[DINOv3Model]
GRDINO[GroundingDINO]
end
CLI --> Database
Workers --> Database
DW --> DOWN
EW --> EXT
FW --> FILT
CDW --> DEDUP
DTW --> DET
FILT --> SIGLIP
DEDUP --> DINO
DET --> GRDINO
Processing Pipeline¶
flowchart LR
subgraph Central["Central Pipeline (Video table)"]
direction LR
D[Download] --> E[Extract]
end
subgraph Project["Per-Project Pipeline (ProjectVideo table)"]
direction LR
F[Filter] --> DU[Cross-Dedup] --> DT[Detect]
end
style Central fill:#e1f5fe
style Project fill:#f3e5f5
E --> F
Project Structure¶
data_miner/
├── __init__.py # Package init
├── cli.py # Click CLI (813 lines)
├── logging.py # Loguru + Loki logging
│
├── config/ # Configuration system
│ ├── __init__.py # Exports all config getters
│ ├── config.py # Pydantic models (161 lines)
│ ├── constants.py # Enums, model IDs, status maps (162 lines)
│ ├── loader.py # OmegaConf loader with caching (232 lines)
│ ├── default.yaml # Default configuration
│ └── blocked_hashtags.txt # Hashtag blocklist
│
├── db/ # Database layer
│ ├── __init__.py
│ ├── connection.py # SQLModel engine + session
│ ├── models.py # Project, Video, ProjectVideo (110 lines)
│ └── operations.py # CRUD + claim/release functions (962 lines)
│
├── workers/ # Long-running worker processes
│ ├── __init__.py
│ ├── base.py # Base worker classes (368 lines)
│ ├── download.py # Download worker (66 lines)
│ ├── extract.py # Extraction worker
│ ├── filter.py # Filter worker (89 lines)
│ ├── dedup.py # Cross-dedup worker (128 lines)
│ ├── detect.py # Detection worker (111 lines)
│ ├── monitor.py # Project monitor (196 lines)
│ └── backup.py # Backup worker
│
├── modules/ # Core processing logic
│ ├── __init__.py
│ ├── downloader.py # yt-dlp wrapper
│ ├── frame_extractor.py # PyAV frame extraction
│ ├── frame_filter.py # SigLIP-based filtering (243 lines)
│ ├── deduplicator.py # FAISS-based dedup (470 lines)
│ └── detector.py # Object detection
│
├── models/ # ML model wrappers
│ ├── __init__.py
│ ├── base.py # BaseModel abstract class
│ ├── siglip_model.py # SigLIP2 wrapper
│ ├── dinov3_model.py # DINOv3 wrapper
│ └── detector_models.py # Detection model factory
│
└── utils/ # Utility functions
├── __init__.py
├── device.py # CUDA/CPU device management
├── io.py # File I/O utilities
├── validators.py # Input validation
├── query_generator.py # YouTube query generation
├── db_helper.py # Database utilities
├── migrate_registry.py # YAML → DB migration
└── ssh_helper.py # SSH for remote backup
Database Architecture¶
The database uses SQLModel (Pydantic + SQLAlchemy) with PostgreSQL.
Entity Relationship Diagram¶
erDiagram
PROJECT ||--o{ PROJECT_VIDEO : has
VIDEO ||--o{ PROJECT_VIDEO : referenced_by
PROJECT {
int project_id PK
string name UK
string output_dir
ProjectStatus project_stage
int extracted_frames
int filtered_frames
int unique_frames
string dedup_dir
string detect_dir
datetime created_at
}
VIDEO {
string video_id PK
string url
SourceType source_type
string source_info
string video_path
string frames_dir
int frame_count
string title
VideoStatus status
string error
string locked_by
datetime locked_at
datetime heartbeat_at
bool backed_up
datetime backed_up_at
datetime created_at
datetime updated_at
}
PROJECT_VIDEO {
int id PK
int project_id FK
string video_id FK
string filtered_dir
int passed_frames
ProjectVideoStatus status
string error
string locked_by
datetime locked_at
datetime heartbeat_at
datetime created_at
datetime updated_at
}
Status Enums¶
VideoStatus (Central Stages)¶
| Status | Description |
|---|---|
| `PENDING` | Ready for download |
| `DOWNLOADING` | Worker is downloading |
| `DOWNLOADED` | Download complete |
| `EXTRACTING` | Worker is extracting frames |
| `EXTRACTED` | Extraction complete |
| `FAILED` | Processing failed |
ProjectVideoStatus (Per-Project Filter Stage)¶
| Status | Description |
|---|---|
| `PENDING` | Ready for filtering |
| `FILTERING` | Worker is filtering |
| `FILTERED` | Filtering complete with frames |
| `FILTERED_EMPTY` | Filtering complete, no frames passed |
| `FAILED` | Processing failed |
ProjectStatus (Project-Level Stages)¶
| Status | Description |
|---|---|
| `POPULATING` | Videos being added/processed |
| `FILTERING` | Filter workers active |
| `DEDUP_READY` | All videos filtered, ready for cross-dedup |
| `DEDUPING` | Cross-dedup in progress |
| `DETECT_READY` | Cross-dedup done, ready for detection |
| `DETECTING` | Detection in progress |
| `COMPLETE` | Pipeline finished |
| `FAILED` | Pipeline failed |
Key Database Operations¶
Claim/Release Pattern¶
All workers claim work atomically with `SELECT ... FOR UPDATE SKIP LOCKED`, so concurrent workers can never grab the same row:
# claim_next_video() in operations.py
stmt = (
select(Video)
.where(
and_(
Video.status == status,
or_(
Video.locked_by.is_(None),
Video.heartbeat_at < threshold # Stale lock recovery
)
)
)
.with_for_update(skip_locked=True)
.limit(1)
)
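The claim predicate (row is unlocked, or its heartbeat is older than the stale threshold) can be illustrated in plain Python, separate from the SQL path. This is an in-memory sketch of the condition only — the function name, dict shape, and `stale_minutes` default are assumptions, and the actual atomicity comes from PostgreSQL's `FOR UPDATE SKIP LOCKED`, not from this code:

```python
from datetime import datetime, timedelta, timezone

def try_claim(row: dict, worker_id: str, stale_minutes: int = 5) -> bool:
    """Sketch of the claimability rule: a row can be claimed if it is
    unlocked, or if its heartbeat is stale (a crashed worker stopped
    heartbeating, so the lock is recoverable)."""
    now = datetime.now(timezone.utc)
    threshold = now - timedelta(minutes=stale_minutes)
    heartbeat = row.get("heartbeat_at")
    if row.get("locked_by") is None or (heartbeat is not None and heartbeat < threshold):
        row["locked_by"] = worker_id
        row["heartbeat_at"] = now
        return True
    return False
```

A fresh lock blocks other workers; once the heartbeat goes stale, any worker may reclaim the row.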
Heartbeat System¶
Workers update heartbeats every 30 seconds to prove liveness:
def update_video_heartbeat(session: Session, video_id: str, worker_id: str) -> bool:
    """Returns False if lock was lost (another worker completed this)."""
    video = session.get(Video, video_id)
    if video is None or video.locked_by != worker_id:
        return False  # Lost lock!
    video.heartbeat_at = datetime.utcnow()
    session.commit()
    return True
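On the worker side, `_start_heartbeat`/`_stop_heartbeat` presumably wrap a background thread around this call. A minimal sketch of that pattern — the function names, signatures, and the stop-event-as-abort-signal behavior are illustrative assumptions, not the actual base-class code:

```python
import threading

def heartbeat_loop(stop: threading.Event, interval: float, send_heartbeat) -> None:
    """Call send_heartbeat() every `interval` seconds until stopped.
    If send_heartbeat() reports the lock was lost, exit early and set
    the stop event so the processing thread can abort."""
    while not stop.wait(interval):
        if not send_heartbeat():
            stop.set()  # lock lost: signal the main thread
            return

def start_heartbeat(interval: float, send_heartbeat) -> threading.Event:
    """Spawn a daemon heartbeat thread; returns the event used to stop it."""
    stop = threading.Event()
    thread = threading.Thread(
        target=heartbeat_loop, args=(stop, interval, send_heartbeat), daemon=True
    )
    thread.start()
    return stop
```

The worker would call `start_heartbeat(30.0, ...)` after claiming a row and `stop.set()` after releasing it; checking `stop.is_set()` mid-processing detects a lost lock.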
Worker System¶
Worker Hierarchy¶
classDiagram
class _BaseWorker {
<<abstract>>
+stage_name: StageName
+poll_interval: float
+heartbeat_interval: float
+_init_project()
+_start_heartbeat(item_id)
+_stop_heartbeat()
+_heartbeat_loop()*
+run()*
}
class BaseVideoWorker {
<<abstract>>
+input_status()*
+in_progress_status()*
+output_status()*
+process(video: Video)*
+run()
}
class BaseProjectVideosWorker {
<<abstract>>
+input_status()*
+in_progress_status()*
+output_status()*
+process(pv: ProjectVideo, video: Video)*
+run()
}
class BaseProjectStageWorker {
<<abstract>>
+worker_name: str
+claim_project(session)*
+process(project: Project)*
+complete_project(session, project_id, result)*
+fail_project(session, project_id, error)*
+run()
}
_BaseWorker <|-- BaseVideoWorker
_BaseWorker <|-- BaseProjectVideosWorker
_BaseWorker <|-- BaseProjectStageWorker
BaseVideoWorker <|-- DownloadWorker
BaseVideoWorker <|-- ExtractWorker
BaseProjectVideosWorker <|-- FilterWorker
BaseProjectStageWorker <|-- CrossDedupWorker
BaseProjectStageWorker <|-- ProjectDetectWorker
Worker Types¶
| Worker | Base Class | Works On | Responsibility |
|---|---|---|---|
| `DownloadWorker` | `BaseVideoWorker` | Video table | Download YouTube videos |
| `ExtractWorker` | `BaseVideoWorker` | Video table | Extract frames from videos |
| `FilterWorker` | `BaseProjectVideosWorker` | ProjectVideo table | Filter frames with SigLIP2 |
| `CrossDedupWorker` | `BaseProjectStageWorker` | Project level | Cross-video deduplication |
| `ProjectDetectWorker` | `BaseProjectStageWorker` | Project level | Object detection |
| `ProjectMonitorWorker` | Custom | N/A | Stage transitions, stale lock recovery |
| `BackupWorker` | Custom | Video table | rsync backup to remote |
Worker Lifecycle¶
sequenceDiagram
participant W as Worker
participant DB as PostgreSQL
participant M as Module
loop Main Loop
W->>DB: claim_next_video(status, worker_id)
alt No work available
W->>W: sleep(poll_interval)
else Work claimed
W->>W: _start_heartbeat(video_id)
W->>M: process(video)
alt Success
W->>DB: release_video(new_status, **updates)
else Failure
W->>DB: mark_video_failed(error)
end
W->>W: _stop_heartbeat()
end
end
Supervisor Integration¶
Workers are managed by supervisord:
[program:download]
command=/path/to/.venv/bin/python -m data_miner.workers.download --config %(ENV_CONFIG)s
numprocs=3
process_name=%(program_name)s_%(process_num)02d
autorestart=true
startsecs=5
stopwaitsecs=30
Generated via `data-miner workers setup --config path/to/config.yaml`
Processing Modules¶
1. Downloader (downloader.py)¶
Downloads YouTube videos using yt-dlp with configurable rate limiting.
Key Features:
- Format selection (max 1080p by default)
- Rate limiting (sleep intervals between downloads)
- Hashtag blocklist filtering
- Timeout handling
class YouTubeDownloader:
def download_single(self, url: str) -> DownloadResult:
"""Download a single video, returns path and metadata."""
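The exact options the wrapper passes to yt-dlp aren't shown here, but a hedged sketch of the option dict it would build looks like this — the keys are real yt-dlp option names, while the function name and default values are assumptions about this project:

```python
def build_ydl_opts(output_dir: str, max_height: int = 1080,
                   sleep_interval: float = 5.0) -> dict:
    """Illustrative yt-dlp options: cap resolution, write files named by
    video id, and sleep between downloads for rate limiting."""
    return {
        # Prefer best video+audio at or below the resolution cap
        "format": (
            f"bestvideo[height<={max_height}]+bestaudio/"
            f"best[height<={max_height}]"
        ),
        "outtmpl": f"{output_dir}/%(id)s.%(ext)s",
        "sleep_interval": sleep_interval,  # seconds to sleep between requests
        "quiet": True,
    }
```

The downloader would then hand this dict to `yt_dlp.YoutubeDL(...)` and call `download([url])`.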
2. Frame Extractor (frame_extractor.py)¶
Extracts frames from videos using PyAV.
Sampling Strategies:
| Strategy | Description |
|----------|-------------|
| interval | Every N frames (default: 30) |
| time | Every N seconds |
| keyframe | Scene change detection |
3. Frame Filter (frame_filter.py)¶
Filters frames based on SigLIP2 image-text similarity.
Two Modes:
- Positive-only: Frame passes if `max(positive_scores) > threshold`
- Positive + Negative: Also requires `max(positive) - max(negative) > margin`
class FrameFilter:
def filter_frames(
self,
frame_paths: list[Path],
video_id: str = "unknown",
) -> FilterResult:
"""
Filter frames based on similarity to text prompts.
Returns FilterResult with passing frames.
"""
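The two decision modes condense to a few lines. This is an illustrative restatement of the rule described above, not the module's actual code:

```python
def frame_passes(pos_scores: list[float], neg_scores: list[float],
                 threshold: float, margin: float) -> bool:
    """Positive-only: pass if the best positive score beats the threshold.
    With negatives: additionally require the best positive to beat the
    best negative by at least `margin`."""
    best_pos = max(pos_scores)
    if best_pos <= threshold:
        return False
    if not neg_scores:          # positive-only mode
        return True
    return best_pos - max(neg_scores) > margin
```

In the pipeline, `pos_scores`/`neg_scores` would be one row of the SigLIP2 similarity matrix for a single frame.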
4. Deduplicator (deduplicator.py)¶
Removes duplicate frames using embedding-based similarity with FAISS.
Two-Phase Algorithm:
- Phase 1 - Per-Video: Remove temporal duplicates within each video using greedy selection
- Phase 2 - Cross-Video: Use FAISS ANN search to find and remove duplicates across all videos
flowchart LR
subgraph Phase1["Phase 1: Per-Video"]
A[Embeddings] --> B[Cosine Similarity]
B --> C[Greedy Selection]
end
subgraph Phase2["Phase 2: Cross-Video"]
C --> D[FAISS Index]
D --> E[KNN Search]
E --> F[Union-Find Merge]
end
F --> G[Unique Frames]
Supported Models:
- DINOv3 (default): Best quality embeddings
- SigLIP2: Memory-efficient, reuses filter model
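Phase 2's union-find merge groups frames connected by any above-threshold KNN match and keeps one representative per group. A minimal stand-alone sketch of that merge — the real module presumably feeds it FAISS match pairs, and the keep-lowest-index policy here is an assumption:

```python
def dedup_groups(pairs: list[tuple[int, int]], n: int) -> set[int]:
    """Union-find over duplicate pairs; returns the kept frame indices
    (one representative per connected group of duplicates)."""
    parent = list(range(n))

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in pairs:
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[max(ra, rb)] = min(ra, rb)  # keep the lowest index

    return {i for i in range(n) if find(i) == i}
```

With pairs `(0,1)`, `(1,2)`, and `(3,4)` over six frames, frames 0, 3, and 5 survive: one per duplicate group plus the unmatched frame.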
5. Detector (detector.py)¶
Runs open-set object detection on frames.
Supported Detectors:
| Detector | Model | Notes |
|----------|-------|-------|
| GroundingDINO | IDEA-Research/grounding-dino-base | Text-guided detection |
| OWLv2 | google/owlv2-base-patch16-ensemble | Open-vocabulary |
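Whichever detector runs, its output ultimately lands in `annotations.json`. A hedged sketch of the box-to-annotation conversion step — the `bbox` layout follows the COCO convention, while the function name and exact fields are assumptions about this project:

```python
def to_coco_annotations(boxes: list[tuple], scores: list[float],
                        labels: list[int], image_id: int,
                        conf_threshold: float = 0.3) -> list[dict]:
    """Convert raw detector output (corner boxes) into COCO-style
    annotation dicts, dropping boxes below the confidence threshold."""
    annotations = []
    for (x1, y1, x2, y2), score, label in zip(boxes, scores, labels):
        if score < conf_threshold:
            continue
        annotations.append({
            "image_id": image_id,
            "category_id": label,
            "bbox": [x1, y1, x2 - x1, y2 - y1],  # COCO uses [x, y, w, h]
            "score": score,
        })
    return annotations
```

The `conf_threshold` parameter mirrors `confidence_threshold` from `DetectionConfig`.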
ML Model Wrappers¶
Base Model Pattern¶
All ML model wrappers follow a consistent pattern:
from abc import ABC, abstractmethod

import torch

class BaseModel(ABC):
    def __init__(self):
        self.model = None
        self.processor = None
        self._loaded = False

    @abstractmethod
    def load(self) -> None:
        """Load model to device."""

    def unload(self) -> None:
        """Free GPU memory."""
        del self.model, self.processor
        torch.cuda.empty_cache()

    def __enter__(self):
        self.load()
        return self

    def __exit__(self, *args):
        self.unload()
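The context-manager lifecycle can be exercised without a GPU by subclassing a stripped-down restatement of the base class. Everything below is illustrative: `_BaseModelSketch` replaces the CUDA cleanup with a plain reset, and `DummyModel` is hypothetical:

```python
from abc import ABC, abstractmethod

class _BaseModelSketch(ABC):
    """Torch-free restatement of the wrapper pattern, just to show the
    load/unload lifecycle driven by the with-statement."""
    def __init__(self):
        self.model = None
        self._loaded = False

    @abstractmethod
    def load(self) -> None:
        """Load model to device."""

    def unload(self) -> None:
        self.model = None       # real class also calls torch.cuda.empty_cache()
        self._loaded = False

    def __enter__(self):
        self.load()
        return self

    def __exit__(self, *args):
        self.unload()

class DummyModel(_BaseModelSketch):
    def load(self) -> None:
        self.model = object()   # stands in for a loaded torch model
        self._loaded = True
```

Inside `with DummyModel() as m:` the model is loaded; on exit it is released, which is how workers bound GPU memory to the span of a single batch of work.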
SigLIPModel (siglip_model.py)¶
Wrapper for Google's SigLIP2 for image-text similarity:
class SigLIPModel(BaseModel):
def compute_similarity(
self,
images: list,
texts: list[str],
batch_size: int = 16
) -> np.ndarray:
"""Returns (N_images, N_texts) similarity matrix."""
DINOv3Model (dinov3_model.py)¶
Wrapper for Meta's DINOv3/DINOv2 for image embeddings:
class DINOv3Model(BaseModel):
def get_embeddings(
self,
images: list,
batch_size: int = 32,
normalize: bool = True
) -> np.ndarray:
"""Returns (N_images, embedding_dim) array."""
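Deduplication compares these embeddings by cosine similarity; with `normalize=True` that reduces to a dot product. A dependency-free sketch of the comparison (in the pipeline this is batched over NumPy arrays and FAISS, not per-pair Python):

```python
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity for two plain-list vectors: dot product over
    the product of the L2 norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(y * y for y in b) ** 0.5
    return dot / (norm_a * norm_b)
```

Frames whose embeddings score above the dedup `threshold` are treated as duplicates.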
Configuration System¶
Architecture¶
flowchart LR
A[default.yaml] --> C[OmegaConf Merge]
B[user.yaml] --> C
C --> D[lru_cache]
D --> E["get_*_config()"]
E --> F[Pydantic Model]
Pydantic Config Models¶
| Config | Key Fields |
|---|---|
| `DownloadConfig` | `output_dir`, `format`, `max_resolution`, `sleep_interval`, `blocked_hashtag_patterns` |
| `ExtractionConfig` | `output_dir`, `strategy`, `interval_frames`, `max_frames_per_video` |
| `FilterConfig` | `output_dir`, `model_id`, `positive_prompts`, `negative_prompts`, `threshold`, `margin_threshold` |
| `DeduplicationConfig` | `output_dir`, `model_type`, `dino_model_id`, `threshold`, `k_neighbors` |
| `DetectionConfig` | `output_dir`, `detector`, `confidence_threshold`, `save_visualizations` |
| `DatabaseConfig` | `url` |
| `SupervisorConfig` | `download_workers`, `extract_workers`, `filter_workers`, `dedup_workers`, `detect_workers` |
| `MonitorConfig` | `poll_interval`, `stale_threshold_minutes`, `cleanup_extracted_videos` |
| `BackupConfig` | `enabled`, `remote_dest`, `delete_after_backup` |
Config Loading¶
from data_miner.config import get_filter_config, get_download_config
# Automatically loads from DATA_MINER_CONFIG env var or default.yaml
config = get_filter_config()
print(config.positive_prompts) # Type-safe access
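The merge step behaves like a recursive dict merge: user values override defaults, and nested sections merge key-by-key rather than being replaced wholesale. A stdlib stand-in for the `OmegaConf.merge` call, for illustration only:

```python
def merge(base: dict, override: dict) -> dict:
    """Recursive dict merge: override wins on conflicts, nested dict
    sections merge key-by-key, inputs are left unmodified."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge(merged[key], value)
        else:
            merged[key] = value
    return merged
```

So a user YAML overriding only `filter.threshold` keeps every other `filter.*` default intact, which is the behavior the loader relies on.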
CLI Interface¶
Built with Click, the CLI provides three groups of commands:
Core Commands¶
| Command | Description |
|---|---|
| `init-db` | Initialize database tables |
| `populate` | Add videos from config sources (search, URLs, files) |
| `status` | Show pipeline status |
| `add-video` | Add a single video URL |
Worker Management¶
| Command | Description |
|---|---|
| `workers setup` | Generate supervisor config |
| `workers start` | Start all workers |
| `workers stop` | Stop all workers |
| `workers restart` | Restart all workers |
| `workers status` | Show worker status |
Maintenance¶
| Command | Description |
|---|---|
| `delete-project` | Delete project and optionally files |
| `delete-videos` | Delete videos with filters |
| `cleanup-orphans` | Remove orphaned videos |
| `force-dedup` | Re-run cross-dedup for project |
| `force-detect` | Re-run detection for project |
Data Flow¶
Complete Pipeline Flow¶
flowchart TB
subgraph Input["Input Sources"]
YAML[Config YAML] --> POP[populate command]
SEARCH[YouTube Search] --> POP
URLS[URL Files] --> POP
end
subgraph Central["Central Processing"]
POP --> VID[(Video Table)]
VID --> DL[Download Worker]
DL --> EX[Extract Worker]
EX --> VID2[(Video: EXTRACTED)]
end
subgraph Project["Per-Project Processing"]
VID2 --> PV[(ProjectVideo Table)]
PV --> FIL[Filter Worker]
FIL --> PV2[(ProjectVideo: FILTERED)]
PV2 --> MON[Monitor: DEDUP_READY]
MON --> DEDUP[Cross-Dedup Worker]
DEDUP --> DET[Detect Worker]
end
subgraph Output["Outputs"]
DL --> VIDS[videos/]
EX --> RAW[frames_raw/]
FIL --> FILT[frames_filtered/]
DEDUP --> UNIQ[frames_dedup/]
DET --> ANN[detections/annotations.json]
end
Output Directory Structure¶
output/
├── videos/ # Downloaded videos (optional deletion)
│ └── {video_id}.mp4
├── frames_raw/{video_id}/ # All extracted frames
│ └── frame_{N:05d}.jpg
├── frames_filtered/{video_id}/ # Frames passing filter
│ └── frame_{N:05d}.jpg
├── frames_dedup/ # Unique frames (flat)
│ └── {video_id}_frame_{N:05d}.jpg
└── detections/
├── annotations.json # COCO-format annotations
└── visualizations/ # Bounding box images
Comparison with Earlier Versions¶
Key Differences: Current vs V3 (Earlier)¶
| Aspect | Current Version | V3 (Earlier) |
|---|---|---|
| State Storage | PostgreSQL database | YAML file (video_registry.yaml) |
| Worker Model | Supervisor-managed long-running processes | Single CLI command with sequential stages |
| Concurrency | Row-level locking with heartbeats | Thread locks in-memory |
| Multi-Project | Native support (ProjectVideo table) | Single registry per run |
| Scalability | Horizontal (multiple workers) | Vertical (single machine) |
| Fault Tolerance | Worker restart, stale lock recovery | Manual restart required |
| Config | OmegaConf with caching | OmegaConf without caching |
| Status Tracking | Database queries | YAML file reads |
Architecture Evolution¶
timeline
title Data Miner Evolution
V3 : File-based registry
: Sequential pipeline
: Single threaded
: CLI-driven execution
Current : PostgreSQL backend
: Supervisor-managed workers
: Concurrent processing
: Heartbeat-based locking
: Project-level isolation
Migration Benefits¶
- Reliability: Automatic worker restart on failure
- Scalability: Multiple workers for each stage
- Visibility: Database queries for status monitoring
- Isolation: Per-project filtering without re-downloading
- Recovery: Stale lock detection and reset
Appendix A: Earlier Versions¶
V3 Architecture (File-Based)¶
The earlier V3 version used a different architecture:
Key Components¶
- Video Registry (`registry.py`): Pydantic-based YAML registry tracking video processing status
- Pipeline Orchestrator (`pipeline.py`): Sequential stage execution with lazy model loading
- CLI Commands: `run-config`, `validate-config`, registry management
Registry Structure¶
metadata:
created: "2024-01-01T00:00:00"
version: "3.0"
videos:
dQw4w9WgXcQ:
video_id: dQw4w9WgXcQ
url: https://youtube.com/watch?v=dQw4w9WgXcQ
status: filtered
stages:
download:
completed: true
path: output/videos/dQw4w9WgXcQ.mp4
extraction:
completed: true
frame_count: 450
Proposed Async Enhancement¶
An earlier implementation plan proposed an async evolution of the pipeline:
- `asyncio` event loop for orchestration
- `ThreadPoolExecutor` for blocking operations
- `asyncio.Queue` for stage communication
- `BaseStageWorker` abstract class
This design influenced the current worker architecture but was adapted for database-backed state management instead of in-memory queues.
Parallel Pipeline Options¶
Two approaches were considered in the parallel pipeline design:
- Per-Video Streaming: Each video flows through all stages concurrently
- Stage-Level Async Queues: Separate workers per stage with folder-based message passing
The current implementation adopted Option 2 (stage-level workers) with database-backed coordination instead of async queues.
Summary¶
Data Miner is a production-grade video mining pipeline featuring:
- PostgreSQL-backed state management with SQLModel for type safety
- Supervisor-managed workers for reliability and scalability
- Heartbeat-based locking for concurrent worker safety
- Project-level isolation enabling reuse of downloads across projects
- Two-phase deduplication with FAISS for O(N log N) scalability
- Flexible configuration with OmegaConf + Pydantic validation
- Multiple ML backends for filtering (SigLIP2) and detection (GroundingDINO, OWLv2)