rename archive_org to archivedotorg, add BinaryWorker, fix config pass-through

This commit is contained in:
Nick Sweeting
2026-01-04 22:38:15 -08:00
parent 456aaee287
commit 7ceaeae2d9
32 changed files with 1111 additions and 110 deletions

View File

@@ -36,7 +36,7 @@ from django.utils import timezone
from rich import print
from archivebox.misc.logging_util import log_worker_event
from .worker import Worker, CrawlWorker
from .worker import Worker, BinaryWorker, CrawlWorker
def _run_orchestrator_process(exit_on_idle: bool) -> None:
@@ -63,13 +63,14 @@ class Orchestrator:
- Each SnapshotWorker runs hooks sequentially for its snapshot
"""
# Only CrawlWorker - SnapshotWorkers are spawned by CrawlWorker subprocess, not by Orchestrator
WORKER_TYPES: list[Type[Worker]] = [CrawlWorker]
# BinaryWorker (singleton daemon) and CrawlWorker - SnapshotWorkers are spawned by CrawlWorker subprocess, not by Orchestrator
WORKER_TYPES: list[Type[Worker]] = [BinaryWorker, CrawlWorker]
# Configuration
POLL_INTERVAL: float = 2.0 # How often to check for new work (seconds)
IDLE_TIMEOUT: int = 3 # Exit after N idle ticks (0 = never exit)
MAX_CRAWL_WORKERS: int = 8 # Max crawls processing simultaneously
MAX_BINARY_WORKERS: int = 1 # Max binaries installing simultaneously (sequential only)
def __init__(self, exit_on_idle: bool = True, crawl_id: str | None = None):
self.exit_on_idle = exit_on_idle
@@ -194,15 +195,23 @@ class Orchestrator:
return len(WorkerClass.get_running_workers())
def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
"""Determine if we should spawn a new CrawlWorker."""
"""Determine if we should spawn a new worker."""
if queue_count == 0:
return False
# Check CrawlWorker limit
# Get appropriate limit based on worker type
if WorkerClass.name == 'crawl':
max_workers = self.MAX_CRAWL_WORKERS
elif WorkerClass.name == 'binary':
max_workers = self.MAX_BINARY_WORKERS # Force sequential: only 1 binary at a time
else:
max_workers = 1 # Default for unknown types
# Check worker limit
running_workers = WorkerClass.get_running_workers()
running_count = len(running_workers)
if running_count >= self.MAX_CRAWL_WORKERS:
if running_count >= max_workers:
return False
# Check if we already have enough workers for the queue size
@@ -285,14 +294,35 @@ class Orchestrator:
def check_queues_and_spawn_workers(self) -> dict[str, int]:
"""
Check Crawl queue and spawn CrawlWorkers as needed.
Check Binary and Crawl queues and spawn workers as needed.
Returns dict of queue sizes.
"""
from archivebox.crawls.models import Crawl
from archivebox.machine.models import Binary, Machine
queue_sizes = {}
# Only check Crawl queue
# Check Binary queue
machine = Machine.current()
binary_queue = Binary.objects.filter(
machine=machine,
status=Binary.StatusChoices.QUEUED,
retry_at__lte=timezone.now()
).order_by('retry_at')
binary_count = binary_queue.count()
queue_sizes['binary'] = binary_count
# Spawn BinaryWorker if needed (one worker per binary, up to MAX_BINARY_WORKERS)
if self.should_spawn_worker(BinaryWorker, binary_count):
# Get next binary to process
binary = binary_queue.first()
if binary:
BinaryWorker.start(binary_id=str(binary.id))
# Check if any BinaryWorkers are still running
running_binary_workers = len(BinaryWorker.get_running_workers())
# Check Crawl queue
crawl_queue = Crawl.objects.filter(
retry_at__lte=timezone.now()
).exclude(
@@ -307,12 +337,15 @@ class Orchestrator:
crawl_count = crawl_queue.count()
queue_sizes['crawl'] = crawl_count
# Spawn CrawlWorker if needed
if self.should_spawn_worker(CrawlWorker, crawl_count):
# Claim next crawl
crawl = crawl_queue.first()
if crawl and self._claim_crawl(crawl):
CrawlWorker.start(crawl_id=str(crawl.id))
# CRITICAL: Only spawn CrawlWorkers if binary queue is empty AND no BinaryWorkers running
# This ensures all binaries are installed before snapshots start processing
if binary_count == 0 and running_binary_workers == 0:
# Spawn CrawlWorker if needed
if self.should_spawn_worker(CrawlWorker, crawl_count):
# Claim next crawl
crawl = crawl_queue.first()
if crawl and self._claim_crawl(crawl):
CrawlWorker.start(crawl_id=str(crawl.id))
return queue_sizes
@@ -328,7 +361,7 @@ class Orchestrator:
)
return updated == 1
def has_pending_work(self, queue_sizes: dict[str, int]) -> bool:
    """Return True when at least one queue reports outstanding items."""
    # Equivalent to any(count > 0 ...): the largest count is positive
    # iff some queue is non-empty; default=0 handles an empty dict.
    return max(queue_sizes.values(), default=0) > 0

View File

@@ -323,6 +323,20 @@ class Worker:
pwd = Path(snapshot.output_dir) # Run in snapshot's output directory
env = get_config(snapshot=snapshot)
elif cls.name == 'binary':
# BinaryWorker processes a specific binary installation
binary_id = kwargs.get('binary_id')
if not binary_id:
raise ValueError("BinaryWorker requires binary_id")
from archivebox.machine.models import Binary
binary = Binary.objects.get(id=binary_id)
cmd = [sys.executable, '-m', 'archivebox', 'run', '--binary-id', str(binary_id)]
pwd = Path(settings.DATA_DIR) / 'machines' / str(Machine.current().id) / 'binaries' / binary.name / str(binary.id)
pwd.mkdir(parents=True, exist_ok=True)
env = get_config()
else:
raise ValueError(f"Unknown worker type: {cls.name}")
@@ -654,16 +668,8 @@ class SnapshotWorker(Worker):
hooks = discover_hooks('Snapshot', config=config)
hooks = sorted(hooks, key=lambda h: h.name) # Sort by name (includes step prefix)
import sys
print(f'[SnapshotWorker] Discovered {len(hooks)} hooks for snapshot {self.snapshot.url}', file=sys.stderr, flush=True)
if hooks:
print(f'[SnapshotWorker] First 5 hooks: {[h.name for h in hooks[:5]]}', file=sys.stderr, flush=True)
else:
print(f'[SnapshotWorker] WARNING: No hooks discovered! Config keys: {list(config.keys())[:10]}...', file=sys.stderr, flush=True)
# Execute each hook sequentially
for hook_path in hooks:
print(f'[SnapshotWorker] Running hook: {hook_path.name}', file=sys.stderr, flush=True)
hook_name = hook_path.name
plugin = self._extract_plugin_name(hook_name)
hook_step = extract_step(hook_name)
@@ -829,8 +835,96 @@ class SnapshotWorker(Worker):
return name
class BinaryWorker(Worker):
    """
    Worker that installs a single Binary record and then exits.

    Like CrawlWorker and SnapshotWorker, BinaryWorker:
    - Processes one specific binary (specified by binary_id)
    - Installs it via Binary.run() which runs on_Binary__* hooks
    - Exits when done

    The Orchestrator spawns BinaryWorkers sequentially (MAX_BINARY_WORKERS=1)
    to avoid conflicts between concurrent binary installations.
    """

    name: ClassVar[str] = 'binary'

    MAX_TICK_TIME: ClassVar[int] = 600       # allow up to 10 minutes per binary installation
    MAX_CONCURRENT_TASKS: ClassVar[int] = 1  # one binary per worker process

    def __init__(self, binary_id: str, worker_id: int = 0):
        # Pin this worker to a single Binary row; the Orchestrator supplies the id.
        self.binary_id = binary_id
        super().__init__(worker_id=worker_id)

    def get_model(self):
        """Return the model class this worker operates on."""
        from archivebox.machine.models import Binary
        return Binary

    def get_next_item(self):
        """Fetch the specific Binary to install, or None if it no longer exists."""
        from archivebox.machine.models import Binary
        try:
            return Binary.objects.get(id=self.binary_id)
        except Binary.DoesNotExist:
            return None

    def runloop(self) -> None:
        """Install the specified binary, log the outcome, then shut down."""
        import sys
        # Imported locally so the StatusChoices lookup below is guaranteed to
        # resolve even if worker.py has no module-level Binary import.
        from archivebox.machine.models import Binary

        self.on_startup()
        try:
            binary = self.get_next_item()
            if not binary:
                log_worker_event(
                    worker_type='BinaryWorker',
                    event=f'Binary {self.binary_id} not found',
                    indent_level=1,
                    pid=self.pid,
                )
                return

            print(f'[cyan]🔧 BinaryWorker installing: {binary.name}[/cyan]', file=sys.stderr)

            # Tick the state machine to trigger installation.
            # This calls BinaryMachine.on_install() -> Binary.run() -> on_Binary__* hooks
            binary.sm.tick()

            # Re-read the row to see what status the state machine left it in.
            binary.refresh_from_db()
            if binary.status == Binary.StatusChoices.INSTALLED:
                log_worker_event(
                    worker_type='BinaryWorker',
                    event=f'Installed: {binary.name} -> {binary.abspath}',
                    indent_level=1,
                    pid=self.pid,
                )
            else:
                log_worker_event(
                    worker_type='BinaryWorker',
                    event=f'Installation pending: {binary.name} (status={binary.status})',
                    indent_level=1,
                    pid=self.pid,
                )
        except Exception as e:
            # Broad catch is deliberate: a failed install must be logged, never
            # allowed to crash the worker silently; on_shutdown still runs via finally.
            # Include the id so the failing binary is identifiable from the log alone
            # (the original message was a placeholder-less f-string naming nothing).
            log_worker_event(
                worker_type='BinaryWorker',
                event=f'Failed to install binary {self.binary_id}',
                indent_level=1,
                pid=self.pid,
                error=e,
            )
        finally:
            self.on_shutdown()
# Register the built-in worker classes by their `name` so callers can look
# them up dynamically (e.g. to spawn a worker for a given queue type).
WORKER_TYPES['binary'] = BinaryWorker
WORKER_TYPES['crawl'] = CrawlWorker
WORKER_TYPES['snapshot'] = SnapshotWorker