simplify entrypoints for orchestrator and workers

This commit is contained in:
Nick Sweeting
2026-01-04 13:17:07 -08:00
parent 5449971777
commit 839ae744cf
13 changed files with 301 additions and 350 deletions

View File

@@ -15,6 +15,12 @@ import os
import sys
from pathlib import Path
# Force line-buffered output so logs appear in real time when piped.
# BUG FIX: the original only checked sys.stdout for reconfigure() but then
# called it on sys.stderr as well — if stderr had been replaced by a wrapper
# without reconfigure() that raised AttributeError. Guard each stream.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, 'reconfigure'):
        _stream.reconfigure(line_buffering=True)
# Propagate the unbuffered preference to any child processes we spawn
os.environ['PYTHONUNBUFFERED'] = '1'
ASCII_LOGO = """
█████╗ ██████╗ ██████╗██╗ ██╗██╗██╗ ██╗███████╗ ██████╗ ██████╗ ██╗ ██╗
██╔══██╗██╔══██╗██╔════╝██║ ██║██║██║ ██║██╔════╝ ██╔══██╗██╔═══██╗╚██╗██╔╝

View File

@@ -53,8 +53,6 @@ class ArchiveBoxGroup(click.Group):
'manage': 'archivebox.cli.archivebox_manage.main',
# Introspection commands
'pluginmap': 'archivebox.cli.archivebox_pluginmap.main',
# Worker command
'worker': 'archivebox.cli.archivebox_worker.main',
}
all_subcommands = {
**meta_commands,

View File

@@ -127,10 +127,11 @@ def add(urls: str | list[str],
# Background mode: just queue work and return (orchestrator via server will pick it up)
print('[yellow]\\[*] URLs queued. Orchestrator will process them (run `archivebox server` if not already running).[/yellow]')
else:
# Foreground mode: run orchestrator inline until all work is done
print(f'[green]\\[*] Starting orchestrator to process crawl...[/green]')
orchestrator = Orchestrator(exit_on_idle=True, crawl_id=str(crawl.id))
orchestrator.runloop() # Block until complete
# Foreground mode: run CrawlWorker inline until all work is done
print(f'[green]\\[*] Starting worker to process crawl...[/green]')
from archivebox.workers.worker import CrawlWorker
worker = CrawlWorker(crawl_id=str(crawl.id), worker_id=0)
worker.runloop() # Block until complete
# 6. Return the list of Snapshots in this crawl
return crawl.snapshot_set.all()

View File

@@ -1,67 +0,0 @@
#!/usr/bin/env python3
"""
archivebox orchestrator [--daemon]
Start the orchestrator process that manages workers.
The orchestrator polls queues for each model type (Crawl, Snapshot, ArchiveResult)
and lazily spawns worker processes when there is work to be done.
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox orchestrator'
import sys
import rich_click as click
from archivebox.misc.util import docstring
def orchestrator(daemon: bool = False, watch: bool = False) -> int:
    """
    Start the orchestrator process.

    The orchestrator:
    1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
    2. Spawns worker processes when there is work to do
    3. Monitors worker health and restarts failed workers
    4. Exits when all queues are empty (unless --daemon)

    Args:
        daemon: Run forever (don't exit when idle)
        watch: Just watch the queues without spawning workers (for debugging)

    Exit codes:
        0: All work completed successfully
        1: Error occurred
    """
    from archivebox.workers.orchestrator import Orchestrator

    # BUG FIX: `watch` was accepted and silently ignored; tell the user the
    # flag has no effect instead of pretending it works.
    if watch:
        print('[yellow]--watch is not implemented yet; running normally[/yellow]', file=sys.stderr)

    if Orchestrator.is_running():
        print('[yellow]Orchestrator is already running[/yellow]')
        return 0

    try:
        orchestrator_instance = Orchestrator(exit_on_idle=not daemon)
        orchestrator_instance.runloop()
        return 0
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the orchestrator, not an error
        return 0
    except Exception as e:
        print(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        return 1
@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--watch', '-w', is_flag=True, help="Watch queues without spawning workers")
@docstring(orchestrator.__doc__)
def main(daemon: bool, watch: bool):
    """Start the ArchiveBox orchestrator process"""
    # Delegate to orchestrator() and propagate its exit code to the shell
    exit_code = orchestrator(daemon=daemon, watch=watch)
    sys.exit(exit_code)


if __name__ == '__main__':
    main()

View File

@@ -1,16 +1,18 @@
#!/usr/bin/env python3
"""
archivebox run [--daemon]
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...]
Unified command for processing queued work.
Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
- --crawl-id: Run orchestrator for specific crawl only
- --snapshot-id: Run worker for specific snapshot only (internal use)
Examples:
# Run orchestrator in foreground (replaces `archivebox orchestrator`)
# Run orchestrator in foreground
archivebox run
# Run as daemon (don't exit on idle)
@@ -23,6 +25,12 @@ Examples:
# Mixed types work too
cat mixed_records.jsonl | archivebox run
# Run orchestrator for specific crawl (shows live progress for that crawl)
archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e
# Run worker for specific snapshot (internal use by orchestrator)
archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""
__package__ = 'archivebox.cli'
@@ -187,15 +195,62 @@ def run_orchestrator(daemon: bool = False) -> int:
return 1
def run_snapshot_worker(snapshot_id: str) -> int:
    """
    Run a SnapshotWorker for a specific snapshot.

    Args:
        snapshot_id: Snapshot UUID to process

    Returns exit code (0 = success, 1 = error).
    """
    from archivebox.workers.worker import _run_snapshot_worker

    try:
        _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
    except KeyboardInterrupt:
        # Interrupted by user/orchestrator: treat as a clean shutdown
        pass
    except Exception as e:
        rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1
    return 0
@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
def main(daemon: bool):
@click.option('--crawl-id', help="Run orchestrator for specific crawl only")
@click.option('--snapshot-id', help="Run worker for specific snapshot only")
def main(daemon: bool, crawl_id: str, snapshot_id: str):
"""
Process queued work.
When stdin is piped: Process those specific records and exit.
When run standalone: Run orchestrator in foreground.
Modes:
- No args + stdin piped: Process piped JSONL records
- No args + TTY: Run orchestrator for all work
- --crawl-id: Run orchestrator for that crawl only
- --snapshot-id: Run worker for that snapshot only
"""
# Snapshot worker mode
if snapshot_id:
sys.exit(run_snapshot_worker(snapshot_id))
# Crawl worker mode
if crawl_id:
from archivebox.workers.worker import CrawlWorker
try:
worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
worker.runloop()
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
# Check if stdin has data (non-TTY means piped input)
if not sys.stdin.isatty():
sys.exit(process_stdin_records())

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
__command__ = 'archivebox worker'
import sys
import rich_click as click
from archivebox.misc.util import docstring
def worker(worker_type: str, daemon: bool = False, plugin: str | None = None):
    """
    Start a worker process to process items from the queue.

    Worker types:
    - crawl: Process Crawl objects (parse seeds, create snapshots)
    - snapshot: Process Snapshot objects (create archive results)
    - archiveresult: Process ArchiveResult objects (run plugins)

    Workers poll the database for queued items, claim them atomically,
    and spawn subprocess tasks to handle each item.
    """
    from archivebox.workers.worker import get_worker_class

    worker_cls = get_worker_class(worker_type)

    # Assemble constructor kwargs; the plugin filter only applies to
    # archiveresult workers (internal field is still called "extractor").
    kwargs: dict = {'daemon': daemon}
    if worker_type == 'archiveresult' and plugin:
        kwargs['extractor'] = plugin

    worker_cls(**kwargs).runloop()
# CLI entrypoint: thin wrapper that parses args and delegates to worker().
@click.command()
@click.argument('worker_type', type=click.Choice(['crawl', 'snapshot', 'archiveresult']))
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--plugin', '-p', default=None, help='Filter by plugin (archiveresult only)')
@docstring(worker.__doc__)  # reuse worker()'s docstring as the --help text
def main(worker_type: str, daemon: bool, plugin: str | None):
    """Start an ArchiveBox worker process"""
    worker(worker_type, daemon=daemon, plugin=plugin)


if __name__ == '__main__':
    main()

View File

@@ -1456,27 +1456,6 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
empty_ars.delete()
print(f'[yellow]🗑️ Deleted {deleted_count} empty ArchiveResults for {self.url}[/yellow]')
def has_running_background_hooks(self) -> bool:
    """
    Check if any ArchiveResult background hooks are still running.
    Used by state machine to determine if snapshot is finished.
    """
    from archivebox.misc.process_utils import validate_pid_file

    if not self.OUTPUT_DIR.exists():
        return False

    # A hook counts as running if its plugin dir holds a hook.pid that
    # validates against the matching cmd.sh.
    return any(
        validate_pid_file(plugin_dir / 'hook.pid', plugin_dir / 'cmd.sh')
        for plugin_dir in self.OUTPUT_DIR.iterdir()
        if plugin_dir.is_dir()
    )
def to_json(self) -> dict:
"""
Convert Snapshot model instance to a JSON-serializable dict.

View File

@@ -449,17 +449,27 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
def cleanup(self):
"""Clean up background hooks and run on_CrawlEnd hooks."""
from archivebox.hooks import run_hook, discover_hooks
from archivebox.misc.process_utils import safe_kill_process
from archivebox.machine.models import Process
# Kill any background processes by scanning for all .pid files
# Kill any background Crawl hooks using Process records
# Find all running hook Processes that are children of this crawl's workers
# (CrawlWorker already kills its hooks via on_shutdown, but this is backup for orphans)
running_hooks = Process.objects.filter(
parent__worker_type='crawl',
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
).distinct()
for process in running_hooks:
# Use Process.kill_tree() to gracefully kill parent + children
killed_count = process.kill_tree(graceful_timeout=2.0)
if killed_count > 0:
print(f'[yellow]🔪 Killed {killed_count} orphaned crawl hook process(es)[/yellow]')
# Clean up .pid files from output directory
if self.OUTPUT_DIR.exists():
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
cmd_file = pid_file.parent / 'cmd.sh'
# safe_kill_process now waits for termination and escalates to SIGKILL
# Returns True only if process is confirmed dead
killed = safe_kill_process(pid_file, cmd_file)
if killed:
pid_file.unlink(missing_ok=True)
pid_file.unlink(missing_ok=True)
# Run on_CrawlEnd hooks
from archivebox.config.configset import get_config
@@ -472,7 +482,7 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
output_dir = self.OUTPUT_DIR / plugin_name
output_dir.mkdir(parents=True, exist_ok=True)
result = run_hook(
process = run_hook(
hook,
output_dir=output_dir,
config=config,
@@ -481,7 +491,7 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
)
# Log failures but don't block
if result and result['returncode'] != 0:
if process.exit_code != 0:
print(f'[yellow]⚠️ CrawlEnd hook failed: {hook.name}[/yellow]')

View File

@@ -0,0 +1,18 @@
# Generated by Django 6.0 on 2026-01-03 06:58
from django.db import migrations, models
class Migration(migrations.Migration):
    # Auto-generated: redefines the allowed choices and help_text for
    # Process.process_type (the choice list includes 'hook').

    dependencies = [
        ('machine', '0009_alter_binary_status'),
    ]

    operations = [
        migrations.AlterField(
            model_name='process',
            name='process_type',
            field=models.CharField(choices=[('supervisord', 'Supervisord'), ('orchestrator', 'Orchestrator'), ('worker', 'Worker'), ('cli', 'CLI'), ('hook', 'Hook'), ('binary', 'Binary')], db_index=True, default='cli', help_text='Type of process (cli, worker, orchestrator, binary, supervisord)', max_length=16),
        ),
    ]

View File

@@ -499,20 +499,25 @@ class Binary(ModelWithHealthStats):
since installations are foreground, but included for consistency).
"""
from pathlib import Path
from archivebox.misc.process_utils import safe_kill_process
# Kill any background binary installation hooks using Process records
# (rarely used since binary installations are typically foreground)
running_hooks = Process.objects.filter(
binary=self,
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
)
for process in running_hooks:
killed_count = process.kill_tree(graceful_timeout=2.0)
if killed_count > 0:
print(f'[yellow]🔪 Killed {killed_count} binary installation hook process(es)[/yellow]')
# Clean up .pid files from output directory
output_dir = self.OUTPUT_DIR
if not output_dir.exists():
return
# Kill any background hooks
for plugin_dir in output_dir.iterdir():
if not plugin_dir.is_dir():
continue
pid_file = plugin_dir / 'hook.pid'
cmd_file = plugin_dir / 'cmd.sh'
safe_kill_process(pid_file, cmd_file)
if output_dir.exists():
for pid_file in output_dir.glob('**/*.pid'):
pid_file.unlink(missing_ok=True)
def symlink_to_lib_bin(self, lib_bin_dir: str | Path) -> Path | None:
"""
@@ -1273,32 +1278,61 @@ class Process(models.Model):
def _write_pid_file(self) -> None:
"""Write PID file with mtime set to process start time."""
from archivebox.misc.process_utils import write_pid_file_with_mtime
if self.pid and self.started_at and self.pid_file:
write_pid_file_with_mtime(
self.pid_file,
self.pid,
self.started_at.timestamp()
)
# Write PID to file
self.pid_file.write_text(str(self.pid))
# Set mtime to process start time for validation
try:
start_time = self.started_at.timestamp()
os.utime(self.pid_file, (start_time, start_time))
except OSError:
pass # mtime optional, validation degrades gracefully
def _write_cmd_file(self) -> None:
"""Write cmd.sh script for debugging/validation."""
from archivebox.misc.process_utils import write_cmd_file
if self.cmd and self.cmd_file:
write_cmd_file(self.cmd_file, self.cmd)
# Escape shell arguments (quote if contains space, ", or $)
def escape(arg: str) -> str:
return f'"{arg.replace(chr(34), chr(92)+chr(34))}"' if any(c in arg for c in ' "$') else arg
# Write executable shell script
script = '#!/bin/bash\n' + ' '.join(escape(arg) for arg in self.cmd) + '\n'
self.cmd_file.write_text(script)
try:
self.cmd_file.chmod(0o755)
except OSError:
pass
def _build_env(self) -> dict:
"""Build environment dict for subprocess, merging stored env with system."""
import json
env = os.environ.copy()
env.update(self.env or {})
# Convert all values to strings for subprocess.Popen
if self.env:
for key, value in self.env.items():
if value is None:
continue
elif isinstance(value, str):
env[key] = value # Already a string, use as-is
elif isinstance(value, bool):
env[key] = 'True' if value else 'False'
elif isinstance(value, (int, float)):
env[key] = str(value)
else:
# Lists, dicts, etc. - serialize to JSON
env[key] = json.dumps(value, default=str)
return env
def launch(self, background: bool = False) -> 'Process':
def launch(self, background: bool = False, cwd: str | None = None) -> 'Process':
"""
Spawn the subprocess and update this Process record.
Args:
background: If True, don't wait for completion (for daemons/bg hooks)
cwd: Working directory for the subprocess (defaults to self.pwd)
Returns:
self (updated with pid, started_at, etc.)
@@ -1310,6 +1344,9 @@ class Process(models.Model):
if not self.pwd:
raise ValueError("Process.pwd must be set before calling launch()")
# Use provided cwd or default to pwd
working_dir = cwd or self.pwd
# Ensure output directory exists
Path(self.pwd).mkdir(parents=True, exist_ok=True)
@@ -1322,7 +1359,7 @@ class Process(models.Model):
with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
proc = subprocess.Popen(
self.cmd,
cwd=self.pwd,
cwd=working_dir,
stdout=out,
stderr=err,
env=self._build_env(),

View File

@@ -1,123 +0,0 @@
"""
Process validation using psutil and filesystem mtime.
Uses mtime as a "password": PID files are timestamped with process start time.
Since filesystem mtimes can be set arbitrarily but process start times cannot,
comparing them detects PID reuse.
"""
__package__ = 'archivebox.misc'
import os
import time
from pathlib import Path
from typing import Optional
try:
import psutil
PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
def validate_pid_file(pid_file: Path, cmd_file: Optional[Path] = None, tolerance: float = 5.0) -> bool:
    """Validate PID using mtime and optional cmd.sh. Returns True if process is ours."""
    if not PSUTIL_AVAILABLE or not pid_file.exists():
        return False
    try:
        proc = psutil.Process(int(pid_file.read_text().strip()))

        # The file's mtime acts as a "password": it must match the process
        # start time, otherwise the PID was recycled by an unrelated process.
        drift = abs(pid_file.stat().st_mtime - proc.create_time())
        if drift > tolerance:
            return False

        # Cross-check the recorded command against the live process
        if cmd_file and cmd_file.exists():
            recorded = cmd_file.read_text()
            live_cmdline = ' '.join(proc.cmdline())
            if '--remote-debugging-port' in recorded and '--remote-debugging-port' not in live_cmdline:
                return False
            if 'chrome' in recorded.lower() or 'chromium' in recorded.lower():
                live_name = proc.name().lower()
                if 'chrome' not in live_name and 'chromium' not in live_name:
                    return False
        return True
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, ValueError, OSError):
        return False
def write_pid_file_with_mtime(pid_file: Path, pid: int, start_time: float):
    """Write PID file and set mtime to process start time."""
    pid_file.write_text(str(pid))
    # Stamp the file's mtime with the process start time so that later
    # validation can detect PID reuse (see validate_pid_file).
    try:
        os.utime(pid_file, times=(start_time, start_time))
    except OSError:
        # mtime is optional — validation degrades gracefully without it
        pass
def write_cmd_file(cmd_file: Path, cmd: list[str]):
    """Write shell command script."""
    # Quote any argument containing a space, double-quote, or dollar sign;
    # embedded double-quotes are backslash-escaped.
    quoted = []
    for arg in cmd:
        if ' ' in arg or '"' in arg or '$' in arg:
            quoted.append('"' + arg.replace('"', '\\"') + '"')
        else:
            quoted.append(arg)
    cmd_file.write_text('#!/bin/bash\n' + ' '.join(quoted) + '\n')
    try:
        cmd_file.chmod(0o755)  # make it directly executable for debugging
    except OSError:
        pass
def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15, timeout: float = 3.0) -> bool:
    """
    Kill process after validation, with graceful wait and SIGKILL escalation.

    Args:
        pid_file: File containing the target PID (validated via mtime check)
        cmd_file: Optional cmd.sh used to cross-check the process identity
        signal_num: Initial signal to send (default 15 = SIGTERM)
        timeout: Seconds to wait for graceful exit before escalating

    Returns True only if process is confirmed dead (either already dead or
    killed successfully).
    """
    # NOTE: `time` is already imported at module level; the original also
    # re-imported it locally, which was redundant.
    import signal

    if not validate_pid_file(pid_file, cmd_file):
        pid_file.unlink(missing_ok=True)  # clean up the stale file
        return True  # process already dead, consider it killed

    try:
        pid = int(pid_file.read_text().strip())
    except (OSError, ValueError):
        return False

    def _is_dead() -> bool:
        # os.kill with signal 0 is an existence check; it sends nothing
        try:
            os.kill(pid, 0)
            return False
        except ProcessLookupError:
            return True

    try:
        # Send the initial (graceful) signal
        try:
            os.kill(pid, signal_num)
        except ProcessLookupError:
            return True  # already dead

        # Poll until the process exits or the grace period elapses
        deadline = time.time() + timeout
        while time.time() < deadline:
            if _is_dead():
                return True
            time.sleep(0.1)

        # Grace period expired: escalate to SIGKILL
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            return True  # died between the timeout and the SIGKILL
        time.sleep(0.5)  # brief wait for the kernel to reap it
        return _is_dead()
    except OSError:
        # e.g. EPERM — could not signal the process, so death is unconfirmed
        return False

View File

@@ -206,6 +206,12 @@ def test_config_save_screenshot_false_skips():
env = os.environ.copy()
env['SCREENSHOT_ENABLED'] = 'False'
# DEBUG: Check if NODE_V8_COVERAGE is in env
if 'NODE_V8_COVERAGE' in env:
print(f"\n[DEBUG] NODE_V8_COVERAGE in env: {env['NODE_V8_COVERAGE']}")
else:
print("\n[DEBUG] NODE_V8_COVERAGE NOT in env")
result = subprocess.run(
['node', str(SCREENSHOT_HOOK), f'--url={TEST_URL}', '--snapshot-id=test999'],
cwd=tmpdir,

View File

@@ -17,7 +17,7 @@ import traceback
from typing import ClassVar, Any
from datetime import timedelta
from pathlib import Path
from multiprocessing import Process as MPProcess, cpu_count
from multiprocessing import cpu_count
from django.db.models import QuerySet
from django.utils import timezone
@@ -282,26 +282,80 @@ class Worker:
still_running.remove(hook_name)
@classmethod
def start(cls, **kwargs: Any) -> int:
def start(cls, parent: Any = None, **kwargs: Any) -> int:
"""
Fork a new worker as a subprocess.
Fork a new worker as a subprocess using Process.launch().
Args:
parent: Parent Process record (for hierarchy tracking)
**kwargs: Worker-specific args (crawl_id or snapshot_id)
Returns the PID of the new process.
"""
from archivebox.machine.models import Process
from archivebox.machine.models import Process, Machine
from archivebox.config.configset import get_config
from pathlib import Path
from django.conf import settings
import sys
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
# Build command and get config for the appropriate scope
if cls.name == 'crawl':
crawl_id = kwargs.get('crawl_id')
if not crawl_id:
raise ValueError("CrawlWorker requires crawl_id")
# Use module-level function for pickling compatibility
proc = MPProcess(
target=_run_worker,
args=(cls.name, worker_id),
kwargs=kwargs,
name=f'{cls.name}_worker_{worker_id}',
from archivebox.crawls.models import Crawl
crawl = Crawl.objects.get(id=crawl_id)
cmd = [sys.executable, '-m', 'archivebox', 'run', '--crawl-id', str(crawl_id)]
pwd = Path(crawl.OUTPUT_DIR) # Run in crawl's output directory
env = get_config(scope='crawl', crawl=crawl)
elif cls.name == 'snapshot':
snapshot_id = kwargs.get('snapshot_id')
if not snapshot_id:
raise ValueError("SnapshotWorker requires snapshot_id")
from archivebox.core.models import Snapshot
snapshot = Snapshot.objects.get(id=snapshot_id)
cmd = [sys.executable, '-m', 'archivebox', 'run', '--snapshot-id', str(snapshot_id)]
pwd = Path(snapshot.output_dir) # Run in snapshot's output directory
env = get_config(scope='snapshot', snapshot=snapshot)
else:
raise ValueError(f"Unknown worker type: {cls.name}")
# Ensure output directory exists
pwd.mkdir(parents=True, exist_ok=True)
# Convert config to JSON-serializable format for storage
import json
env_serializable = {
k: json.loads(json.dumps(v, default=str))
for k, v in env.items()
if v is not None
}
# Create Process record with full config as environment
# pwd = where stdout/stderr/pid/cmd files are written (snapshot/crawl output dir)
# cwd (passed to launch) = where subprocess runs from (DATA_DIR)
# parent = parent Process for hierarchy tracking (CrawlWorker -> SnapshotWorker)
process = Process.objects.create(
machine=Machine.current(),
parent=parent,
process_type=Process.TypeChoices.WORKER,
worker_type=cls.name,
pwd=str(pwd),
cmd=cmd,
env=env_serializable,
timeout=3600, # 1 hour default timeout for workers
)
proc.start()
assert proc.pid is not None
return proc.pid
# Launch in background with DATA_DIR as working directory
process.launch(background=True, cwd=str(settings.DATA_DIR))
return process.pid
@classmethod
def get_running_workers(cls) -> list:
@@ -377,17 +431,18 @@ class CrawlWorker(Worker):
self.on_startup()
try:
print(f'[cyan]🔄 CrawlWorker.runloop: Starting tick() for crawl {self.crawl_id}[/cyan]', file=sys.stderr)
print(f'🔄 CrawlWorker starting for crawl {self.crawl_id}', file=sys.stderr)
# Advance state machine: QUEUED → STARTED (triggers run() via @started.enter)
self.crawl.sm.tick()
self.crawl.refresh_from_db()
print(f'[cyan]🔄 tick() complete, crawl status={self.crawl.status}[/cyan]', file=sys.stderr)
print(f'🔄 tick() complete, crawl status={self.crawl.status}', file=sys.stderr)
# Now spawn SnapshotWorkers and monitor progress
while True:
# Check if crawl is done
if self._is_crawl_finished():
print(f'[cyan]🔄 Crawl finished, sealing...[/cyan]', file=sys.stderr)
print(f'🔄 Crawl finished, sealing...', file=sys.stderr)
self.crawl.sm.seal()
break
@@ -401,9 +456,12 @@ class CrawlWorker(Worker):
def _spawn_snapshot_workers(self) -> None:
"""Spawn SnapshotWorkers for queued snapshots (up to limit)."""
from pathlib import Path
from archivebox.core.models import Snapshot
from archivebox.machine.models import Process
debug_log = Path('/tmp/archivebox_crawl_worker_debug.log')
# Count running SnapshotWorkers for this crawl
running_count = Process.objects.filter(
process_type=Process.TypeChoices.WORKER,
@@ -412,22 +470,51 @@ class CrawlWorker(Worker):
status__in=['running', 'started'],
).count()
with open(debug_log, 'a') as f:
f.write(f' _spawn_snapshot_workers: running={running_count}/{self.MAX_SNAPSHOT_WORKERS}\n')
f.flush()
if running_count >= self.MAX_SNAPSHOT_WORKERS:
return # At limit
# Get queued snapshots for this crawl (SnapshotWorker will mark as STARTED in on_startup)
queued_snapshots = Snapshot.objects.filter(
crawl_id=self.crawl_id,
status=Snapshot.StatusChoices.QUEUED,
).order_by('created_at')[:self.MAX_SNAPSHOT_WORKERS - running_count]
# Get snapshots that need workers spawned
# Find all running SnapshotWorker processes for this crawl
running_processes = Process.objects.filter(
parent_id=self.db_process.id,
worker_type='snapshot',
status__in=['running', 'started'],
)
import sys
print(f'[yellow]🔧 _spawn_snapshot_workers: running={running_count}/{self.MAX_SNAPSHOT_WORKERS}, queued={queued_snapshots.count()}[/yellow]', file=sys.stderr)
# Extract snapshot IDs from their pwd (contains snapshot ID at the end)
running_snapshot_ids = []
for proc in running_processes:
if proc.pwd:
# pwd is like: /path/to/archive/{timestamp}
# We need to match this against snapshot.output_dir
running_snapshot_ids.append(proc.pwd)
# Find snapshots that don't have a running worker
all_snapshots = Snapshot.objects.filter(
crawl_id=self.crawl_id,
status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED],
).order_by('created_at')
# Filter out snapshots that already have workers
pending_snapshots = [
snap for snap in all_snapshots
if snap.output_dir not in running_snapshot_ids
][:self.MAX_SNAPSHOT_WORKERS - running_count]
with open(debug_log, 'a') as f:
f.write(f' Found {len(pending_snapshots)} snapshots needing workers for crawl {self.crawl_id}\n')
f.flush()
# Spawn workers
for snapshot in queued_snapshots:
print(f'[yellow]🔧 Spawning worker for {snapshot.url} (status={snapshot.status})[/yellow]', file=sys.stderr)
SnapshotWorker.start(snapshot_id=str(snapshot.id))
for snapshot in pending_snapshots:
with open(debug_log, 'a') as f:
f.write(f' Spawning worker for {snapshot.url} (status={snapshot.status})\n')
f.flush()
SnapshotWorker.start(parent=self.db_process, snapshot_id=str(snapshot.id))
log_worker_event(
worker_type='CrawlWorker',
event=f'Spawned SnapshotWorker for {snapshot.url}',
@@ -437,13 +524,25 @@ class CrawlWorker(Worker):
def _is_crawl_finished(self) -> bool:
    """Check if all snapshots are sealed."""
    from pathlib import Path
    from archivebox.core.models import Snapshot

    # NOTE(review): hard-coded world-writable /tmp debug log — looks like
    # temporary instrumentation; confirm whether it should ship.
    debug_log = Path('/tmp/archivebox_crawl_worker_debug.log')

    # A snapshot is "pending" while it is still queued or actively running
    total = Snapshot.objects.filter(crawl_id=self.crawl_id).count()
    pending = Snapshot.objects.filter(
        crawl_id=self.crawl_id,
        status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED],
    ).count()

    # Per-status breakdown is only used for the debug log line below
    queued = Snapshot.objects.filter(crawl_id=self.crawl_id, status=Snapshot.StatusChoices.QUEUED).count()
    started = Snapshot.objects.filter(crawl_id=self.crawl_id, status=Snapshot.StatusChoices.STARTED).count()
    sealed = Snapshot.objects.filter(crawl_id=self.crawl_id, status=Snapshot.StatusChoices.SEALED).count()

    with open(debug_log, 'a') as f:
        f.write(f'  _is_crawl_finished: total={total}, queued={queued}, started={started}, sealed={sealed}, pending={pending}\n')
        f.flush()

    # Finished when nothing is queued or running anymore
    return pending == 0
def on_shutdown(self, error: BaseException | None = None) -> None:
@@ -701,24 +800,6 @@ class SnapshotWorker(Worker):
name = name.replace('.bg', '') # Remove .bg suffix
return name
@classmethod
def start(cls, snapshot_id: str, **kwargs: Any) -> int:
"""Fork a SnapshotWorker for a specific snapshot."""
from archivebox.machine.models import Process
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
proc = MPProcess(
target=_run_snapshot_worker, # New module-level function
args=(snapshot_id, worker_id),
kwargs=kwargs,
name=f'snapshot_worker_{snapshot_id[:8]}',
)
proc.start()
assert proc.pid is not None
return proc.pid
# Populate the registry
WORKER_TYPES.update({