unified Process source of truth and better screenshot tests

This commit is contained in:
Nick Sweeting
2026-01-02 04:20:34 -08:00
parent 3672174dad
commit dd77511026
44 changed files with 3369 additions and 1919 deletions

View File

@@ -1,15 +1,13 @@
"""
Orchestrator for managing worker processes.
The Orchestrator polls queues for each model type (Crawl, Snapshot, ArchiveResult)
and lazily spawns worker processes when there is work to be done.
The Orchestrator polls the Crawl queue and spawns CrawlWorkers as needed.
Architecture:
Orchestrator (main loop, polls queues)
── CrawlWorker subprocess(es)
── SnapshotWorker subprocess(es)
└── ArchiveResultWorker subprocess(es)
└── Each worker spawns task subprocesses via CLI
Orchestrator (polls Crawl queue)
├── CrawlWorker(s) (one per active Crawl)
├── SnapshotWorker(s) (one per Snapshot, up to limit)
└── Hook Processes (sequential, forked by SnapshotWorker)
Usage:
# Default: runs forever (for use as subprocess of server)
@@ -38,7 +36,7 @@ from django.utils import timezone
from rich import print
from archivebox.misc.logging_util import log_worker_event
from .worker import Worker, CrawlWorker, SnapshotWorker, ArchiveResultWorker
from .worker import Worker, CrawlWorker
def _run_orchestrator_process(exit_on_idle: bool) -> None:
@@ -52,22 +50,27 @@ def _run_orchestrator_process(exit_on_idle: bool) -> None:
class Orchestrator:
"""
Manages worker processes by polling queues and spawning workers as needed.
The orchestrator:
1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
2. If items exist and fewer than MAX_CONCURRENT workers are running, spawns workers
1. Polls Crawl queue
2. If crawls exist and fewer than MAX_CRAWL_WORKERS are running, spawns CrawlWorkers
3. Monitors worker health and cleans up stale PIDs
4. Exits when all queues are empty (unless daemon mode)
4. Exits when queue is empty (unless daemon mode)
Architecture:
- Orchestrator spawns CrawlWorkers (one per active Crawl)
- Each CrawlWorker spawns SnapshotWorkers (one per Snapshot, up to limit)
- Each SnapshotWorker runs hooks sequentially for its snapshot
"""
WORKER_TYPES: list[Type[Worker]] = [CrawlWorker, SnapshotWorker, ArchiveResultWorker]
# Only CrawlWorker - SnapshotWorkers are spawned by CrawlWorker subprocess, not by Orchestrator
WORKER_TYPES: list[Type[Worker]] = [CrawlWorker]
# Configuration
POLL_INTERVAL: float = 2.0 # How often to check for new work (seconds)
IDLE_TIMEOUT: int = 3 # Exit after N idle ticks (0 = never exit)
MAX_WORKERS_PER_TYPE: int = 8 # Max workers per model type
MAX_TOTAL_WORKERS: int = 24 # Max workers across all types
MAX_CRAWL_WORKERS: int = 8 # Max crawls processing simultaneously
def __init__(self, exit_on_idle: bool = True, crawl_id: str | None = None):
self.exit_on_idle = exit_on_idle
self.crawl_id = crawl_id # If set, only process work for this crawl
@@ -76,11 +79,9 @@ class Orchestrator:
self.idle_count: int = 0
self._last_cleanup_time: float = 0.0 # For throttling cleanup_stale_running()
# In foreground mode (exit_on_idle=True), limit workers but allow enough
# for crawl progression: 1 CrawlWorker + 1 SnapshotWorker + 1 ArchiveResultWorker
# In foreground mode (exit_on_idle=True), limit to 1 CrawlWorker
if self.exit_on_idle:
self.MAX_WORKERS_PER_TYPE = 1
self.MAX_TOTAL_WORKERS = 3 # Allow one worker of each type to run concurrently
self.MAX_CRAWL_WORKERS = 1
def __repr__(self) -> str:
return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'
@@ -109,14 +110,18 @@ class Orchestrator:
# Clean up any stale Process records from previous runs
stale_count = Process.cleanup_stale_running()
# Clean up orphaned Chrome processes from previous crashes
chrome_count = Process.cleanup_orphaned_chrome()
# Collect startup metadata
metadata = {
'max_workers_per_type': self.MAX_WORKERS_PER_TYPE,
'max_total_workers': self.MAX_TOTAL_WORKERS,
'max_crawl_workers': self.MAX_CRAWL_WORKERS,
'poll_interval': self.POLL_INTERVAL,
}
if stale_count:
metadata['cleaned_stale_pids'] = stale_count
if chrome_count:
metadata['cleaned_orphaned_chrome'] = chrome_count
log_worker_event(
worker_type='Orchestrator',
@@ -126,8 +131,34 @@ class Orchestrator:
metadata=metadata,
)
def terminate_all_workers(self) -> None:
    """Terminate all running worker processes."""
    from archivebox.machine.models import Process
    import signal

    # Find every worker Process record still marked as alive in the DB
    active_workers = Process.objects.filter(
        process_type=Process.TypeChoices.WORKER,
        status__in=['running', 'started'],
    )

    for worker in active_workers:
        try:
            # Politely ask the worker to wrap up and exit
            os.kill(worker.pid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # worker already exited on its own
        except Exception:
            pass  # best-effort during shutdown; swallow anything else
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when orchestrator shuts down."""
# Terminate all worker processes in exit_on_idle mode
if self.exit_on_idle:
self.terminate_all_workers()
# Update Process record status
if hasattr(self, 'db_process') and self.db_process:
# KeyboardInterrupt is a graceful shutdown, not an error
@@ -163,20 +194,15 @@ class Orchestrator:
return len(WorkerClass.get_running_workers())
def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
"""Determine if we should spawn a new worker of the given type."""
"""Determine if we should spawn a new CrawlWorker."""
if queue_count == 0:
return False
# Check per-type limit
# Check CrawlWorker limit
running_workers = WorkerClass.get_running_workers()
running_count = len(running_workers)
if running_count >= self.MAX_WORKERS_PER_TYPE:
return False
# Check total limit
total_workers = self.get_total_worker_count()
if total_workers >= self.MAX_TOTAL_WORKERS:
if running_count >= self.MAX_CRAWL_WORKERS:
return False
# Check if we already have enough workers for the queue size
@@ -190,7 +216,7 @@ class Orchestrator:
"""Spawn a new worker process. Returns PID or None if spawn failed."""
try:
print(f'[yellow]DEBUG: Spawning {WorkerClass.name} worker with crawl_id={self.crawl_id}...[/yellow]')
pid = WorkerClass.start(daemon=False, crawl_id=self.crawl_id)
pid = WorkerClass.start(crawl_id=self.crawl_id)
print(f'[yellow]DEBUG: Spawned {WorkerClass.name} worker with PID={pid}[/yellow]')
# CRITICAL: Block until worker registers itself in Process table
@@ -259,24 +285,49 @@ class Orchestrator:
def check_queues_and_spawn_workers(self) -> dict[str, int]:
"""
Check all queues and spawn workers as needed.
Returns dict of queue sizes by worker type.
Check Crawl queue and spawn CrawlWorkers as needed.
Returns dict of queue sizes.
"""
from archivebox.crawls.models import Crawl
queue_sizes = {}
for WorkerClass in self.WORKER_TYPES:
# Get queue for this worker type
# Need to instantiate worker to get queue (for model access)
worker = WorkerClass(worker_id=-1, crawl_id=self.crawl_id) # temp instance just for queue access
queue = worker.get_queue()
queue_count = queue.count()
queue_sizes[WorkerClass.name] = queue_count
# Only check Crawl queue
crawl_queue = Crawl.objects.filter(
retry_at__lte=timezone.now()
).exclude(
status__in=Crawl.FINAL_STATES
)
# Apply crawl_id filter if set
if self.crawl_id:
crawl_queue = crawl_queue.filter(id=self.crawl_id)
crawl_queue = crawl_queue.order_by('retry_at')
crawl_count = crawl_queue.count()
queue_sizes['crawl'] = crawl_count
# Spawn CrawlWorker if needed
if self.should_spawn_worker(CrawlWorker, crawl_count):
# Claim next crawl
crawl = crawl_queue.first()
if crawl and self._claim_crawl(crawl):
CrawlWorker.start(crawl_id=str(crawl.id))
# Spawn worker if needed
if self.should_spawn_worker(WorkerClass, queue_count):
self.spawn_worker(WorkerClass)
return queue_sizes
def _claim_crawl(self, crawl) -> bool:
    """Atomically claim a crawl using optimistic locking."""
    from archivebox.crawls.models import Crawl

    # Push retry_at far into the future so no other orchestrator picks it up;
    # the filter on the old retry_at value is what makes the claim atomic.
    lock_until = timezone.now() + timedelta(hours=24)  # Long lock (crawls take time)
    matched = (
        Crawl.objects
        .filter(pk=crawl.pk, retry_at=crawl.retry_at)
        .update(retry_at=lock_until)
    )
    # Exactly one row updated means we won the race; zero means someone else did.
    return matched == 1
def has_pending_work(self, queue_sizes: dict[str, int]) -> bool:
"""Check if any queue has pending work."""
@@ -287,30 +338,21 @@ class Orchestrator:
return self.get_total_worker_count() > 0
def has_future_work(self) -> bool:
"""Check if there's work scheduled for the future (retry_at > now)."""
for WorkerClass in self.WORKER_TYPES:
worker = WorkerClass(worker_id=-1, crawl_id=self.crawl_id)
Model = worker.get_model()
"""Check if there's work scheduled for the future (retry_at > now) in Crawl queue."""
from archivebox.crawls.models import Crawl
# Build filter for future work, respecting crawl_id if set
qs = Model.objects.filter(
retry_at__gt=timezone.now()
).exclude(
status__in=Model.FINAL_STATES
)
# Build filter for future work, respecting crawl_id if set
qs = Crawl.objects.filter(
retry_at__gt=timezone.now()
).exclude(
status__in=Crawl.FINAL_STATES
)
# Apply crawl_id filter if set
if self.crawl_id:
if WorkerClass.name == 'crawl':
qs = qs.filter(id=self.crawl_id)
elif WorkerClass.name == 'snapshot':
qs = qs.filter(crawl_id=self.crawl_id)
elif WorkerClass.name == 'archiveresult':
qs = qs.filter(snapshot__crawl_id=self.crawl_id)
# Apply crawl_id filter if set
if self.crawl_id:
qs = qs.filter(id=self.crawl_id)
if qs.count() > 0:
return True
return False
return qs.count() > 0
def on_tick(self, queue_sizes: dict[str, int]) -> None:
"""Called each orchestrator tick. Override for custom behavior."""
@@ -345,20 +387,20 @@ class Orchestrator:
def runloop(self) -> None:
"""Main orchestrator loop."""
from rich.progress import Progress, BarColumn, TextColumn, TaskProgressColumn
from archivebox.misc.logging import IS_TTY, CONSOLE
from rich.live import Live
from archivebox.misc.logging import IS_TTY
from archivebox.misc.progress_layout import ArchiveBoxProgressLayout
import sys
import os
# Enable progress bars only in TTY + foreground mode
# Enable progress layout only in TTY + foreground mode
show_progress = IS_TTY and self.exit_on_idle
self.on_startup()
task_ids = {}
if not show_progress:
# No progress bars - just run normally
self._run_orchestrator_loop(None, task_ids)
# No progress layout - just run normally
self._run_orchestrator_loop(None)
else:
# Redirect worker subprocess output to /dev/null
devnull_fd = os.open(os.devnull, os.O_WRONLY)
@@ -384,14 +426,16 @@ class Orchestrator:
original_console = logging_module.CONSOLE
logging_module.CONSOLE = orchestrator_console
# Now create Progress and run loop (DON'T restore stdout/stderr - workers need /dev/null)
with Progress(
TextColumn("[cyan]{task.description}"),
BarColumn(bar_width=40),
TaskProgressColumn(),
# Create layout and run with Live display
progress_layout = ArchiveBoxProgressLayout(crawl_id=self.crawl_id)
with Live(
progress_layout.get_layout(),
refresh_per_second=4,
screen=True,
console=orchestrator_console,
) as progress:
self._run_orchestrator_loop(progress, task_ids)
):
self._run_orchestrator_loop(progress_layout)
# Restore original console
logging_module.CONSOLE = original_console
@@ -409,22 +453,68 @@ class Orchestrator:
pass
# stdout_for_console is closed by orchestrator_console
def _run_orchestrator_loop(self, progress, task_ids):
def _run_orchestrator_loop(self, progress_layout):
"""Run the main orchestrator loop with optional progress display."""
last_queue_sizes = {}
last_snapshot_count = None
tick_count = 0
# Track snapshot progress to detect changes
snapshot_progress = {} # snapshot_id -> (total, completed, current_plugin)
try:
while True:
tick_count += 1
# Check queues and spawn workers
queue_sizes = self.check_queues_and_spawn_workers()
# Debug queue sizes (only when changed)
if progress and queue_sizes != last_queue_sizes:
progress.console.print(f'[yellow]DEBUG: Queue sizes: {queue_sizes}[/yellow]')
last_queue_sizes = queue_sizes.copy()
# Get worker counts for each type
worker_counts = {
WorkerClass.name: len(WorkerClass.get_running_workers())
for WorkerClass in self.WORKER_TYPES
}
# Update progress bars
if progress:
# Update layout if enabled
if progress_layout:
# Get crawl queue and worker counts
crawl_queue_count = queue_sizes.get('crawl', 0)
crawl_workers_count = worker_counts.get('crawl', 0)
# Determine orchestrator status
if crawl_workers_count > 0:
status = "Working"
elif crawl_queue_count > 0:
status = "Spawning"
else:
status = "Idle"
# Update orchestrator status
progress_layout.update_orchestrator_status(
status=status,
crawl_queue_count=crawl_queue_count,
crawl_workers_count=crawl_workers_count,
max_crawl_workers=self.MAX_CRAWL_WORKERS,
)
# Log queue size changes
if queue_sizes != last_queue_sizes:
for worker_type, count in queue_sizes.items():
old_count = last_queue_sizes.get(worker_type, 0)
if count != old_count:
if count > old_count:
progress_layout.log_event(
f"{worker_type.capitalize()} queue: {old_count}{count}",
style="yellow"
)
else:
progress_layout.log_event(
f"{worker_type.capitalize()} queue: {old_count}{count}",
style="green"
)
last_queue_sizes = queue_sizes.copy()
# Update snapshot progress
from archivebox.core.models import Snapshot
# Get all started snapshots (optionally filtered by crawl_id)
@@ -438,9 +528,36 @@ class Orchestrator:
active_snapshots = list(Snapshot.objects.filter(**snapshot_filter))
# Debug snapshot count (only when changed)
# Log snapshot count changes and details
if len(active_snapshots) != last_snapshot_count:
progress.console.print(f'[yellow]DEBUG: Found {len(active_snapshots)} active snapshots (crawl_id={self.crawl_id})[/yellow]')
if last_snapshot_count is not None:
if len(active_snapshots) > last_snapshot_count:
progress_layout.log_event(
f"Active snapshots: {last_snapshot_count}{len(active_snapshots)}",
style="cyan"
)
# Log which snapshots started
for snapshot in active_snapshots[-1:]: # Just show the newest one
progress_layout.log_event(
f"Started: {snapshot.url[:60]}",
style="green"
)
# Log SnapshotWorker count
from archivebox.machine.models import Process
all_workers = Process.objects.filter(
process_type=Process.TypeChoices.WORKER,
status__in=['running', 'started']
).count()
progress_layout.log_event(
f"Workers running: {all_workers} ({crawl_workers_count} CrawlWorkers)",
style="grey53"
)
else:
progress_layout.log_event(
f"Active snapshots: {last_snapshot_count}{len(active_snapshots)}",
style="blue"
)
last_snapshot_count = len(active_snapshots)
# Track which snapshots are still active
@@ -450,13 +567,14 @@ class Orchestrator:
active_ids.add(snapshot.id)
total = snapshot.archiveresult_set.count()
if total == 0:
continue
completed = snapshot.archiveresult_set.filter(
status__in=['succeeded', 'skipped', 'failed']
).count()
# Count hooks by status for debugging
queued = snapshot.archiveresult_set.filter(status='queued').count()
started = snapshot.archiveresult_set.filter(status='started').count()
# Find currently running hook (ordered by hook_name to get lowest step number)
current_ar = snapshot.archiveresult_set.filter(status='started').order_by('hook_name').first()
if not current_ar:
@@ -472,24 +590,78 @@ class Orchestrator:
# Clean up the name: remove prefix and extension
clean_name = hook_name.split('__')[-1] if '__' in hook_name else hook_name
clean_name = clean_name.replace('.py', '').replace('.sh', '').replace('.bg', '')
current_plugin = f"{clean_name}"
current_plugin = clean_name
elif total == 0:
# Snapshot just started, hooks not created yet
current_plugin = "initializing"
elif queued > 0:
# Hooks created but none started yet
current_plugin = "waiting"
# Build description with URL + current plugin
url = snapshot.url[:50] + '...' if len(snapshot.url) > 50 else snapshot.url
description = f"{url}{current_plugin}"
# Update snapshot worker (show even if no hooks yet)
# Debug: Log first time we see this snapshot
if snapshot.id not in progress_layout.snapshot_to_worker:
progress_layout.log_event(
f"Assigning to worker: {snapshot.url[:50]}",
style="grey53"
)
# Create or update task
if snapshot.id not in task_ids:
task_ids[snapshot.id] = progress.add_task(description, total=total, completed=completed)
else:
# Update both progress and description
progress.update(task_ids[snapshot.id], description=description, completed=completed)
# Track progress changes
prev_progress = snapshot_progress.get(snapshot.id, (0, 0, ''))
curr_progress = (total, completed, current_plugin)
# Remove tasks for snapshots that are no longer active
for snapshot_id in list(task_ids.keys()):
if prev_progress != curr_progress:
prev_total, prev_completed, prev_plugin = prev_progress
# Log hooks created
if total > prev_total:
progress_layout.log_event(
f"Hooks created: {total} for {snapshot.url[:40]}",
style="cyan"
)
# Log hook completion
if completed > prev_completed:
progress_layout.log_event(
f"Hook completed: {completed}/{total} for {snapshot.url[:40]}",
style="green"
)
# Log plugin change
if current_plugin and current_plugin != prev_plugin:
progress_layout.log_event(
f"Running: {current_plugin} ({snapshot.url[:40]})",
style="yellow"
)
snapshot_progress[snapshot.id] = curr_progress
# Debug: Every 10 ticks, log detailed status if stuck at initializing
if tick_count % 10 == 0 and total == 0 and current_plugin == "initializing":
progress_layout.log_event(
f"DEBUG: Snapshot stuck at initializing (status={snapshot.status})",
style="red"
)
progress_layout.update_snapshot_worker(
snapshot_id=snapshot.id,
url=snapshot.url,
total=max(total, 1), # Show at least 1 to avoid division by zero
completed=completed,
current_plugin=current_plugin,
)
# Remove snapshots that are no longer active
for snapshot_id in list(progress_layout.snapshot_to_worker.keys()):
if snapshot_id not in active_ids:
progress.remove_task(task_ids[snapshot_id])
del task_ids[snapshot_id]
progress_layout.log_event(
f"Snapshot completed/removed",
style="blue"
)
progress_layout.remove_snapshot_worker(snapshot_id)
# Also clean up progress tracking
if snapshot_id in snapshot_progress:
del snapshot_progress[snapshot_id]
# Track idle state
has_pending = self.has_pending_work(queue_sizes)
@@ -503,6 +675,8 @@ class Orchestrator:
# Check if we should exit
if self.should_exit(queue_sizes):
if progress_layout:
progress_layout.log_event("All work complete", style="green")
log_worker_event(
worker_type='Orchestrator',
event='All work complete',
@@ -514,8 +688,12 @@ class Orchestrator:
time.sleep(self.POLL_INTERVAL)
except KeyboardInterrupt:
if progress_layout:
progress_layout.log_event("Interrupted by user", style="red")
print() # Newline after ^C
except BaseException as e:
if progress_layout:
progress_layout.log_event(f"Error: {e}", style="red")
self.on_shutdown(error=e)
raise
else:

View File

@@ -34,7 +34,7 @@ CPU_COUNT = cpu_count()
WORKER_TYPES: dict[str, type['Worker']] = {}
def _run_worker(worker_class_name: str, worker_id: int, daemon: bool, **kwargs):
def _run_worker(worker_class_name: str, worker_id: int, **kwargs):
"""
Module-level function to run a worker. Must be at module level for pickling.
"""
@@ -43,16 +43,28 @@ def _run_worker(worker_class_name: str, worker_id: int, daemon: bool, **kwargs):
# Get worker class by name to avoid pickling class objects
worker_cls = WORKER_TYPES[worker_class_name]
worker = worker_cls(worker_id=worker_id, daemon=daemon, **kwargs)
worker = worker_cls(worker_id=worker_id, **kwargs)
worker.runloop()
def _run_snapshot_worker(snapshot_id: str, worker_id: int, **kwargs):
    """
    Entry point for a SnapshotWorker subprocess, bound to one snapshot.

    Lives at module level so multiprocessing can pickle it.
    """
    from archivebox.config.django import setup_django

    # Each subprocess needs its own Django setup before touching the ORM
    setup_django()
    SnapshotWorker(snapshot_id=snapshot_id, worker_id=worker_id, **kwargs).runloop()
class Worker:
"""
Base worker class that polls a queue and processes items directly.
Base worker class for CrawlWorker and SnapshotWorker.
Each item is processed by calling its state machine tick() method.
Workers exit when idle for too long (unless daemon mode).
Workers are spawned as subprocesses to process crawls and snapshots.
Each worker type has its own custom runloop implementation.
"""
name: ClassVar[str] = 'worker'
@@ -60,16 +72,10 @@ class Worker:
# Configuration (can be overridden by subclasses)
MAX_TICK_TIME: ClassVar[int] = 60
MAX_CONCURRENT_TASKS: ClassVar[int] = 1
POLL_INTERVAL: ClassVar[float] = 0.1 # How often to check for new work (seconds)
IDLE_TIMEOUT: ClassVar[int] = 100 # Exit after N idle iterations (10 sec at 0.1 poll interval)
def __init__(self, worker_id: int = 0, daemon: bool = False, crawl_id: str | None = None, **kwargs: Any):
def __init__(self, worker_id: int = 0, **kwargs: Any):
self.worker_id = worker_id
self.daemon = daemon
self.crawl_id = crawl_id # If set, only process work for this crawl
self.pid: int = os.getpid()
self.pid_file: Path | None = None
self.idle_count: int = 0
def __repr__(self) -> str:
return f'[underline]{self.__class__.__name__}[/underline]\\[id={self.worker_id}, pid={self.pid}]'
@@ -78,55 +84,6 @@ class Worker:
"""Get the Django model class. Subclasses must override this."""
raise NotImplementedError("Subclasses must implement get_model()")
def get_queue(self) -> QuerySet:
"""Get the queue of objects ready for processing."""
Model = self.get_model()
return Model.objects.filter(
retry_at__lte=timezone.now()
).exclude(
status__in=Model.FINAL_STATES
).order_by('retry_at')
def claim_next(self):
"""
Atomically claim the next object from the queue.
Returns the claimed object or None if queue is empty or claim failed.
"""
Model = self.get_model()
queue = self.get_queue()
obj = queue.first()
if obj is None:
return None
# Atomic claim using optimistic locking on retry_at
claimed = Model.objects.filter(
pk=obj.pk,
retry_at=obj.retry_at,
).update(
retry_at=timezone.now() + timedelta(seconds=self.MAX_TICK_TIME)
)
if claimed == 1:
obj.refresh_from_db()
return obj
return None # Someone else claimed it
def process_item(self, obj) -> bool:
"""
Process a single item by calling its state machine tick().
Returns True on success, False on failure.
Subclasses can override for custom processing.
"""
try:
obj.sm.tick()
return True
except Exception as e:
# Error will be logged in runloop's completion event
traceback.print_exc()
return False
def on_startup(self) -> None:
"""Called when worker starts."""
from archivebox.machine.models import Process
@@ -139,7 +96,7 @@ class Worker:
if self.db_process.process_type != Process.TypeChoices.WORKER:
self.db_process.process_type = Process.TypeChoices.WORKER
update_fields.append('process_type')
# Store worker type name (crawl/snapshot/archiveresult) in worker_type field
# Store worker type name (crawl/snapshot) in worker_type field
if not self.db_process.worker_type:
self.db_process.worker_type = self.name
update_fields.append('worker_type')
@@ -148,13 +105,11 @@ class Worker:
# Determine worker type for logging
worker_type_name = self.__class__.__name__
indent_level = 1 # Default for most workers
indent_level = 1 # Default for CrawlWorker
# Adjust indent level based on worker type
# SnapshotWorker gets indent level 2
if 'Snapshot' in worker_type_name:
indent_level = 2
elif 'ArchiveResult' in worker_type_name:
indent_level = 3
log_worker_event(
worker_type=worker_type_name,
@@ -162,10 +117,6 @@ class Worker:
indent_level=indent_level,
pid=self.pid,
worker_id=str(self.worker_id),
metadata={
'max_concurrent': self.MAX_CONCURRENT_TASKS,
'poll_interval': self.POLL_INTERVAL,
},
)
def on_shutdown(self, error: BaseException | None = None) -> None:
@@ -179,12 +130,10 @@ class Worker:
# Determine worker type for logging
worker_type_name = self.__class__.__name__
indent_level = 1
indent_level = 1 # CrawlWorker
if 'Snapshot' in worker_type_name:
indent_level = 2
elif 'ArchiveResult' in worker_type_name:
indent_level = 3
log_worker_event(
worker_type=worker_type_name,
@@ -195,121 +144,157 @@ class Worker:
error=error if error and not isinstance(error, KeyboardInterrupt) else None,
)
def should_exit(self) -> bool:
"""Check if worker should exit due to idle timeout."""
if self.daemon:
return False
def _terminate_background_hooks(
self,
background_processes: dict[str, 'Process'],
worker_type: str,
indent_level: int,
) -> None:
"""
Terminate background hooks in 3 phases (shared logic for Crawl/Snapshot workers).
if self.IDLE_TIMEOUT == 0:
return False
Phase 1: Send SIGTERM to all bg hooks + children in parallel (polite request to wrap up)
Phase 2: Wait for each hook's remaining timeout before SIGKILL
Phase 3: SIGKILL any stragglers that exceeded their timeout
return self.idle_count >= self.IDLE_TIMEOUT
Args:
background_processes: Dict mapping hook name -> Process instance
worker_type: Worker type name for logging (e.g., 'CrawlWorker', 'SnapshotWorker')
indent_level: Logging indent level (1 for Crawl, 2 for Snapshot)
"""
import signal
import time
def runloop(self) -> None:
"""Main worker loop - polls queue, processes items."""
self.on_startup()
if not background_processes:
return
# Determine worker type for logging
worker_type_name = self.__class__.__name__
indent_level = 1
now = time.time()
if 'Snapshot' in worker_type_name:
indent_level = 2
elif 'ArchiveResult' in worker_type_name:
indent_level = 3
# Phase 1: Send SIGTERM to ALL background processes + children in parallel
log_worker_event(
worker_type=worker_type,
event=f'Sending SIGTERM to {len(background_processes)} background hooks (+ children)',
indent_level=indent_level,
pid=self.pid,
)
try:
while True:
# Try to claim and process an item
obj = self.claim_next()
if obj is not None:
self.idle_count = 0
# Build metadata for task start
start_metadata = {}
url = None
if hasattr(obj, 'url'):
# SnapshotWorker
url = str(obj.url) if obj.url else None
elif hasattr(obj, 'snapshot') and hasattr(obj.snapshot, 'url'):
# ArchiveResultWorker
url = str(obj.snapshot.url) if obj.snapshot.url else None
elif hasattr(obj, 'get_urls_list'):
# CrawlWorker
urls = obj.get_urls_list()
url = urls[0] if urls else None
plugin = None
if hasattr(obj, 'plugin'):
# ArchiveResultWorker, Crawl
plugin = obj.plugin
# Build deadline map first (before killing, to get accurate remaining time)
deadlines = {}
for hook_name, process in background_processes.items():
elapsed = now - process.started_at.timestamp()
remaining = max(0, process.timeout - elapsed)
deadline = now + remaining
deadlines[hook_name] = (process, deadline)
# Send SIGTERM to all process trees in parallel (non-blocking)
for hook_name, process in background_processes.items():
try:
# Get chrome children (renderer processes etc) before sending signal
children_pids = process.get_children_pids()
if children_pids:
# Chrome hook with children - kill tree
os.kill(process.pid, signal.SIGTERM)
for child_pid in children_pids:
try:
os.kill(child_pid, signal.SIGTERM)
except ProcessLookupError:
pass
log_worker_event(
worker_type=worker_type_name,
event='Processing',
worker_type=worker_type,
event=f'Sent SIGTERM to {hook_name} + {len(children_pids)} children',
indent_level=indent_level,
pid=self.pid,
worker_id=str(self.worker_id),
url=url,
plugin=plugin,
metadata=start_metadata if start_metadata else None,
)
start_time = time.time()
success = self.process_item(obj)
elapsed = time.time() - start_time
# Build metadata for task completion
complete_metadata = {
'duration': elapsed,
'status': 'success' if success else 'failed',
}
log_worker_event(
worker_type=worker_type_name,
event='Completed' if success else 'Failed',
indent_level=indent_level,
pid=self.pid,
worker_id=str(self.worker_id),
url=url,
plugin=plugin,
metadata=complete_metadata,
)
else:
# No work available - idle logging suppressed
self.idle_count += 1
# No children - normal kill
os.kill(process.pid, signal.SIGTERM)
except ProcessLookupError:
pass # Already dead
except Exception as e:
log_worker_event(
worker_type=worker_type,
event=f'Failed to SIGTERM {hook_name}: {e}',
indent_level=indent_level,
pid=self.pid,
)
# Check if we should exit
if self.should_exit():
# Exit logging suppressed - shutdown will be logged by on_shutdown()
break
# Phase 2: Wait for all processes in parallel, respecting individual timeouts
for hook_name, (process, deadline) in deadlines.items():
remaining = deadline - now
log_worker_event(
worker_type=worker_type,
event=f'Waiting up to {remaining:.1f}s for {hook_name}',
indent_level=indent_level,
pid=self.pid,
)
time.sleep(self.POLL_INTERVAL)
# Poll all processes in parallel using Process.poll()
still_running = set(deadlines.keys())
except KeyboardInterrupt:
pass
except BaseException as e:
self.on_shutdown(error=e)
raise
else:
self.on_shutdown()
while still_running:
time.sleep(0.1)
now = time.time()
for hook_name in list(still_running):
process, deadline = deadlines[hook_name]
# Check if process exited using Process.poll()
exit_code = process.poll()
if exit_code is not None:
# Process exited
still_running.remove(hook_name)
log_worker_event(
worker_type=worker_type,
event=f'{hook_name} exited with code {exit_code}',
indent_level=indent_level,
pid=self.pid,
)
continue
# Check if deadline exceeded
if now >= deadline:
# Timeout exceeded - SIGKILL process tree
try:
# Get children before killing (chrome may have spawned more)
children_pids = process.get_children_pids()
if children_pids:
# Kill children first
for child_pid in children_pids:
try:
os.kill(child_pid, signal.SIGKILL)
except ProcessLookupError:
pass
# Then kill parent
process.kill(signal_num=signal.SIGKILL)
log_worker_event(
worker_type=worker_type,
event=f'⚠ Sent SIGKILL to {hook_name} + {len(children_pids) if children_pids else 0} children (exceeded timeout)',
indent_level=indent_level,
pid=self.pid,
)
except Exception as e:
log_worker_event(
worker_type=worker_type,
event=f'Failed to SIGKILL {hook_name}: {e}',
indent_level=indent_level,
pid=self.pid,
)
still_running.remove(hook_name)
@classmethod
def start(cls, worker_id: int | None = None, daemon: bool = False, **kwargs: Any) -> int:
def start(cls, **kwargs: Any) -> int:
"""
Fork a new worker as a subprocess.
Returns the PID of the new process.
"""
from archivebox.machine.models import Process
if worker_id is None:
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
# Use module-level function for pickling compatibility
proc = MPProcess(
target=_run_worker,
args=(cls.name, worker_id, daemon),
args=(cls.name, worker_id),
kwargs=kwargs,
name=f'{cls.name}_worker_{worker_id}',
)
@@ -356,120 +341,397 @@ class Worker:
class CrawlWorker(Worker):
"""Worker for processing Crawl objects."""
"""
Worker for processing Crawl objects.
Responsibilities:
1. Run on_Crawl__* hooks (e.g., chrome launcher)
2. Create Snapshots from URLs
3. Spawn SnapshotWorkers (up to MAX_SNAPSHOT_WORKERS)
4. Monitor snapshots and seal crawl when all done
"""
name: ClassVar[str] = 'crawl'
MAX_TICK_TIME: ClassVar[int] = 60
MAX_SNAPSHOT_WORKERS: ClassVar[int] = 8 # Per crawl limit
def __init__(self, crawl_id: str, **kwargs: Any):
    """Bind this worker to a single crawl; extra kwargs pass through to Worker."""
    super().__init__(**kwargs)
    self.crawl_id = crawl_id  # id of the Crawl this worker is dedicated to
    self.crawl = None  # Crawl instance; loaded from the DB in on_startup()
def get_model(self):
    """Return the Django model class (Crawl) this worker processes."""
    from archivebox.crawls.models import Crawl
    return Crawl
def get_queue(self) -> QuerySet:
    """Get queue of Crawls ready for processing, optionally filtered by crawl_id."""
    queue = super().get_queue()
    # Narrow to a single crawl when this worker was started for a specific one
    return queue.filter(id=self.crawl_id) if self.crawl_id else queue
def on_startup(self) -> None:
    """Register the worker (via Worker.on_startup) and load its Crawl row."""
    super().on_startup()
    from archivebox.crawls.models import Crawl
    # .get() raises Crawl.DoesNotExist for a bad id — the worker must not run without its crawl
    self.crawl = Crawl.objects.get(id=self.crawl_id)
def runloop(self) -> None:
    """Run crawl state machine, spawn SnapshotWorkers, and seal the crawl when done.

    Flow:
      1. tick() the crawl's state machine once (QUEUED → STARTED).
      2. Poll every 2s: spawn SnapshotWorkers for queued snapshots until
         no snapshot is left in QUEUED/STARTED, then seal() the crawl.
    on_shutdown() always runs, even if tick()/seal() raises.
    """
    import sys
    self.on_startup()
    try:
        print(f'[cyan]🔄 CrawlWorker.runloop: Starting tick() for crawl {self.crawl_id}[/cyan]', file=sys.stderr)
        # Advance state machine: QUEUED → STARTED (triggers run() via @started.enter)
        self.crawl.sm.tick()
        # Pick up any field changes tick() persisted (e.g. status)
        self.crawl.refresh_from_db()
        print(f'[cyan]🔄 tick() complete, crawl status={self.crawl.status}[/cyan]', file=sys.stderr)
        # Now spawn SnapshotWorkers and monitor progress
        while True:
            # Check if crawl is done
            if self._is_crawl_finished():
                print(f'[cyan]🔄 Crawl finished, sealing...[/cyan]', file=sys.stderr)
                self.crawl.sm.seal()
                break
            # Spawn workers for queued snapshots
            self._spawn_snapshot_workers()
            time.sleep(2)  # Check every 2s
    finally:
        self.on_shutdown()
def _spawn_snapshot_workers(self) -> None:
"""Spawn SnapshotWorkers for queued snapshots (up to limit)."""
from archivebox.core.models import Snapshot
from archivebox.machine.models import Process
# Count running SnapshotWorkers for this crawl
running_count = Process.objects.filter(
process_type=Process.TypeChoices.WORKER,
worker_type='snapshot',
parent_id=self.db_process.id, # Children of this CrawlWorker
status__in=['running', 'started'],
).count()
if running_count >= self.MAX_SNAPSHOT_WORKERS:
return # At limit
# Get queued snapshots for this crawl (SnapshotWorker will mark as STARTED in on_startup)
queued_snapshots = Snapshot.objects.filter(
crawl_id=self.crawl_id,
status=Snapshot.StatusChoices.QUEUED,
).order_by('created_at')[:self.MAX_SNAPSHOT_WORKERS - running_count]
import sys
print(f'[yellow]🔧 _spawn_snapshot_workers: running={running_count}/{self.MAX_SNAPSHOT_WORKERS}, queued={queued_snapshots.count()}[/yellow]', file=sys.stderr)
# Spawn workers
for snapshot in queued_snapshots:
print(f'[yellow]🔧 Spawning worker for {snapshot.url} (status={snapshot.status})[/yellow]', file=sys.stderr)
SnapshotWorker.start(snapshot_id=str(snapshot.id))
log_worker_event(
worker_type='CrawlWorker',
event=f'Spawned SnapshotWorker for {snapshot.url}',
indent_level=1,
pid=self.pid,
)
def _is_crawl_finished(self) -> bool:
"""Check if all snapshots are sealed."""
from archivebox.core.models import Snapshot
pending = Snapshot.objects.filter(
crawl_id=self.crawl_id,
status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED],
).count()
return pending == 0
def on_shutdown(self, error: BaseException | None = None) -> None:
"""
Terminate all background Crawl hooks when crawl finishes.
Background hooks (e.g., chrome launcher) should only be killed when:
- All snapshots are done (crawl is sealed)
- Worker is shutting down
"""
from archivebox.machine.models import Process
# Query for all running hook processes that are children of this CrawlWorker
background_hooks = Process.objects.filter(
parent_id=self.db_process.id,
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
).select_related('machine')
# Build dict for shared termination logic
background_processes = {
hook.cmd[0] if hook.cmd else f'hook-{hook.pid}': hook
for hook in background_hooks
}
# Use shared termination logic from Worker base class
self._terminate_background_hooks(
background_processes=background_processes,
worker_type='CrawlWorker',
indent_level=1,
)
super().on_shutdown(error)
class SnapshotWorker(Worker):
    """
    Worker that owns sequential hook execution for ONE snapshot.

    Unlike other workers, SnapshotWorker doesn't poll a queue - it's given
    a specific snapshot_id and runs all hooks for that snapshot sequentially.

    Execution flow:
        1. Mark snapshot as STARTED
        2. Discover hooks for snapshot
        3. For each hook (sorted by name):
           a. Fork hook Process
           b. If foreground: wait for completion
           c. If background: track but continue to next hook
           d. Update ArchiveResult status
           e. Advance current_step when all step's hooks complete
        4. When all hooks done: seal snapshot
        5. On shutdown: SIGTERM all background hooks
    """

    name: ClassVar[str] = 'snapshot'
    MAX_TICK_TIME: ClassVar[int] = 60

    def __init__(self, snapshot_id: str, **kwargs: Any):
        super().__init__(**kwargs)
        self.snapshot_id = snapshot_id
        self.snapshot = None  # Loaded from the DB in on_startup()
        self.background_processes: dict[str, Any] = {}  # hook_name -> Process

    def get_model(self):
        """Not used for polling - SnapshotWorker is pinned to one snapshot_id."""
        from archivebox.core.models import Snapshot
        return Snapshot

    def on_startup(self) -> None:
        """Load the snapshot and mark it as STARTED."""
        super().on_startup()
        from archivebox.core.models import Snapshot

        self.snapshot = Snapshot.objects.get(id=self.snapshot_id)

        # Mark snapshot as STARTED
        self.snapshot.status = Snapshot.StatusChoices.STARTED
        self.snapshot.retry_at = None  # No more polling needed
        self.snapshot.save(update_fields=['status', 'retry_at', 'modified_at'])

    def runloop(self) -> None:
        """Execute all hooks for this snapshot sequentially, then seal it."""
        from archivebox.hooks import discover_hooks, is_background_hook
        from archivebox.core.models import ArchiveResult, Snapshot

        self.on_startup()
        try:
            # Discover all hooks for this snapshot, sorted by name (name includes step prefix)
            hooks = sorted(discover_hooks('Snapshot', config=self.snapshot.config), key=lambda h: h.name)

            # Execute each hook sequentially
            for hook_path in hooks:
                hook_name = hook_path.name
                plugin = self._extract_plugin_name(hook_name)
                is_background = is_background_hook(hook_name)

                # Create ArchiveResult for THIS HOOK (not per plugin):
                # one plugin can have multiple hooks (e.g. chrome/on_Snapshot__20_launch_chrome.js,
                # chrome/on_Snapshot__21_navigate_chrome.js), so the idempotency key is
                # (snapshot, plugin, hook_name).
                ar, created = ArchiveResult.objects.get_or_create(
                    snapshot=self.snapshot,
                    plugin=plugin,
                    hook_name=hook_name,
                    defaults={
                        'status': ArchiveResult.StatusChoices.STARTED,
                        'start_ts': timezone.now(),
                    }
                )
                if not created:
                    # Re-running an existing AR: reset it to STARTED
                    ar.status = ArchiveResult.StatusChoices.STARTED
                    ar.start_ts = timezone.now()
                    ar.save(update_fields=['status', 'start_ts', 'modified_at'])

                # Fork and run the hook
                process = self._run_hook(hook_path, ar)

                if is_background:
                    # Track but don't wait
                    self.background_processes[hook_name] = process
                    log_worker_event(
                        worker_type='SnapshotWorker',
                        event=f'Started background hook: {hook_name} (timeout={process.timeout}s)',
                        indent_level=2,
                        pid=self.pid,
                    )
                else:
                    # Wait for foreground hook to complete
                    self._wait_for_hook(process, ar)
                    log_worker_event(
                        worker_type='SnapshotWorker',
                        event=f'Completed hook: {hook_name}',
                        indent_level=2,
                        pid=self.pid,
                    )

                # Check if we can advance to the next step
                self._try_advance_step()

            # All hooks launched (or completed) - cleanup and seal
            self._cleanup_empty_archiveresults()
            self.snapshot.status = Snapshot.StatusChoices.SEALED
            self.snapshot.save(update_fields=['status', 'modified_at'])
        except Exception:
            # Error details are logged in the runloop completion event
            traceback.print_exc()
            # Still seal the snapshot on error so the parent crawl can finish
            self.snapshot.status = Snapshot.StatusChoices.SEALED
            self.snapshot.save(update_fields=['status', 'modified_at'])
            raise
        finally:
            self.on_shutdown()

    def _run_hook(self, hook_path: Path, ar: Any) -> Any:
        """Fork and run a hook using the Process model, return the Process."""
        from archivebox.hooks import run_hook

        # Create output directory
        output_dir = ar.create_output_dir()

        # Run hook using Process.launch() - returns a Process model directly.
        # Pass self.db_process as parent to track the SnapshotWorker -> Hook hierarchy.
        process = run_hook(
            script=hook_path,
            output_dir=output_dir,
            config=self.snapshot.config,
            timeout=120,
            parent=self.db_process,
            url=str(self.snapshot.url),
            snapshot_id=str(self.snapshot.id),
        )

        # Link ArchiveResult to Process for tracking
        ar.process = process
        ar.save(update_fields=['process_id', 'modified_at'])
        return process

    def _wait_for_hook(self, process: Any, ar: Any) -> None:
        """Wait for a hook using Process.wait(), then update the AR status."""
        # Use the Process.wait() helper instead of manual polling
        try:
            exit_code = process.wait(timeout=process.timeout)
        except TimeoutError:
            # Hook exceeded timeout - kill it
            process.kill(signal_num=9)
            exit_code = -1

        # Update ArchiveResult from hook output
        ar.update_from_output()
        ar.end_ts = timezone.now()

        # Determine final status from hook exit code
        if exit_code == 0:
            ar.status = ar.StatusChoices.SUCCEEDED
        else:
            ar.status = ar.StatusChoices.FAILED
        ar.save(update_fields=['status', 'end_ts', 'modified_at'])

    def _try_advance_step(self) -> None:
        """Advance current_step if all foreground hooks in the current step are done."""
        from django.db.models import Q
        from archivebox.core.models import ArchiveResult

        current_step = self.snapshot.current_step

        # Single query: foreground hooks in the current step that aren't finished.
        # Foreground hooks are those whose hook_name doesn't contain '.bg.'
        pending_foreground = self.snapshot.archiveresult_set.filter(
            Q(hook_name__contains=f'__{current_step}_') &   # Current step
            ~Q(hook_name__contains='.bg.') &                # Not background
            ~Q(status__in=ArchiveResult.FINAL_STATES)       # Not finished
        ).exists()
        if pending_foreground:
            return  # Still waiting for hooks

        # All foreground hooks done - advance!
        self.snapshot.current_step += 1
        self.snapshot.save(update_fields=['current_step', 'modified_at'])
        log_worker_event(
            worker_type='SnapshotWorker',
            event=f'Advanced to step {self.snapshot.current_step}',
            indent_level=2,
            pid=self.pid,
        )

    def _cleanup_empty_archiveresults(self) -> None:
        """Delete ArchiveResults that produced no output files."""
        empty_ars = self.snapshot.archiveresult_set.filter(
            output_files={}  # No output files
        ).filter(
            status__in=self.snapshot.archiveresult_set.model.FINAL_STATES  # Only delete finished ones
        )
        deleted_count = empty_ars.count()
        if deleted_count > 0:
            empty_ars.delete()
            log_worker_event(
                worker_type='SnapshotWorker',
                event=f'Deleted {deleted_count} empty ArchiveResults',
                indent_level=2,
                pid=self.pid,
            )

    def on_shutdown(self, error: BaseException | None = None) -> None:
        """
        Terminate all background Snapshot hooks when the snapshot finishes.

        Background hooks should only be killed when:
        - All foreground hooks are done (snapshot is sealed)
        - Worker is shutting down
        """
        # Use shared termination logic from the Worker base class
        self._terminate_background_hooks(
            background_processes=self.background_processes,
            worker_type='SnapshotWorker',
            indent_level=2,
        )
        super().on_shutdown(error)

    @staticmethod
    def _extract_plugin_name(hook_name: str) -> str:
        """Extract the plugin name from a hook filename.

        e.g. on_Snapshot__50_wget.py -> wget
        """
        name = hook_name.split('__')[-1]  # Get part after last __
        name = name.replace('.py', '').replace('.js', '').replace('.sh', '')
        name = name.replace('.bg', '')  # Remove .bg suffix
        return name

    @classmethod
    def start(cls, snapshot_id: str, **kwargs: Any) -> int:
        """Fork a SnapshotWorker subprocess for a specific snapshot, return its PID."""
        from archivebox.machine.models import Process

        worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)

        # Use a module-level target function for pickling compatibility
        proc = MPProcess(
            target=_run_snapshot_worker,
            args=(snapshot_id, worker_id),
            kwargs=kwargs,
            name=f'snapshot_worker_{snapshot_id[:8]}',
        )
        proc.start()
        return proc.pid
@@ -481,7 +743,6 @@ class ArchiveResultWorker(Worker):
# Registry of worker types by name; ArchiveResultWorker was removed in favor of
# SnapshotWorker running hooks sequentially, so only these two remain.
WORKER_TYPES.update({
    'crawl': CrawlWorker,
    'snapshot': SnapshotWorker,
})