# ArchiveBox/archivebox/workers/orchestrator.py
"""
Orchestrator for managing worker processes.
The Orchestrator polls the Crawl queue and spawns CrawlWorkers as needed.
Architecture:
Orchestrator (polls Crawl queue)
└── CrawlWorker(s) (one per active Crawl)
└── SnapshotWorker(s) (one per Snapshot, up to limit)
└── Hook Processes (sequential, forked by SnapshotWorker)
Usage:
# Default: runs forever (for use as subprocess of server)
orchestrator = Orchestrator(exit_on_idle=False)
orchestrator.runloop()
# Exit when done (for embedded use in other commands)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
# Or run via CLI
archivebox manage orchestrator # runs forever
archivebox manage orchestrator --exit-on-idle # exits when done
"""
__package__ = 'archivebox.workers'

import os
import time
from typing import Type
from datetime import timedelta
from multiprocessing import Process as MPProcess

from django.utils import timezone
from rich import print

from archivebox.misc.logging_util import log_worker_event

from .worker import Worker, BinaryWorker, CrawlWorker


def _run_orchestrator_process(exit_on_idle: bool) -> None:
    """Top-level function for multiprocessing (must be picklable)."""
    from archivebox.config.django import setup_django
    setup_django()

    orchestrator = Orchestrator(exit_on_idle=exit_on_idle)
    orchestrator.runloop()
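
# Note: Orchestrator.start() launches this function via multiprocessing.Process.
# On platforms where the default start method is "spawn" (macOS, Windows), the
# target and args are pickled and this module is re-imported in the child, which
# is why the target must be a module-level function rather than a closure or a
# bound method. A rough sketch of the spawn path:
#
#     proc = MPProcess(target=_run_orchestrator_process, args=(True,))
#     proc.start()   # child re-imports this module, then calls the target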


class Orchestrator:
    """
    Manages worker processes by polling queues and spawning workers as needed.

    The orchestrator:
    1. Polls the Crawl queue
    2. Spawns CrawlWorkers if crawls exist and fewer than MAX_CRAWL_WORKERS are running
    3. Monitors worker health and cleans up stale PIDs
    4. Exits when the queue is empty (unless running in daemon mode)

    Architecture:
    - Orchestrator spawns CrawlWorkers (one per active Crawl)
    - Each CrawlWorker spawns SnapshotWorkers (one per Snapshot, up to a limit)
    - Each SnapshotWorker runs hooks sequentially for its snapshot
    """

    # Only BinaryWorker (singleton daemon) and CrawlWorker are managed here;
    # SnapshotWorkers are spawned by the CrawlWorker subprocess, not by the Orchestrator.
    WORKER_TYPES: list[Type[Worker]] = [BinaryWorker, CrawlWorker]

    # Configuration
    POLL_INTERVAL: float = 2.0       # How often to check for new work (seconds)
    IDLE_TIMEOUT: int = 3            # Exit after N idle ticks (0 = never exit)
    MAX_CRAWL_WORKERS: int = 8       # Max crawls processing simultaneously
    MAX_BINARY_WORKERS: int = 1      # Max binaries installing simultaneously (sequential only)

    def __init__(self, exit_on_idle: bool = True, crawl_id: str | None = None):
        self.exit_on_idle = exit_on_idle
        self.crawl_id = crawl_id              # If set, only process work for this crawl
        self.pid: int = os.getpid()
        self.pid_file = None
        self.idle_count: int = 0
        self._last_cleanup_time: float = 0.0  # For throttling cleanup_stale_running()

        # In foreground mode (exit_on_idle=True), limit to 1 CrawlWorker
        if self.exit_on_idle:
            self.MAX_CRAWL_WORKERS = 1

    def __repr__(self) -> str:
        return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'

    @classmethod
    def is_running(cls) -> bool:
        """Check if an orchestrator is already running."""
        from archivebox.machine.models import Process

        # Clean up stale processes before counting
        Process.cleanup_stale_running()
        return Process.get_running_count(process_type=Process.TypeChoices.ORCHESTRATOR) > 0

    def on_startup(self) -> None:
        """Called when the orchestrator starts."""
        from archivebox.machine.models import Process

        self.pid = os.getpid()

        # Register the orchestrator process in the database with an explicit type
        self.db_process = Process.current()

        # Ensure the process type is correctly set to ORCHESTRATOR
        if self.db_process.process_type != Process.TypeChoices.ORCHESTRATOR:
            self.db_process.process_type = Process.TypeChoices.ORCHESTRATOR
            self.db_process.save(update_fields=['process_type'])

        # Clean up any stale Process records from previous runs
        stale_count = Process.cleanup_stale_running()

        # Clean up orphaned Chrome processes from previous crashes
        chrome_count = Process.cleanup_orphaned_chrome()

        # Collect startup metadata
        metadata = {
            'max_crawl_workers': self.MAX_CRAWL_WORKERS,
            'poll_interval': self.POLL_INTERVAL,
        }
        if stale_count:
            metadata['cleaned_stale_pids'] = stale_count
        if chrome_count:
            metadata['cleaned_orphaned_chrome'] = chrome_count

        log_worker_event(
            worker_type='Orchestrator',
            event='Starting...',
            indent_level=0,
            pid=self.pid,
            metadata=metadata,
        )

    def terminate_all_workers(self) -> None:
        """Terminate all running worker processes."""
        from archivebox.machine.models import Process
        import signal

        # Get all running worker processes
        running_workers = Process.objects.filter(
            process_type=Process.TypeChoices.WORKER,
            status__in=['running', 'started']
        )
        for worker_process in running_workers:
            try:
                # Send SIGTERM to gracefully terminate the worker
                os.kill(worker_process.pid, signal.SIGTERM)
            except ProcessLookupError:
                # Process already dead
                pass
            except Exception:
                # Ignore other errors during shutdown
                pass

    def on_shutdown(self, error: BaseException | None = None) -> None:
        """Called when the orchestrator shuts down."""
        # Terminate all worker processes in exit_on_idle mode
        if self.exit_on_idle:
            self.terminate_all_workers()

        # Update the Process record status
        if hasattr(self, 'db_process') and self.db_process:
            # KeyboardInterrupt is a graceful shutdown, not an error
            self.db_process.exit_code = 1 if error and not isinstance(error, KeyboardInterrupt) else 0
            self.db_process.status = self.db_process.StatusChoices.EXITED
            self.db_process.ended_at = timezone.now()
            self.db_process.save()

        log_worker_event(
            worker_type='Orchestrator',
            event='Shutting down',
            indent_level=0,
            pid=self.pid,
            error=error if error and not isinstance(error, KeyboardInterrupt) else None,
        )

    def get_total_worker_count(self) -> int:
        """Get the total count of running workers across all types."""
        from archivebox.machine.models import Process

        # Throttle cleanup to once every 30 seconds to avoid performance issues
        CLEANUP_THROTTLE_SECONDS = 30
        now = time.time()
        if now - self._last_cleanup_time > CLEANUP_THROTTLE_SECONDS:
            Process.cleanup_stale_running()
            self._last_cleanup_time = now

        return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES)

    def get_running_workers_for_type(self, WorkerClass: Type[Worker]) -> int:
        """Get the count of running workers for a specific worker type."""
        return len(WorkerClass.get_running_workers())

    def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
        """Determine if we should spawn a new worker."""
        if queue_count == 0:
            return False

        # Get the appropriate limit based on worker type
        if WorkerClass.name == 'crawl':
            max_workers = self.MAX_CRAWL_WORKERS
        elif WorkerClass.name == 'binary':
            max_workers = self.MAX_BINARY_WORKERS  # Force sequential: only 1 binary at a time
        else:
            max_workers = 1  # Default for unknown types

        # Check the worker limit
        running_workers = WorkerClass.get_running_workers()
        running_count = len(running_workers)
        if running_count >= max_workers:
            return False

        # Check if we already have enough workers for the queue size
        # Spawn more gradually - don't flood with workers
        if running_count > 0 and queue_count <= running_count * WorkerClass.MAX_CONCURRENT_TASKS:
            return False

        return True
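
    # Worked example of the gradual-spawn rule above, assuming a hypothetical
    # CrawlWorker.MAX_CONCURRENT_TASKS of 4 with the default MAX_CRAWL_WORKERS of 8:
    #   queue=0,  running=0  -> no spawn (nothing queued)
    #   queue=5,  running=0  -> spawn    (no workers yet)
    #   queue=3,  running=1  -> no spawn (1 worker x 4 tasks covers 3 queued)
    #   queue=9,  running=2  -> spawn    (9 > 2 x 4, existing capacity exhausted)
    #   queue=99, running=8  -> no spawn (MAX_CRAWL_WORKERS reached)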

    def spawn_worker(self, WorkerClass: Type[Worker]) -> int | None:
        """Spawn a new worker process. Returns the PID, or None if the spawn failed."""
        try:
            print(f'[yellow]DEBUG: Spawning {WorkerClass.name} worker with crawl_id={self.crawl_id}...[/yellow]')
            pid = WorkerClass.start(crawl_id=self.crawl_id)
            print(f'[yellow]DEBUG: Spawned {WorkerClass.name} worker with PID={pid}[/yellow]')

            # CRITICAL: Block until the worker registers itself in the Process table.
            # This prevents a race condition where the orchestrator spawns multiple
            # workers before any of them finish on_startup() and register.
            from archivebox.machine.models import Process

            timeout = 5.0        # seconds to wait for worker registration
            poll_interval = 0.1  # check every 100ms
            elapsed = 0.0
            spawn_time = timezone.now()

            while elapsed < timeout:
                # Check if the worker process is registered, with strict criteria:
                # 1. Correct PID
                # 2. WORKER process type
                # 3. RUNNING status
                # 4. Parent is this orchestrator
                # 5. Started recently (within the last 10 seconds)

                # Debug: Check all processes with this PID first
                if elapsed < 0.5:
                    all_procs = list(Process.objects.filter(pid=pid))
                    print(f'[yellow]DEBUG spawn_worker: elapsed={elapsed:.1f}s pid={pid} orchestrator_id={self.db_process.id}[/yellow]')
                    print(f'[yellow]    Found {len(all_procs)} Process records for pid={pid}[/yellow]')
                    for p in all_procs:
                        print(f'[yellow]    -> type={p.process_type} status={p.status} parent_id={p.parent_id} match={p.parent_id == self.db_process.id}[/yellow]')

                worker_process = Process.objects.filter(
                    pid=pid,
                    process_type=Process.TypeChoices.WORKER,
                    status=Process.StatusChoices.RUNNING,
                    parent_id=self.db_process.id,
                    started_at__gte=spawn_time - timedelta(seconds=10),
                ).first()
                if worker_process:
                    # Worker successfully registered!
                    print(f'[green]DEBUG spawn_worker: Worker registered! Returning pid={pid}[/green]')
                    return pid

                time.sleep(poll_interval)
                elapsed += poll_interval

            # Timeout - the worker failed to register
            log_worker_event(
                worker_type='Orchestrator',
                event='Worker failed to register in time',
                indent_level=0,
                pid=self.pid,
                metadata={'worker_type': WorkerClass.name, 'worker_pid': pid, 'timeout': timeout},
            )
            return None
        except Exception as e:
            log_worker_event(
                worker_type='Orchestrator',
                event='Failed to spawn worker',
                indent_level=0,
                pid=self.pid,
                metadata={'worker_type': WorkerClass.name},
                error=e,
            )
            return None

    def check_queues_and_spawn_workers(self) -> dict[str, int]:
        """
        Check the Binary and Crawl queues and spawn workers as needed.
        Returns a dict of queue sizes.
        """
        from archivebox.crawls.models import Crawl
        from archivebox.machine.models import Binary, Machine

        queue_sizes = {}

        # Check the Binary queue
        machine = Machine.current()
        binary_queue = Binary.objects.filter(
            machine=machine,
            status=Binary.StatusChoices.QUEUED,
            retry_at__lte=timezone.now()
        ).order_by('retry_at')
        binary_count = binary_queue.count()
        queue_sizes['binary'] = binary_count

        # Spawn a BinaryWorker if needed (one worker per binary, up to MAX_BINARY_WORKERS)
        if self.should_spawn_worker(BinaryWorker, binary_count):
            # Get the next binary to process
            binary = binary_queue.first()
            if binary:
                BinaryWorker.start(binary_id=str(binary.id))

        # Check if any BinaryWorkers are still running
        running_binary_workers = len(BinaryWorker.get_running_workers())

        # Check the Crawl queue
        crawl_queue = Crawl.objects.filter(
            retry_at__lte=timezone.now()
        ).exclude(
            status__in=Crawl.FINAL_STATES
        )

        # Apply the crawl_id filter if set
        if self.crawl_id:
            crawl_queue = crawl_queue.filter(id=self.crawl_id)
        crawl_queue = crawl_queue.order_by('retry_at')

        crawl_count = crawl_queue.count()
        queue_sizes['crawl'] = crawl_count

        # CRITICAL: Only spawn CrawlWorkers if the binary queue is empty AND no BinaryWorkers
        # are running. This ensures all binaries are installed before snapshots start processing.
        if binary_count == 0 and running_binary_workers == 0:
            # Spawn a CrawlWorker if needed
            if self.should_spawn_worker(CrawlWorker, crawl_count):
                # Claim the next crawl
                crawl = crawl_queue.first()
                if crawl and self._claim_crawl(crawl):
                    CrawlWorker.start(crawl_id=str(crawl.id))

        return queue_sizes

    def _claim_crawl(self, crawl) -> bool:
        """Atomically claim a crawl using optimistic locking."""
        from archivebox.crawls.models import Crawl

        updated = Crawl.objects.filter(
            pk=crawl.pk,
            retry_at=crawl.retry_at,
        ).update(
            retry_at=timezone.now() + timedelta(hours=24),  # Long lock (crawls take time)
        )
        return updated == 1
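
    # The conditional UPDATE above is what makes the claim atomic: if two
    # orchestrators race on the same crawl, only one sees updated == 1, because
    # the loser's WHERE clause no longer matches the retry_at it read earlier.
    # Roughly equivalent SQL (table name assumes Django's default app_model naming):
    #
    #   UPDATE crawls_crawl
    #   SET retry_at = <now + 24h>
    #   WHERE id = <pk> AND retry_at = <retry_at read at poll time>;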

    def has_pending_work(self, queue_sizes: dict[str, int]) -> bool:
        """Check if any queue has pending work."""
        return any(count > 0 for count in queue_sizes.values())

    def has_running_workers(self) -> bool:
        """Check if any workers are still running."""
        return self.get_total_worker_count() > 0

    def has_future_work(self) -> bool:
        """Check if there's work scheduled for the future (retry_at > now) in the Crawl queue."""
        from archivebox.crawls.models import Crawl

        # Build a filter for future work, respecting crawl_id if set
        qs = Crawl.objects.filter(
            retry_at__gt=timezone.now()
        ).exclude(
            status__in=Crawl.FINAL_STATES
        )

        # Apply the crawl_id filter if set
        if self.crawl_id:
            qs = qs.filter(id=self.crawl_id)

        return qs.exists()

    def on_tick(self, queue_sizes: dict[str, int]) -> None:
        """Called each orchestrator tick. Override for custom behavior."""
        # Tick logging suppressed to reduce noise
        pass

    def on_idle(self) -> None:
        """Called when the orchestrator is idle (no work, no workers)."""
        # Idle logging suppressed to reduce noise
        pass

    def should_exit(self, queue_sizes: dict[str, int]) -> bool:
        """Determine if the orchestrator should exit."""
        if not self.exit_on_idle:
            return False
        if self.IDLE_TIMEOUT == 0:
            return False

        # Don't exit if there's pending or future work
        if self.has_pending_work(queue_sizes):
            return False
        if self.has_running_workers():
            return False
        if self.has_future_work():
            return False

        # Exit after the idle timeout
        return self.idle_count >= self.IDLE_TIMEOUT
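
    # With the defaults above (POLL_INTERVAL=2.0, IDLE_TIMEOUT=3), a foreground
    # orchestrator exits roughly IDLE_TIMEOUT * POLL_INTERVAL = 6 seconds after
    # the queues drain and the last worker exits:
    #
    #   orchestrator = Orchestrator(exit_on_idle=True)
    #   orchestrator.runloop()   # returns ~6s after all work completes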

    def runloop(self) -> None:
        """Main orchestrator loop."""
        from rich.live import Live
        from archivebox.misc.logging import IS_TTY
        from archivebox.misc.progress_layout import ArchiveBoxProgressLayout
        import sys

        # Enable the progress layout only in TTY + foreground mode
        show_progress = IS_TTY and self.exit_on_idle

        self.on_startup()

        if not show_progress:
            # No progress layout - just run normally
            self._run_orchestrator_loop(None)
        else:
            # Redirect worker subprocess output to /dev/null
            devnull_fd = os.open(os.devnull, os.O_WRONLY)

            # Save the original stdout/stderr (make 2 copies - one for the Console, one for restoring)
            original_stdout = sys.stdout.fileno()
            original_stderr = sys.stderr.fileno()
            stdout_for_console = os.dup(original_stdout)
            stdout_for_restore = os.dup(original_stdout)
            stderr_for_restore = os.dup(original_stderr)

            try:
                # Redirect stdout/stderr to /dev/null (workers will inherit this)
                os.dup2(devnull_fd, original_stdout)
                os.dup2(devnull_fd, original_stderr)

                # Create a Console using the saved stdout (not the redirected one)
                from rich.console import Console
                import archivebox.misc.logging as logging_module
                orchestrator_console = Console(file=os.fdopen(stdout_for_console, 'w'), force_terminal=True)

                # Update the global CONSOLE so orchestrator logs appear too
                original_console = logging_module.CONSOLE
                logging_module.CONSOLE = orchestrator_console

                # Create the layout and run with a Live display
                progress_layout = ArchiveBoxProgressLayout(crawl_id=self.crawl_id)
                with Live(
                    progress_layout.get_layout(),
                    refresh_per_second=4,
                    screen=True,
                    console=orchestrator_console,
                ):
                    self._run_orchestrator_loop(progress_layout)

                # Restore the original console
                logging_module.CONSOLE = original_console
            finally:
                # Restore stdout/stderr
                os.dup2(stdout_for_restore, original_stdout)
                os.dup2(stderr_for_restore, original_stderr)

                # Cleanup (stdout_for_console is closed by orchestrator_console)
                try:
                    os.close(devnull_fd)
                    os.close(stdout_for_restore)
                    os.close(stderr_for_restore)
                except OSError:
                    pass
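
    # For reference, the fd shuffle in runloop() above (fd numbers illustrative):
    #   before:   fd 1 (stdout) and fd 2 (stderr) point at the terminal
    #   os.dup:   stdout_for_console / stdout_for_restore / stderr_for_restore
    #             are extra handles that still point at the terminal
    #   os.dup2:  fd 1 and fd 2 now point at /dev/null, and spawned workers
    #             inherit that, so their raw output can't corrupt the Live layout
    #   Console:  writes through stdout_for_console, so only rich output from
    #             this process reaches the terminal
    #   finally:  dup2 points fd 1/2 back at the terminal and the dups are closed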

    def _run_orchestrator_loop(self, progress_layout):
        """Run the main orchestrator loop with an optional progress display."""
        last_queue_sizes = {}
        last_snapshot_count = None
        tick_count = 0

        # Track snapshot progress to detect changes
        snapshot_progress = {}  # snapshot_id -> (total, completed, current_plugin)

        try:
            while True:
                tick_count += 1

                # Check queues and spawn workers
                queue_sizes = self.check_queues_and_spawn_workers()

                # Get worker counts for each type
                worker_counts = {
                    WorkerClass.name: len(WorkerClass.get_running_workers())
                    for WorkerClass in self.WORKER_TYPES
                }

                # Update the layout if enabled
                if progress_layout:
                    # Get crawl queue and worker counts
                    crawl_queue_count = queue_sizes.get('crawl', 0)
                    crawl_workers_count = worker_counts.get('crawl', 0)

                    # Determine orchestrator status
                    if crawl_workers_count > 0:
                        status = "Working"
                    elif crawl_queue_count > 0:
                        status = "Spawning"
                    else:
                        status = "Idle"

                    # Update orchestrator status
                    progress_layout.update_orchestrator_status(
                        status=status,
                        crawl_queue_count=crawl_queue_count,
                        crawl_workers_count=crawl_workers_count,
                        max_crawl_workers=self.MAX_CRAWL_WORKERS,
                    )

                    # Update CrawlWorker logs by tailing Process stdout/stderr
                    if crawl_workers_count > 0:
                        from archivebox.machine.models import Process
                        crawl_worker_process = Process.objects.filter(
                            process_type=Process.TypeChoices.WORKER,
                            worker_type='crawl',
                            status__in=['running', 'started']
                        ).first()
                        if crawl_worker_process:
                            progress_layout.update_crawl_worker_logs(crawl_worker_process)

                    # Log queue size changes
                    if queue_sizes != last_queue_sizes:
                        for worker_type, count in queue_sizes.items():
                            old_count = last_queue_sizes.get(worker_type, 0)
                            if count != old_count:
                                if count > old_count:
                                    progress_layout.log_event(
                                        f"{worker_type.capitalize()} queue: {old_count} → {count}",
                                        style="yellow"
                                    )
                                else:
                                    progress_layout.log_event(
                                        f"{worker_type.capitalize()} queue: {old_count} → {count}",
                                        style="green"
                                    )
                        last_queue_sizes = queue_sizes.copy()

                    # Update snapshot progress
                    from archivebox.core.models import Snapshot

                    # Get all started snapshots (optionally filtered by crawl_id)
                    snapshot_filter = {'status': 'started'}
                    if self.crawl_id:
                        snapshot_filter['crawl_id'] = self.crawl_id
                    else:
                        # Only when processing all crawls, filter by recent modified_at to avoid stale snapshots
                        recent_cutoff = timezone.now() - timedelta(minutes=5)
                        snapshot_filter['modified_at__gte'] = recent_cutoff
                    active_snapshots = list(Snapshot.objects.filter(**snapshot_filter))

                    # Log snapshot count changes and details
                    if len(active_snapshots) != last_snapshot_count:
                        if last_snapshot_count is not None:
                            if len(active_snapshots) > last_snapshot_count:
                                progress_layout.log_event(
                                    f"Active snapshots: {last_snapshot_count} → {len(active_snapshots)}",
                                    style="cyan"
                                )
                                # Log which snapshots started
                                for snapshot in active_snapshots[-1:]:  # Just show the newest one
                                    progress_layout.log_event(
                                        f"Started: {snapshot.url[:60]}",
                                        style="green"
                                    )
                                # Log SnapshotWorker count
                                from archivebox.machine.models import Process
                                all_workers = Process.objects.filter(
                                    process_type=Process.TypeChoices.WORKER,
                                    status__in=['running', 'started']
                                ).count()
                                progress_layout.log_event(
                                    f"Workers running: {all_workers} ({crawl_workers_count} CrawlWorkers)",
                                    style="grey53"
                                )
                            else:
                                progress_layout.log_event(
                                    f"Active snapshots: {last_snapshot_count} → {len(active_snapshots)}",
                                    style="blue"
                                )
                        last_snapshot_count = len(active_snapshots)

                    # Track which snapshots are still active
                    active_ids = set()
                    for snapshot in active_snapshots:
                        active_ids.add(snapshot.id)
                        total = snapshot.archiveresult_set.count()
                        completed = snapshot.archiveresult_set.filter(
                            status__in=['succeeded', 'skipped', 'failed']
                        ).count()

                        # Count hooks by status for debugging
                        queued = snapshot.archiveresult_set.filter(status='queued').count()
                        started = snapshot.archiveresult_set.filter(status='started').count()

                        # Find the currently running hook (ordered by hook_name to get the lowest step number)
                        current_ar = snapshot.archiveresult_set.filter(status='started').order_by('hook_name').first()
                        if not current_ar:
                            # If nothing is running, show the next queued item (ordered to get the next in sequence)
                            current_ar = snapshot.archiveresult_set.filter(status='queued').order_by('hook_name').first()

                        current_plugin = ''
                        if current_ar:
                            # Use hook_name if available, otherwise the plugin name
                            hook_name = current_ar.hook_name or current_ar.plugin or ''
                            # Extract just the hook name without the path (e.g., "on_Snapshot__50_wget.py" -> "wget")
                            if hook_name:
                                # Clean up the name: remove the prefix and extension
                                clean_name = hook_name.split('__')[-1] if '__' in hook_name else hook_name
                                clean_name = clean_name.replace('.py', '').replace('.sh', '').replace('.bg', '')
                                current_plugin = clean_name
                        elif total == 0:
                            # Snapshot just started, hooks not created yet
                            current_plugin = "initializing"
                        elif queued > 0:
                            # Hooks created but none started yet
                            current_plugin = "waiting"

                        # Update the snapshot worker (show even if no hooks yet)
                        # Debug: Log the first time we see this snapshot
                        if snapshot.id not in progress_layout.snapshot_to_worker:
                            progress_layout.log_event(
                                f"Assigning to worker: {snapshot.url[:50]}",
                                style="grey53"
                            )

                        # Track progress changes
                        prev_progress = snapshot_progress.get(snapshot.id, (0, 0, ''))
                        curr_progress = (total, completed, current_plugin)
                        if prev_progress != curr_progress:
                            prev_total, prev_completed, prev_plugin = prev_progress
                            # Log hooks created
                            if total > prev_total:
                                progress_layout.log_event(
                                    f"Hooks created: {total} for {snapshot.url[:40]}",
                                    style="cyan"
                                )
                            # Log hook completion
                            if completed > prev_completed:
                                progress_layout.log_event(
                                    f"Hook completed: {completed}/{total} for {snapshot.url[:40]}",
                                    style="green"
                                )
                            # Log plugin change
                            if current_plugin and current_plugin != prev_plugin:
                                progress_layout.log_event(
                                    f"Running: {current_plugin} ({snapshot.url[:40]})",
                                    style="yellow"
                                )
                            snapshot_progress[snapshot.id] = curr_progress

                        # Debug: Every 10 ticks, log detailed status if stuck at initializing
                        if tick_count % 10 == 0 and total == 0 and current_plugin == "initializing":
                            progress_layout.log_event(
                                f"DEBUG: Snapshot stuck at initializing (status={snapshot.status})",
                                style="red"
                            )

                        progress_layout.update_snapshot_worker(
                            snapshot_id=snapshot.id,
                            url=snapshot.url,
                            total=max(total, 1),  # Show at least 1 to avoid division by zero
                            completed=completed,
                            current_plugin=current_plugin,
                        )

                    # Remove snapshots that are no longer active
                    for snapshot_id in list(progress_layout.snapshot_to_worker.keys()):
                        if snapshot_id not in active_ids:
                            progress_layout.log_event(
                                "Snapshot completed/removed",
                                style="blue"
                            )
                            progress_layout.remove_snapshot_worker(snapshot_id)
                            # Also clean up progress tracking
                            if snapshot_id in snapshot_progress:
                                del snapshot_progress[snapshot_id]

                # Track idle state
                has_pending = self.has_pending_work(queue_sizes)
                has_running = self.has_running_workers()
                if has_pending or has_running:
                    self.idle_count = 0
                    self.on_tick(queue_sizes)
                else:
                    self.idle_count += 1
                    self.on_idle()

                # Check if we should exit
                if self.should_exit(queue_sizes):
                    if progress_layout:
                        progress_layout.log_event("All work complete", style="green")
                    log_worker_event(
                        worker_type='Orchestrator',
                        event='All work complete',
                        indent_level=0,
                        pid=self.pid,
                    )
                    break

                time.sleep(self.POLL_INTERVAL)
        except KeyboardInterrupt:
            if progress_layout:
                progress_layout.log_event("Interrupted by user", style="red")
            print()  # Newline after ^C
            # Mark the Process record as exited (on_shutdown treats Ctrl-C as a graceful exit)
            self.on_shutdown()
        except BaseException as e:
            if progress_layout:
                progress_layout.log_event(f"Error: {e}", style="red")
            self.on_shutdown(error=e)
            raise
        else:
            self.on_shutdown()

    def start(self) -> int:
        """
        Fork the orchestrator as a background process.
        Returns the PID of the new process.
        """
        # Use a module-level function to avoid pickle errors with local functions
        proc = MPProcess(
            target=_run_orchestrator_process,
            args=(self.exit_on_idle,),
            name='orchestrator'
        )
        proc.start()
        assert proc.pid is not None

        log_worker_event(
            worker_type='Orchestrator',
            event='Started in background',
            indent_level=0,
            pid=proc.pid,
        )
        return proc.pid
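
    # Minimal usage sketch: daemonize an orchestrator from another command and
    # continue without blocking:
    #
    #   orchestrator = Orchestrator(exit_on_idle=False)
    #   pid = orchestrator.start()   # forks and returns the child PID immediately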

    @classmethod
    def get_or_start(cls, exit_on_idle: bool = True) -> 'Orchestrator':
        """
        Get the running orchestrator or start a new one.
        Used by commands like 'add' to ensure an orchestrator is running.
        """
        if cls.is_running():
            print('[grey53]👨‍✈️ Orchestrator already running[/grey53]')
            # Return a placeholder - the actual orchestrator is in another process
            return cls(exit_on_idle=exit_on_idle)

        orchestrator = cls(exit_on_idle=exit_on_idle)
        return orchestrator