Worker System¶
Data Miner uses supervisor-managed long-running worker processes.
Worker Hierarchy¶
classDiagram
    class _BaseWorker {
        <<abstract>>
        +stage_name: StageName
        +poll_interval: float
        +heartbeat_interval: float
        +run()*
    }
    class BaseVideoWorker {
        <<abstract>>
        +process(video: Video)*
    }
    class BaseProjectVideosWorker {
        <<abstract>>
        +process(pv: ProjectVideo, video: Video)*
    }
    class BaseProjectStageWorker {
        <<abstract>>
        +claim_project(session)*
        +process(project)*
        +complete_project()*
    }
    _BaseWorker <|-- BaseVideoWorker
    _BaseWorker <|-- BaseProjectVideosWorker
    _BaseWorker <|-- BaseProjectStageWorker
    BaseVideoWorker <|-- DownloadWorker
    BaseVideoWorker <|-- ExtractWorker
    BaseProjectVideosWorker <|-- FilterWorker
    BaseProjectStageWorker <|-- CrossDedupWorker
    BaseProjectStageWorker <|-- DetectWorker
Worker Types¶
| Worker | Base Class | Table | Responsibility |
|---|---|---|---|
| DownloadWorker | BaseVideoWorker | Video | Download YouTube videos |
| ExtractWorker | BaseVideoWorker | Video | Extract frames |
| FilterWorker | BaseProjectVideosWorker | ProjectVideo | Filter with SigLIP2 |
| CrossDedupWorker | BaseProjectStageWorker | Project | Cross-video dedup |
| DetectWorker | BaseProjectStageWorker | Project | Object detection |
| MonitorWorker | Custom | — | Stage transitions |
| BackupWorker | Custom | Video | Remote backup |
Worker Lifecycle¶
sequenceDiagram
    participant W as Worker
    participant DB as PostgreSQL
    participant M as Module
    loop Main Loop
        W->>DB: claim_next_item()
        alt No work
            W->>W: sleep(poll_interval)
        else Work claimed
            W->>W: start_heartbeat()
            W->>M: process()
            W->>DB: release_item()
            W->>W: stop_heartbeat()
        end
    end
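The loop in the diagram can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not Data Miner's implementation: `InMemoryQueue`, `run_loop`, and `max_idle_polls` are hypothetical names, the queue stands in for PostgreSQL, and heartbeat start/stop are recorded as events rather than run in a thread. A real worker would loop indefinitely; the sketch stops after a few idle polls so that it terminates.

```python
import time
from collections import deque
from typing import Callable, Deque, List, Optional


class InMemoryQueue:
    """Hypothetical stand-in for the PostgreSQL-backed claim/release calls."""

    def __init__(self, items: List[str]) -> None:
        self.pending: Deque[str] = deque(items)
        self.released: List[str] = []

    def claim_next_item(self) -> Optional[str]:
        # Return the next unclaimed item, or None when there is no work.
        return self.pending.popleft() if self.pending else None

    def release_item(self, item: str) -> None:
        self.released.append(item)


def run_loop(
    queue: InMemoryQueue,
    process: Callable[[str], None],
    poll_interval: float = 0.01,
    max_idle_polls: int = 2,
) -> List[str]:
    """Claim, process, and release items until the queue stays empty."""
    events: List[str] = []
    idle_polls = 0
    while idle_polls < max_idle_polls:
        item = queue.claim_next_item()
        if item is None:
            # "No work" branch: back off before polling again.
            idle_polls += 1
            time.sleep(poll_interval)
            continue
        idle_polls = 0
        events.append(f"start_heartbeat:{item}")
        try:
            process(item)
        finally:
            # Release the item and stop the heartbeat even if process() raises.
            queue.release_item(item)
            events.append(f"stop_heartbeat:{item}")
    return events
```

Wrapping `process()` in `try`/`finally` is a design choice of the sketch: it keeps a failed item from holding its lock until stale lock recovery kicks in.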
Supervisor Configuration¶
Workers are managed by supervisord. The generated configuration is written to /etc/supervisor/conf.d/data_miner.conf:
[program:download]
command=/path/.venv/bin/python -m data_miner.workers.download
numprocs=3
process_name=%(program_name)s_%(process_num)02d
autorestart=true
startsecs=5
stopwaitsecs=30
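In this section, `numprocs=3` starts three copies of the worker, and supervisord requires `process_name` to include `process_num` whenever `numprocs` is greater than 1 so that each copy gets a unique name. `startsecs` is how long a process must stay up to count as started, and `stopwaitsecs` is how long supervisord waits after the stop signal before sending SIGKILL. As a rough sketch of how such a section could be rendered, the hypothetical helper below (`render_program_section` is not part of Data Miner) builds the same text from a few parameters:

```python
def render_program_section(
    name: str,
    module: str,
    numprocs: int,
    python_bin: str = "/path/.venv/bin/python",
) -> str:
    """Render one supervisord [program:x] section as text."""
    lines = [
        f"[program:{name}]",
        f"command={python_bin} -m {module}",
        f"numprocs={numprocs}",
        # Left as a literal string: supervisord expands these placeholders
        # itself, yielding names such as download_00, download_01, download_02.
        "process_name=%(program_name)s_%(process_num)02d",
        "autorestart=true",
        "startsecs=5",
        "stopwaitsecs=30",
    ]
    return "\n".join(lines) + "\n"


print(render_program_section("download", "data_miner.workers.download", 3))
```

The `process_name` line is deliberately not an f-string, so Python does not try to interpret the `%(...)` placeholders.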
Heartbeat System¶
Workers update heartbeat timestamps every 30 seconds to prove liveness:
def _heartbeat_loop(self):
    while not self._stop_event.wait(30):
        with get_session() as session:
            still_owner = update_heartbeat(session, item_id, worker_id)
            if not still_owner:
                os._exit(1)  # Supervisor restarts
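The loop relies on `threading.Event.wait(timeout)`, which returns `False` when the timeout expires and `True` as soon as the event is set, so a single call serves as both the sleep and the stop check. The sketch below shows that pattern in isolation. It is an assumption-laden illustration: the `Heartbeat` class and its `beat` callback are hypothetical, and where the original calls `os._exit(1)` (which terminates the process without running cleanup handlers), the sketch only records the loss of ownership so it stays safe to run.

```python
import threading
import time


class Heartbeat:
    """Background thread that calls `beat()` every `interval` seconds."""

    def __init__(self, beat, interval: float = 30.0) -> None:
        self._beat = beat  # callable returning True while we still own the lock
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self.lost_ownership = False

    def _loop(self) -> None:
        # wait() returns False on timeout (time to beat again) and True once
        # stop() sets the event, which ends the loop without further delay.
        while not self._stop_event.wait(self._interval):
            if not self._beat():
                # The original exits the process here; the sketch records it.
                self.lost_ownership = True
                return

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()


beat_times = []


def record_beat() -> bool:
    beat_times.append(time.monotonic())
    return True  # pretend the lock is still ours


hb = Heartbeat(record_beat, interval=0.01)
hb.start()
time.sleep(0.1)
hb.stop()
```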
Stale Lock Recovery: The monitor worker resets locks older than stale_threshold_minutes.
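To make the recovery rule concrete, here is a small in-memory sketch of resetting locks whose last heartbeat is older than the threshold. The `Lock` dataclass, its field names (`locked_by`, `heartbeat_at`), and `reset_stale_locks` are assumptions made for illustration; the actual monitor worker operates on database rows and its schema is not shown here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class Lock:
    item_id: int
    locked_by: Optional[str]          # worker ID, or None when unlocked
    heartbeat_at: Optional[datetime]  # last heartbeat timestamp


def reset_stale_locks(
    locks: List[Lock],
    stale_threshold_minutes: int,
    now: Optional[datetime] = None,
) -> List[int]:
    """Clear locks whose last heartbeat is older than the threshold."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=stale_threshold_minutes)
    reset_ids: List[int] = []
    for lock in locks:
        if lock.locked_by is None:
            continue  # nothing to recover
        if lock.heartbeat_at is None or lock.heartbeat_at < cutoff:
            # The owner stopped heartbeating: free the item for other workers.
            lock.locked_by = None
            lock.heartbeat_at = None
            reset_ids.append(lock.item_id)
    return reset_ids
```

With a 30-second heartbeat, a threshold of several minutes leaves room for a few missed beats before an item is handed to another worker.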
Implementing Custom Workers¶
Per-Video Worker¶
from data_miner.workers.base import BaseVideoWorker
from data_miner.config import StageName

class MyWorker(BaseVideoWorker):
    stage_name = StageName.DOWNLOAD  # Stage this worker handles

    # "Video" is quoted because its import is not shown in this example
    def process(self, video: "Video") -> dict:
        # Do work...
        return {"video_path": "/path/to/video.mp4"}
Project-Level Worker¶
from data_miner.workers.base import BaseProjectStageWorker

class MyProjectWorker(BaseProjectStageWorker):
    worker_name = "my-stage"

    def claim_project(self, session):
        # Claim project with specific status
        pass

    def process(self, project):
        # Process entire project
        return {"unique_frames": 500}

    def complete_project(self, session, project_id, result):
        # Update project status
        pass
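The three hooks suggest a template-method pattern: the base class drives the cycle and subclasses fill in each step. The sketch below shows one plausible way the hooks could be called in sequence. It is an assumption, not the real `BaseProjectStageWorker`: `SketchProjectStageWorker`, `run_once`, and `CountingWorker` are hypothetical, and a plain dict stands in for the database session.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class SketchProjectStageWorker(ABC):
    """Hypothetical stand-in for BaseProjectStageWorker (not the real class)."""

    @abstractmethod
    def claim_project(self, session) -> Optional[Any]: ...

    @abstractmethod
    def process(self, project) -> Dict[str, Any]: ...

    @abstractmethod
    def complete_project(self, session, project_id, result) -> None: ...

    def run_once(self, session) -> bool:
        """Run one claim -> process -> complete cycle; return False if idle."""
        project = self.claim_project(session)
        if project is None:
            return False
        result = self.process(project)
        self.complete_project(session, project["id"], result)
        return True


class CountingWorker(SketchProjectStageWorker):
    def claim_project(self, session):
        # Take the first pending project, if any.
        return session["pending"].pop(0) if session["pending"] else None

    def process(self, project):
        return {"unique_frames": len(project["frames"])}

    def complete_project(self, session, project_id, result):
        session["done"][project_id] = result


session = {"pending": [{"id": 7, "frames": ["a", "b", "c"]}], "done": {}}
worker = CountingWorker()
worker.run_once(session)
print(session["done"])
```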
Related Docs¶
- Database Models - Table schemas
- Architecture Overview - Full system design