Refactor ArchiveBox onto abx-dl bus runner

This commit is contained in:
Nick Sweeting
2026-03-21 11:47:57 -07:00
parent ee9ed440d1
commit c87079aa0a
45 changed files with 1282 additions and 6396 deletions

View File

@@ -70,8 +70,6 @@ class ArchiveBoxGroup(click.Group):
'setup': 'install',
'import': 'add',
'archive': 'add',
# Old commands replaced by new model commands
'orchestrator': 'run',
}
legacy_model_subcommands = {
'crawl': {'create', 'list', 'update', 'delete'},
@@ -168,7 +166,6 @@ def cli(ctx, help=False):
os.environ['ARCHIVEBOX_RUNSERVER'] = '1'
if '--reload' in sys.argv:
os.environ['ARCHIVEBOX_AUTORELOAD'] = '1'
os.environ['ARCHIVEBOX_ORCHESTRATOR_MANAGED_BY_WATCHER'] = '1'
from archivebox.config.common import STORAGE_CONFIG
os.environ['ARCHIVEBOX_RUNSERVER_PIDFILE'] = str(STORAGE_CONFIG.TMP_DIR / 'runserver.pid')

View File

@@ -60,8 +60,8 @@ def add(urls: str | list[str],
The flow is:
1. Save URLs to sources file
2. Create Crawl with URLs and max_depth
3. Orchestrator creates Snapshots from Crawl URLs (depth=0)
4. Orchestrator runs parser extractors on root snapshots
3. Crawl runner creates Snapshots from Crawl URLs (depth=0)
4. Crawl runner runs parser extractors on root snapshots
5. Parser extractors output to urls.jsonl
6. URLs are added to Crawl.urls and child Snapshots are created
7. Repeat until max_depth is reached
@@ -78,9 +78,10 @@ def add(urls: str | list[str],
from archivebox.crawls.models import Crawl
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.personas.models import Persona
from archivebox.workers.orchestrator import Orchestrator
from archivebox.misc.logging_util import printable_filesize
from archivebox.misc.system import get_dir_size
from archivebox.config.configset import get_config
from archivebox.services.runner import run_crawl
created_by_id = created_by_id or get_or_create_system_user_pk()
started_at = timezone.now()
@@ -101,6 +102,7 @@ def add(urls: str | list[str],
# Read URLs directly into crawl
urls_content = sources_file.read_text()
persona_name = (persona or 'Default').strip() or 'Default'
plugins = plugins or str(get_config().get('PLUGINS') or '')
persona_obj, _ = Persona.objects.get_or_create(name=persona_name)
persona_obj.ensure_dirs()
@@ -148,21 +150,20 @@ def add(urls: str | list[str],
snapshot.ensure_crawl_symlink()
return crawl, crawl.snapshot_set.all()
# 5. Start the orchestrator to process the queue
# The orchestrator will:
# 5. Start the crawl runner to process the queue
# The runner will:
# - Process Crawl -> create Snapshots from all URLs
# - Process Snapshots -> run extractors
# - Parser extractors discover new URLs -> create child Snapshots
# - Repeat until max_depth reached
if bg:
# Background mode: just queue work and return (orchestrator via server will pick it up)
print('[yellow]\\[*] URLs queued. Orchestrator will process them (run `archivebox server` if not already running).[/yellow]')
# Background mode: just queue work and return (background runner via server will pick it up)
print('[yellow]\\[*] URLs queued. The background runner will process them (run `archivebox server` or `archivebox run --daemon` if not already running).[/yellow]')
else:
# Foreground mode: run full orchestrator until all work is done
print('[green]\\[*] Starting orchestrator to process crawl...[/green]')
orchestrator = Orchestrator(exit_on_idle=True, crawl_id=str(crawl.id))
orchestrator.runloop() # Block until complete
# Foreground mode: run full crawl runner until all work is done
print('[green]\\[*] Starting crawl runner to process crawl...[/green]')
run_crawl(str(crawl.id))
# Print summary for foreground runs
try:
@@ -223,7 +224,7 @@ def add(urls: str | list[str],
@click.option('--overwrite', '-F', is_flag=True, help='Overwrite existing data if URLs have been archived previously')
@click.option('--update', is_flag=True, default=ARCHIVING_CONFIG.ONLY_NEW, help='Retry any previously skipped/failed URLs when re-adding them')
@click.option('--index-only', is_flag=True, help='Just add the URLs to the index without archiving them now')
@click.option('--bg', is_flag=True, help='Run archiving in background (start orchestrator and return immediately)')
@click.option('--bg', is_flag=True, help='Run archiving in background (queue work and return immediately)')
@click.argument('urls', nargs=-1, type=click.Path())
@docstring(add.__doc__)
def main(**kwargs):

View File

@@ -122,7 +122,7 @@ def list_binaries(
is_tty = sys.stdout.isatty()
queryset = Binary.objects.all().order_by('name', '-loaded_at')
queryset = Binary.objects.all().order_by('name', '-modified_at', '-created_at')
# Apply filters
filter_kwargs = {

View File

@@ -31,6 +31,7 @@ __package__ = 'archivebox.cli'
__command__ = 'archivebox extract'
import sys
from collections import defaultdict
import rich_click as click
@@ -102,7 +103,7 @@ def run_plugins(
TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
)
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.workers.orchestrator import Orchestrator
from archivebox.services.runner import run_crawl
is_tty = sys.stdout.isatty()
@@ -197,8 +198,20 @@ def run_plugins(
# Run orchestrator if --wait (default)
if wait:
rprint('[blue]Running plugins...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
snapshot_ids_by_crawl: dict[str, set[str]] = defaultdict(set)
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.only('id', 'crawl_id').get(id=snapshot_id)
except Snapshot.DoesNotExist:
continue
snapshot_ids_by_crawl[str(snapshot.crawl_id)].add(str(snapshot.id))
for crawl_id, crawl_snapshot_ids in snapshot_ids_by_crawl.items():
run_crawl(
crawl_id,
snapshot_ids=sorted(crawl_snapshot_ids),
selected_plugins=plugins_list or None,
)
# Output results as JSONL (when piped) or human-readable (when TTY)
for snapshot_id in snapshot_ids:

View File

@@ -12,7 +12,7 @@ from archivebox.misc.util import docstring, enforce_types
@enforce_types
def install(binaries: tuple[str, ...] = (), binproviders: str = '*', dry_run: bool = False) -> None:
"""Detect and install ArchiveBox dependencies by running a dependency-check crawl
"""Detect and install ArchiveBox dependencies by running the abx-dl install flow
Examples:
archivebox install # Install all dependencies
@@ -46,76 +46,23 @@ def install(binaries: tuple[str, ...] = (), binproviders: str = '*', dry_run: bo
print()
if dry_run:
print('[dim]Dry run - would create a crawl to detect dependencies[/dim]')
print('[dim]Dry run - would run the abx-dl install flow[/dim]')
return
# Set up Django
from archivebox.config.django import setup_django
setup_django()
from django.utils import timezone
from archivebox.crawls.models import Crawl
from archivebox.base_models.models import get_or_create_system_user_pk
# Create a crawl for dependency detection
# Using a minimal crawl that will trigger on_Crawl hooks
created_by_id = get_or_create_system_user_pk()
# Build config for this crawl using existing PLUGINS filter
crawl_config = {}
# Combine binary names and provider names into PLUGINS list
plugins = []
if binaries:
plugins.extend(binaries)
plugin_names = list(binaries)
if binproviders != '*':
plugins.extend(binproviders.split(','))
plugin_names.extend(provider.strip() for provider in binproviders.split(',') if provider.strip())
if plugins:
crawl_config['PLUGINS'] = ','.join(plugins)
crawl, created = Crawl.objects.get_or_create(
urls='archivebox://install',
defaults={
'label': 'Dependency detection',
'created_by_id': created_by_id,
'max_depth': 0,
'status': 'queued',
'config': crawl_config,
}
)
# If crawl already existed, reset it to queued state so it can be processed again
if not created:
crawl.status = 'queued'
crawl.retry_at = timezone.now()
crawl.config = crawl_config # Update config
crawl.save()
print(f'[+] Created dependency detection crawl: {crawl.id}')
if crawl_config:
print(f'[+] Crawl config: {crawl_config}')
print(f'[+] Crawl status: {crawl.status}, retry_at: {crawl.retry_at}')
# Verify the crawl is in the queue
from archivebox.crawls.models import Crawl as CrawlModel
queued_crawls = CrawlModel.objects.filter(
retry_at__lte=timezone.now()
).exclude(
status__in=CrawlModel.FINAL_STATES
)
print(f'[+] Crawls in queue: {queued_crawls.count()}')
if queued_crawls.exists():
for c in queued_crawls:
print(f' - Crawl {c.id}: status={c.status}, retry_at={c.retry_at}')
print('[+] Running crawl to detect binaries via on_Crawl hooks...')
print('[+] Running installer via abx-dl bus...')
print()
# Run the crawl synchronously (this triggers on_Crawl hooks)
from archivebox.workers.orchestrator import Orchestrator
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
from archivebox.services.runner import run_install
run_install(plugin_names=plugin_names or None)
print()

View File

@@ -1,18 +1,19 @@
#!/usr/bin/env python3
"""
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...]
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...] [--binary-id=...]
Unified command for processing queued work.
Unified command for processing queued work on the shared abx-dl bus.
Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
- --crawl-id: Run orchestrator for specific crawl only
- --snapshot-id: Run worker for specific snapshot only (internal use)
- Without stdin (TTY): Run the background runner in foreground until killed
- --crawl-id: Run the crawl runner for a specific crawl only
- --snapshot-id: Run a specific snapshot through its parent crawl
- --binary-id: Emit a BinaryEvent for a specific Binary row
Examples:
# Run orchestrator in foreground
# Run the background runner in foreground
archivebox run
# Run as daemon (don't exit on idle)
@@ -26,17 +27,21 @@ Examples:
# Mixed types work too
cat mixed_records.jsonl | archivebox run
# Run orchestrator for specific crawl (shows live progress for that crawl)
# Run the crawl runner for a specific crawl
archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e
# Run worker for specific snapshot (internal use by orchestrator)
# Run one snapshot from an existing crawl
archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
# Run one queued binary install directly on the bus
archivebox run --binary-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox run'
import sys
from collections import defaultdict
import rich_click as click
from rich import print as rprint
@@ -64,7 +69,7 @@ def process_stdin_records() -> int:
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.crawls.models import Crawl
from archivebox.machine.models import Binary
from archivebox.workers.orchestrator import Orchestrator
from archivebox.services.runner import run_binary, run_crawl
records = list(read_stdin())
is_tty = sys.stdout.isatty()
@@ -75,6 +80,11 @@ def process_stdin_records() -> int:
created_by_id = get_or_create_system_user_pk()
queued_count = 0
output_records = []
full_crawl_ids: set[str] = set()
snapshot_ids_by_crawl: dict[str, set[str]] = defaultdict(set)
plugin_names_by_crawl: dict[str, set[str]] = defaultdict(set)
run_all_plugins_for_crawl: set[str] = set()
binary_ids: list[str] = []
for record in records:
record_type = record.get('type', '')
@@ -97,6 +107,8 @@ def process_stdin_records() -> int:
if crawl.status not in [Crawl.StatusChoices.SEALED]:
crawl.status = Crawl.StatusChoices.QUEUED
crawl.save()
full_crawl_ids.add(str(crawl.id))
run_all_plugins_for_crawl.add(str(crawl.id))
output_records.append(crawl.to_json())
queued_count += 1
@@ -116,6 +128,14 @@ def process_stdin_records() -> int:
if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
snapshot.status = Snapshot.StatusChoices.QUEUED
snapshot.save()
crawl = snapshot.crawl
crawl.retry_at = timezone.now()
if crawl.status != Crawl.StatusChoices.STARTED:
crawl.status = Crawl.StatusChoices.QUEUED
crawl.save(update_fields=['status', 'retry_at', 'modified_at'])
crawl_id = str(snapshot.crawl_id)
snapshot_ids_by_crawl[crawl_id].add(str(snapshot.id))
run_all_plugins_for_crawl.add(crawl_id)
output_records.append(snapshot.to_json())
queued_count += 1
@@ -135,19 +155,30 @@ def process_stdin_records() -> int:
if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
archiveresult.status = ArchiveResult.StatusChoices.QUEUED
archiveresult.save()
snapshot = archiveresult.snapshot
snapshot.retry_at = timezone.now()
if snapshot.status != Snapshot.StatusChoices.STARTED:
snapshot.status = Snapshot.StatusChoices.QUEUED
snapshot.save(update_fields=['status', 'retry_at', 'modified_at'])
crawl = snapshot.crawl
crawl.retry_at = timezone.now()
if crawl.status != Crawl.StatusChoices.STARTED:
crawl.status = Crawl.StatusChoices.QUEUED
crawl.save(update_fields=['status', 'retry_at', 'modified_at'])
crawl_id = str(snapshot.crawl_id)
snapshot_ids_by_crawl[crawl_id].add(str(snapshot.id))
if archiveresult.plugin:
plugin_names_by_crawl[crawl_id].add(archiveresult.plugin)
output_records.append(archiveresult.to_json())
queued_count += 1
elif record_type == TYPE_BINARY:
# Binary records - create or update and queue for installation
if record_id:
# Existing binary - re-queue
try:
binary = Binary.objects.get(id=record_id)
except Binary.DoesNotExist:
binary = Binary.from_json(record)
else:
# New binary - create it
binary = Binary.from_json(record)
if binary:
@@ -155,6 +186,7 @@ def process_stdin_records() -> int:
if binary.status != Binary.StatusChoices.INSTALLED:
binary.status = Binary.StatusChoices.QUEUED
binary.save()
binary_ids.append(str(binary.id))
output_records.append(binary.to_json())
queued_count += 1
@@ -177,143 +209,123 @@ def process_stdin_records() -> int:
rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)
# Run orchestrator until all queued work is done
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
for binary_id in binary_ids:
run_binary(binary_id)
targeted_crawl_ids = full_crawl_ids | set(snapshot_ids_by_crawl)
if targeted_crawl_ids:
for crawl_id in sorted(targeted_crawl_ids):
run_crawl(
crawl_id,
snapshot_ids=None if crawl_id in full_crawl_ids else sorted(snapshot_ids_by_crawl[crawl_id]),
selected_plugins=None if crawl_id in run_all_plugins_for_crawl else sorted(plugin_names_by_crawl[crawl_id]),
)
return 0
def run_orchestrator(daemon: bool = False) -> int:
def run_runner(daemon: bool = False) -> int:
"""
Run the orchestrator process.
The orchestrator:
1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
2. Spawns worker processes when there is work to do
3. Monitors worker health and restarts failed workers
4. Exits when all queues are empty (unless --daemon)
Run the background runner loop.
Args:
daemon: Run forever (don't exit when idle)
Returns exit code (0 = success, 1 = error).
"""
from archivebox.workers.orchestrator import Orchestrator
from django.utils import timezone
from archivebox.machine.models import Machine, Process
from archivebox.services.runner import run_pending_crawls
if Orchestrator.is_running():
rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
return 0
Process.cleanup_stale_running()
Machine.current()
current = Process.current()
if current.process_type != Process.TypeChoices.ORCHESTRATOR:
current.process_type = Process.TypeChoices.ORCHESTRATOR
current.save(update_fields=['process_type', 'modified_at'])
try:
orchestrator = Orchestrator(exit_on_idle=not daemon)
orchestrator.runloop()
run_pending_crawls(daemon=daemon)
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
def run_snapshot_worker(snapshot_id: str) -> int:
"""
Run a SnapshotWorker for a specific snapshot.
Args:
snapshot_id: Snapshot UUID to process
Returns exit code (0 = success, 1 = error).
"""
from archivebox.workers.worker import _run_snapshot_worker
try:
_run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
rprint(f'[red]Runner error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
finally:
current.refresh_from_db()
if current.status != Process.StatusChoices.EXITED:
current.status = Process.StatusChoices.EXITED
current.ended_at = current.ended_at or timezone.now()
current.save(update_fields=['status', 'ended_at', 'modified_at'])
@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--crawl-id', help="Run orchestrator for specific crawl only")
@click.option('--snapshot-id', help="Run worker for specific snapshot only")
@click.option('--binary-id', help="Run worker for specific binary only")
@click.option('--worker-type', help="Run worker of specific type (binary)")
def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str, worker_type: str):
@click.option('--crawl-id', help="Run the crawl runner for a specific crawl only")
@click.option('--snapshot-id', help="Run one snapshot through its crawl")
@click.option('--binary-id', help="Run one queued binary install directly on the bus")
def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str):
"""
Process queued work.
Modes:
- No args + stdin piped: Process piped JSONL records
- No args + TTY: Run orchestrator for all work
- --crawl-id: Run orchestrator for that crawl only
- --snapshot-id: Run worker for that snapshot only
- --binary-id: Run worker for that binary only
- No args + TTY: Run the crawl runner for all work
- --crawl-id: Run the crawl runner for that crawl only
- --snapshot-id: Run one snapshot through its crawl only
- --binary-id: Run one queued binary install directly on the bus
"""
# Snapshot worker mode
if snapshot_id:
sys.exit(run_snapshot_worker(snapshot_id))
# Binary worker mode (specific binary)
if binary_id:
from archivebox.workers.worker import BinaryWorker
try:
worker = BinaryWorker(binary_id=binary_id, worker_id=0)
worker.runloop()
from archivebox.services.runner import run_binary
run_binary(binary_id)
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
rprint(f'[red]Runner error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
# Worker type mode (daemon - processes all pending items)
if worker_type:
if worker_type == 'binary':
from archivebox.workers.worker import BinaryWorker
try:
worker = BinaryWorker(worker_id=0) # No binary_id = daemon mode
worker.runloop()
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
else:
rprint(f'[red]Unknown worker type: {worker_type}[/red]', file=sys.stderr)
sys.exit(1)
# Crawl worker mode
if crawl_id:
from archivebox.workers.worker import CrawlWorker
try:
worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
worker.runloop()
from archivebox.services.runner import run_crawl
run_crawl(crawl_id)
sys.exit(0)
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
rprint(f'[red]Runner error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
# Check if stdin has data (non-TTY means piped input)
if not sys.stdin.isatty():
sys.exit(process_stdin_records())
else:
sys.exit(run_orchestrator(daemon=daemon))
sys.exit(run_runner(daemon=daemon))
def run_snapshot_worker(snapshot_id: str) -> int:
from archivebox.core.models import Snapshot
from archivebox.services.runner import run_crawl
try:
snapshot = Snapshot.objects.select_related('crawl').get(id=snapshot_id)
run_crawl(str(snapshot.crawl_id), snapshot_ids=[str(snapshot.id)])
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
rprint(f'[red]Runner error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
import traceback
traceback.print_exc()
return 1
if __name__ == '__main__':

View File

@@ -22,14 +22,14 @@ def schedule(add: bool = False,
overwrite: bool = False,
update: bool = not ARCHIVING_CONFIG.ONLY_NEW,
import_path: str | None = None):
"""Manage database-backed scheduled crawls processed by the orchestrator."""
"""Manage database-backed scheduled crawls processed by the crawl runner."""
from django.utils import timezone
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl, CrawlSchedule
from archivebox.crawls.schedule_utils import validate_schedule
from archivebox.workers.orchestrator import Orchestrator
from archivebox.services.runner import run_pending_crawls
depth = int(depth)
result: dict[str, object] = {
@@ -126,16 +126,12 @@ def schedule(add: bool = False,
enqueued += 1
result['run_all_enqueued'] = enqueued
print(f'[green]\\[*] Enqueued {enqueued} scheduled crawl(s) immediately.[/green]')
if enqueued and not Orchestrator.is_running():
print('[yellow]\\[*] No orchestrator is running yet. Start `archivebox server` or `archivebox schedule --foreground` to process the queued crawls.[/yellow]')
if enqueued:
print('[yellow]\\[*] Start `archivebox server`, `archivebox run --daemon`, or `archivebox schedule --foreground` to process the queued crawls.[/yellow]')
if foreground:
print('[green]\\[*] Starting global orchestrator in foreground mode. It will materialize scheduled crawls and process queued work.[/green]')
if Orchestrator.is_running():
print('[yellow]\\[*] Orchestrator is already running.[/yellow]')
else:
orchestrator = Orchestrator(exit_on_idle=False)
orchestrator.runloop()
print('[green]\\[*] Starting global crawl runner in foreground mode. It will materialize scheduled crawls and process queued work.[/green]')
run_pending_crawls(daemon=True)
if quiet:
return result
@@ -161,12 +157,12 @@ def schedule(add: bool = False,
@click.option('--update', is_flag=True, help='Retry previously failed/skipped URLs when scheduled crawls run')
@click.option('--clear', is_flag=True, help='Disable all currently enabled schedules')
@click.option('--show', is_flag=True, help='Print all currently enabled schedules')
@click.option('--foreground', '-f', is_flag=True, help='Run the global orchestrator in the foreground (no crontab required)')
@click.option('--foreground', '-f', is_flag=True, help='Run the global crawl runner in the foreground (no crontab required)')
@click.option('--run-all', is_flag=True, help='Enqueue all enabled schedules immediately and process them once')
@click.argument('import_path', required=False)
@docstring(schedule.__doc__)
def main(**kwargs):
"""Manage database-backed scheduled crawls processed by the orchestrator."""
"""Manage database-backed scheduled crawls processed by the crawl runner."""
schedule(**kwargs)

View File

@@ -43,7 +43,6 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,),
os.environ['ARCHIVEBOX_RUNSERVER'] = '1'
if reload:
os.environ['ARCHIVEBOX_AUTORELOAD'] = '1'
os.environ['ARCHIVEBOX_ORCHESTRATOR_MANAGED_BY_WATCHER'] = '1'
from archivebox.config.common import STORAGE_CONFIG
pidfile = str(STORAGE_CONFIG.TMP_DIR / 'runserver.pid')
os.environ['ARCHIVEBOX_RUNSERVER_PIDFILE'] = pidfile
@@ -52,9 +51,8 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,),
is_reloader_child = os.environ.get(DJANGO_AUTORELOAD_ENV) == 'true'
if not is_reloader_child:
env = os.environ.copy()
env['ARCHIVEBOX_ORCHESTRATOR_WATCHER'] = '1'
subprocess.Popen(
[sys.executable, '-m', 'archivebox', 'manage', 'orchestrator_watch', f'--pidfile={pidfile}'],
[sys.executable, '-m', 'archivebox', 'manage', 'runner_watch', f'--pidfile={pidfile}'],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
@@ -101,7 +99,7 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,),
start_server_workers,
is_port_in_use,
)
from archivebox.workers.orchestrator import Orchestrator
from archivebox.machine.models import Machine, Process
# Check if port is already in use
if is_port_in_use(host, int(port)):
@@ -110,11 +108,15 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,),
print(' Stop the conflicting process or choose a different port')
sys.exit(1)
# Check if orchestrator is already running for this data directory
if Orchestrator.is_running():
print('[red][X] Error: ArchiveBox orchestrator is already running for this data directory[/red]')
print(' Stop the existing orchestrator before starting a new server')
print(' To stop: pkill -f "archivebox manage orchestrator"')
# Check if the background crawl runner is already running for this data directory
if Process.objects.filter(
machine=Machine.current(),
status=Process.StatusChoices.RUNNING,
process_type=Process.TypeChoices.ORCHESTRATOR,
).exists():
print('[red][X] Error: ArchiveBox background runner is already running for this data directory[/red]')
print(' Stop the existing runner before starting a new server')
print(' To stop: pkill -f "archivebox run --daemon"')
sys.exit(1)
# Check if supervisord is already running
@@ -125,12 +127,12 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,),
# If daphne is already running, error out
if daphne_state == 'RUNNING':
orchestrator_proc = get_worker(supervisor, 'worker_orchestrator')
orchestrator_state = orchestrator_proc.get('statename') if isinstance(orchestrator_proc, dict) else None
runner_proc = get_worker(supervisor, 'worker_runner')
runner_state = runner_proc.get('statename') if isinstance(runner_proc, dict) else None
print('[red][X] Error: ArchiveBox server is already running[/red]')
print(f' [green]√[/green] Web server (worker_daphne) is RUNNING on [deep_sky_blue4][link=http://{host}:{port}]http://{host}:{port}[/link][/deep_sky_blue4]')
if orchestrator_state == 'RUNNING':
print(' [green]√[/green] Background worker (worker_orchestrator) is RUNNING')
if runner_state == 'RUNNING':
print(' [green]√[/green] Background runner (worker_runner) is RUNNING')
print()
print('[yellow]To stop the existing server, run:[/yellow]')
print(' pkill -f "archivebox server"')

View File

@@ -723,7 +723,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
messages.success(
request,
f"Queued {queued} snapshots for re-archiving. The orchestrator will process them in the background.",
f"Queued {queued} snapshots for re-archiving. The background runner will process them.",
)
@@ -739,7 +739,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
messages.success(
request,
f"Creating {queryset.count()} new fresh snapshots. The orchestrator will process them in the background.",
f"Creating {queryset.count()} new fresh snapshots. The background runner will process them.",
)
@admin.action(
@@ -750,7 +750,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
messages.success(
request,
f"Queued {queued} snapshots for full re-archive (overwriting existing). The orchestrator will process them in the background.",
f"Queued {queued} snapshots for full re-archive (overwriting existing). The background runner will process them.",
)
@admin.action(

View File

@@ -3,8 +3,6 @@ __package__ = 'archivebox.core'
from django.apps import AppConfig
import os
_ORCHESTRATOR_BOOTSTRAPPED = False
class CoreConfig(AppConfig):
name = 'archivebox.core'
@@ -35,32 +33,15 @@ class CoreConfig(AppConfig):
except Exception:
pass
def _should_manage_orchestrator() -> bool:
if os.environ.get('ARCHIVEBOX_ORCHESTRATOR_MANAGED_BY_WATCHER') == '1':
return False
if os.environ.get('ARCHIVEBOX_ORCHESTRATOR_PROCESS') == '1':
return False
def _should_prepare_runtime() -> bool:
if os.environ.get('ARCHIVEBOX_RUNSERVER') == '1':
if os.environ.get('ARCHIVEBOX_AUTORELOAD') == '1':
return os.environ.get(DJANGO_AUTORELOAD_ENV) == 'true'
return True
return False
argv = ' '.join(sys.argv).lower()
if 'orchestrator' in argv:
return False
return 'daphne' in argv and '--reload' in sys.argv
if _should_manage_orchestrator():
global _ORCHESTRATOR_BOOTSTRAPPED
if _ORCHESTRATOR_BOOTSTRAPPED:
return
_ORCHESTRATOR_BOOTSTRAPPED = True
if _should_prepare_runtime():
from archivebox.machine.models import Process, Machine
from archivebox.workers.orchestrator import Orchestrator
Process.cleanup_stale_running()
Machine.current()
if not Orchestrator.is_running():
Orchestrator(exit_on_idle=False).start()

View File

@@ -1821,7 +1821,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Check if all ArchiveResults are finished.
Note: This is only called for observability/progress tracking.
SnapshotWorker owns the execution and doesn't poll this.
The shared runner owns execution and does not poll this.
"""
# Check if any ARs are still pending/started
pending = self.archiveresult_set.exclude(
@@ -2325,7 +2325,7 @@ class SnapshotMachine(BaseStateMachine):
@started.enter
def enter_started(self):
"""Just mark as started - SnapshotWorker will create ARs and run hooks."""
"""Just mark as started. The shared runner creates ArchiveResults and runs hooks."""
self.snapshot.status = Snapshot.StatusChoices.STARTED
self.snapshot.retry_at = None # No more polling
self.snapshot.save(update_fields=['status', 'retry_at', 'modified_at'])
@@ -3344,8 +3344,8 @@ class ArchiveResultMachine(BaseStateMachine):
"""
Check if this is the last ArchiveResult to finish - if so, seal the parent Snapshot.
Note: In the new architecture, SnapshotWorker handles step advancement and sealing.
This method is kept for backwards compatibility with manual CLI commands.
Note: In the new architecture, the shared runner handles step advancement and sealing.
This method is kept for direct model-driven edge cases.
"""
import sys

View File

@@ -1068,21 +1068,27 @@ class HealthCheckView(View):
def live_progress_view(request):
"""Simple JSON endpoint for live progress status - used by admin progress monitor."""
try:
from archivebox.workers.orchestrator import Orchestrator
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.machine.models import Process, Machine
# Get orchestrator status
orchestrator_running = Orchestrator.is_running()
total_workers = Orchestrator().get_total_worker_count() if orchestrator_running else 0
machine = Machine.current()
orchestrator_proc = Process.objects.filter(
machine=machine,
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING,
).order_by('-started_at').first()
orchestrator_running = orchestrator_proc is not None
orchestrator_pid = orchestrator_proc.pid if orchestrator_proc else None
total_workers = Process.objects.filter(
machine=machine,
status=Process.StatusChoices.RUNNING,
process_type__in=[
Process.TypeChoices.WORKER,
Process.TypeChoices.HOOK,
Process.TypeChoices.BINARY,
],
).count()
# Get model counts by status
crawls_pending = Crawl.objects.filter(status=Crawl.StatusChoices.QUEUED).count()
@@ -1128,43 +1134,27 @@ def live_progress_view(request):
# Build hierarchical active crawls with nested snapshots and archive results
running_workers = Process.objects.filter(
running_processes = Process.objects.filter(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
process_type__in=[
Process.TypeChoices.HOOK,
Process.TypeChoices.BINARY,
],
)
crawl_worker_pids: dict[str, int] = {}
snapshot_worker_pids: dict[str, int] = {}
for proc in running_workers:
crawl_process_pids: dict[str, int] = {}
snapshot_process_pids: dict[str, int] = {}
for proc in running_processes:
env = proc.env or {}
if not isinstance(env, dict):
env = {}
cmd = proc.cmd or []
if proc.worker_type == 'crawl':
crawl_id = env.get('CRAWL_ID')
if not crawl_id:
for i, part in enumerate(cmd):
if part == '--crawl-id' and i + 1 < len(cmd):
crawl_id = cmd[i + 1]
break
if part.startswith('--crawl-id='):
crawl_id = part.split('=', 1)[1]
break
if crawl_id:
crawl_worker_pids[str(crawl_id)] = proc.pid
elif proc.worker_type == 'snapshot':
snapshot_id = env.get('SNAPSHOT_ID')
if not snapshot_id:
for i, part in enumerate(cmd):
if part == '--snapshot-id' and i + 1 < len(cmd):
snapshot_id = cmd[i + 1]
break
if part.startswith('--snapshot-id='):
snapshot_id = part.split('=', 1)[1]
break
if snapshot_id:
snapshot_worker_pids[str(snapshot_id)] = proc.pid
crawl_id = env.get('CRAWL_ID')
snapshot_id = env.get('SNAPSHOT_ID')
if crawl_id and proc.pid:
crawl_process_pids.setdefault(str(crawl_id), proc.pid)
if snapshot_id and proc.pid:
snapshot_process_pids.setdefault(str(snapshot_id), proc.pid)
active_crawls_qs = Crawl.objects.filter(
status__in=[Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]
@@ -1274,7 +1264,7 @@ def live_progress_view(request):
'failed_plugins': failed_plugins,
'pending_plugins': pending_plugins,
'all_plugins': all_plugins,
'worker_pid': snapshot_worker_pids.get(str(snapshot.id)),
'worker_pid': snapshot_process_pids.get(str(snapshot.id)),
})
# Check if crawl can start (for debugging stuck crawls)
@@ -1303,7 +1293,7 @@ def live_progress_view(request):
'urls_preview': urls_preview,
'retry_at_future': retry_at_future,
'seconds_until_retry': seconds_until_retry,
'worker_pid': crawl_worker_pids.get(str(crawl.id)),
'worker_pid': crawl_process_pids.get(str(crawl.id)),
})
return JsonResponse({

View File

@@ -403,7 +403,7 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
This helper follows that contract by claiming each Binary before ticking
it, and by waiting when another worker already owns the row. That keeps
synchronous crawl execution compatible with the global BinaryWorker and
synchronous crawl execution compatible with the shared background runner and
avoids duplicate installs of the same dependency.
"""
import time
@@ -701,13 +701,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
from archivebox.hooks import run_hook, discover_hooks
from archivebox.machine.models import Process
# Kill any background Crawl hooks using Process records
# Find all running hook Processes that are children of this crawl's workers
# (CrawlWorker already kills its hooks via on_shutdown, but this is backup for orphans)
running_hooks = Process.objects.filter(
parent__worker_type='crawl',
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
env__CRAWL_ID=str(self.id),
).distinct()
for process in running_hooks:

View File

@@ -296,8 +296,8 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine):
@property
def is_valid(self) -> bool:
"""A binary is valid if it has both abspath and version set."""
return bool(self.abspath) and bool(self.version)
"""A binary is valid if it has a resolved path and is marked installed."""
return bool(self.abspath) and self.status == self.StatusChoices.INSTALLED
@cached_property
def binary_info(self) -> dict:
@@ -1176,10 +1176,8 @@ class Process(models.Model):
if 'supervisord' in argv_str:
return cls.TypeChoices.SUPERVISORD
elif 'orchestrator' in argv_str:
elif 'archivebox run' in argv_str or 'runner_watch' in argv_str:
return cls.TypeChoices.ORCHESTRATOR
elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']):
return cls.TypeChoices.WORKER
elif 'archivebox' in argv_str:
return cls.TypeChoices.CLI
else:

View File

@@ -1,861 +0,0 @@
"""
Rich Layout-based live progress display for ArchiveBox orchestrator.
Shows a comprehensive dashboard with:
- Top: Crawl queue status (full width)
- Middle: Crawl queue tree with hook outputs
- Bottom: Running process logs (dynamic panels)
"""
__package__ = 'archivebox.misc'
from datetime import datetime, timezone
import os
import re
from typing import List, Optional, Any
from collections import deque
from pathlib import Path
from rich import box
from rich.console import Group, RenderableType
from rich.layout import Layout
from rich.columns import Columns
from rich.panel import Panel
from rich.text import Text
from rich.table import Table
from rich.tree import Tree
from rich.cells import cell_len
from archivebox.config import VERSION
_RICH_TAG_RE = re.compile(r'\[/?[^\]]+\]')
def _strip_rich(text: str) -> str:
return _RICH_TAG_RE.sub('', text or '').strip()
class CrawlQueuePanel:
    """Display crawl queue status across full width.

    Renders a one-row, four-column header bar: version + timestamp,
    queue counts, worker counts, and overall orchestrator status.
    The counts are plain attributes that the owning layout mutates in
    place before each render (see ArchiveBoxProgressLayout).
    """
    def __init__(self):
        self.orchestrator_status = "Idle"    # human-readable status label shown on the right
        self.crawl_queue_count = 0           # crawls waiting to run
        self.crawl_workers_count = 0         # currently-active crawl workers
        self.binary_queue_count = 0          # binaries waiting to be installed
        self.binary_workers_count = 0        # currently-active binary workers
        self.max_crawl_workers = 8           # denominator for the "N/M crawl" display
        self.crawl_id: Optional[str] = None  # optional crawl id; first 8 chars shown next to status
    def __rich__(self) -> RenderableType:
        """Render the header as an expanded 4-column grid wrapped in a Panel."""
        grid = Table.grid(expand=True)
        grid.add_column(justify="left", ratio=1)
        grid.add_column(justify="center", ratio=1)
        grid.add_column(justify="center", ratio=1)
        grid.add_column(justify="right", ratio=1)
        # Left: ArchiveBox version + timestamp
        left_text = Text()
        left_text.append("ArchiveBox ", style="bold cyan")
        left_text.append(f"v{VERSION}", style="bold yellow")
        # NOTE(review): a separator glyph may have been lost here during extraction — confirm against original
        left_text.append(f"{datetime.now(timezone.utc).strftime('%H:%M:%S')}", style="grey53")
        # Center-left: Crawl + Binary queue status (yellow when non-empty, grey when idle)
        queue_style = "yellow" if self.crawl_queue_count > 0 else "grey53"
        center_left_text = Text()
        center_left_text.append("Crawls: ", style="white")
        center_left_text.append(str(self.crawl_queue_count), style=f"bold {queue_style}")
        center_left_text.append(" queued", style="grey53")
        center_left_text.append(" • Binaries: ", style="white")
        binary_queue_style = "yellow" if self.binary_queue_count > 0 else "grey53"
        center_left_text.append(str(self.binary_queue_count), style=f"bold {binary_queue_style}")
        center_left_text.append(" queued", style="grey53")
        # Center-right: Worker status (green when any worker is active)
        worker_style = "green" if self.crawl_workers_count > 0 else "grey53"
        center_right_text = Text()
        center_right_text.append("Workers: ", style="white")
        center_right_text.append(f"{self.crawl_workers_count}/{self.max_crawl_workers}", style=f"bold {worker_style}")
        center_right_text.append(" crawl", style="grey53")
        binary_worker_style = "green" if self.binary_workers_count > 0 else "grey53"
        center_right_text.append("", style="grey53")
        center_right_text.append(str(self.binary_workers_count), style=f"bold {binary_worker_style}")
        center_right_text.append(" binary", style="grey53")
        # Right: Orchestrator status with optional abbreviated crawl id
        status_color = "green" if self.crawl_workers_count > 0 else "grey53"
        right_text = Text()
        right_text.append("Status: ", style="white")
        right_text.append(self.orchestrator_status, style=f"bold {status_color}")
        if self.crawl_id:
            right_text.append(f" [{self.crawl_id[:8]}]", style="grey53")
        grid.add_row(left_text, center_left_text, center_right_text, right_text)
        return Panel(grid, style="white on blue", box=box.HORIZONTALS)
class ProcessLogPanel:
    """Display logs for a running Process.

    Renders a bordered panel containing the tail of the Process's
    stdout/stderr files, with a title line that identifies the process
    (worker/hook type, pid, URL, elapsed time). Completed processes
    that produced output files collapse to a single summary line.
    """
    def __init__(self, process: Any, max_lines: int = 8, compact: bool | None = None, bg_terminating: bool = False):
        self.process = process            # Process-like object; attributes are accessed defensively via getattr
        self.max_lines = max_lines        # max number of log lines to tail/display
        self.compact = compact            # stored but not read in this class — presumably used by callers; TODO confirm
        self.bg_terminating = bg_terminating  # True when background hooks are being shut down (colors border red)
    def __rich__(self) -> RenderableType:
        """Render the panel; exited processes with output render as one line."""
        completed_line = self._completed_output_line()
        if completed_line:
            style = "green" if self._completed_ok() else "yellow"
            return Text(completed_line, style=style)
        is_pending = self._is_pending()
        output_line = '' if is_pending else self._output_line()
        stdout_lines = []
        stderr_lines = []
        try:
            stdout_lines = list(self.process.tail_stdout(lines=self.max_lines, follow=False))
            stderr_lines = list(self.process.tail_stderr(lines=self.max_lines, follow=False))
        except Exception:
            # Log files may not exist yet (or the process object is stale); render empty.
            stdout_lines = []
            stderr_lines = []
        header_lines = []
        chrome_launch_line = self._chrome_launch_line(stderr_lines, stdout_lines)
        if chrome_launch_line:
            header_lines.append(Text(chrome_launch_line, style="grey53"))
        if output_line:
            header_lines.append(Text(output_line, style="grey53"))
        log_lines = []
        # stdout in white, stderr in cyan; blank lines are skipped
        for line in stdout_lines:
            if line:
                log_lines.append(Text(line, style="white"))
        for line in stderr_lines:
            if line:
                log_lines.append(Text(line, style="cyan"))
        # Reserve room for header lines; always show at least one body line.
        max_body = max(1, self.max_lines - len(header_lines))
        if not log_lines:
            log_lines = []
        lines = header_lines + log_lines[-max_body:]
        content = Group(*lines) if lines else Text("")
        title = self._title()
        border_style = self._border_style(is_pending=is_pending)
        # Pending processes get a collapsed 2-row panel; running ones auto-size.
        height = 2 if is_pending else None
        return Panel(
            content,
            title=title,
            border_style=border_style,
            box=box.HORIZONTALS,
            padding=(0, 1),
            height=height,
        )
    def plain_lines(self) -> list[str]:
        """Return the same content as __rich__ but as plain strings (no markup)."""
        completed_line = self._completed_output_line()
        if completed_line:
            return [completed_line]
        lines = []
        if not self._is_pending():
            output_line = self._output_line()
            if output_line:
                lines.append(output_line)
        try:
            stdout_lines = list(self.process.tail_stdout(lines=self.max_lines, follow=False))
            stderr_lines = list(self.process.tail_stderr(lines=self.max_lines, follow=False))
        except Exception:
            stdout_lines = []
            stderr_lines = []
        for line in stdout_lines:
            if line:
                lines.append(line)
        for line in stderr_lines:
            if line:
                lines.append(line)
        return lines
    def _title(self) -> str:
        """Build the rich-markup panel title: label + pid/url/elapsed suffixes."""
        process_type = getattr(self.process, 'process_type', 'process')
        worker_type = getattr(self.process, 'worker_type', '')
        pid = getattr(self.process, 'pid', None)
        label = process_type
        if process_type == 'worker' and worker_type:
            label, worker_suffix = self._worker_label(worker_type)
        elif process_type == 'hook':
            # Hook label is "<plugin>/<hook script name>", derived from cmd[1]'s path.
            try:
                cmd = getattr(self.process, 'cmd', [])
                hook_path = Path(cmd[1]) if len(cmd) > 1 else None
                hook_name = hook_path.name if hook_path else 'hook'
                plugin_name = hook_path.parent.name if hook_path and hook_path.parent.name else 'hook'
            except Exception:
                hook_name = 'hook'
                plugin_name = 'hook'
            label = f"{plugin_name}/{hook_name}"
            worker_suffix = ''
        else:
            worker_suffix = ''
        url = self._extract_url()
        url_suffix = f" url={self._abbrev_url(url)}" if url else ""
        time_suffix = self._elapsed_suffix()
        title_style = "grey53" if self._is_pending() else "bold white"
        if pid:
            return f"[{title_style}]{label}[/{title_style}] [grey53]pid={pid}{worker_suffix}{url_suffix}{time_suffix}[/grey53]"
        return f"[{title_style}]{label}[/{title_style}]{f' [grey53]{worker_suffix.strip()} {url_suffix.strip()}{time_suffix}[/grey53]' if (worker_suffix or url_suffix or time_suffix) else ''}".rstrip()
    def _is_background_hook(self) -> bool:
        """True when the process is a hook whose script name contains '.bg.'."""
        if getattr(self.process, 'process_type', '') != 'hook':
            return False
        try:
            cmd = getattr(self.process, 'cmd', [])
            hook_path = Path(cmd[1]) if len(cmd) > 1 else None
            hook_name = hook_path.name if hook_path else ''
            return '.bg.' in hook_name
        except Exception:
            return False
    def _is_pending(self) -> bool:
        """True when the process has not actually started yet."""
        status = getattr(self.process, 'status', '')
        if status in ('queued', 'pending', 'backoff'):
            return True
        # A hook Process row without a pid has been created but not spawned.
        if getattr(self.process, 'process_type', '') == 'hook' and not getattr(self.process, 'pid', None):
            return True
        return False
    def _completed_ok(self) -> bool:
        # exit_code None is treated as success (process row may not record a code).
        exit_code = getattr(self.process, 'exit_code', None)
        return exit_code in (0, None)
    def _completed_output_line(self) -> str:
        """Return the one-line summary for an exited process, or '' to render the full panel."""
        status = getattr(self.process, 'status', '')
        if status != 'exited':
            return ''
        output_line = self._output_line()
        if not output_line:
            return ''
        # Only collapse to a summary if the process actually produced output files.
        if not self._has_output_files():
            return ''
        return output_line
    def _has_output_files(self) -> bool:
        """True if the process's working dir contains any file beyond bookkeeping files."""
        pwd = getattr(self.process, 'pwd', None)
        if not pwd:
            return False
        try:
            base = Path(pwd)
            if not base.exists():
                return False
            # Bookkeeping files that every process dir contains; not real output.
            ignore = {'stdout.log', 'stderr.log', 'cmd.sh', 'process.pid', 'hook.pid', 'listener.pid'}
            for path in base.rglob('*'):
                if path.is_file() and path.name not in ignore:
                    return True
        except Exception:
            return False
        return False
    def _border_style(self, is_pending: bool) -> str:
        """Pick border color: grey=pending, green/yellow=exited ok/bad, red=bg hook terminating, cyan=running."""
        if is_pending:
            return "grey53"
        status = getattr(self.process, 'status', '')
        if status == 'exited':
            exit_code = getattr(self.process, 'exit_code', None)
            return "green" if exit_code in (0, None) else "yellow"
        is_hook = getattr(self.process, 'process_type', '') == 'hook'
        if is_hook and not self._is_background_hook():
            return "green"
        if is_hook and self._is_background_hook() and self.bg_terminating:
            return "red"
        return "cyan"
    def _worker_label(self, worker_type: str) -> tuple[str, str]:
        """Return (label, suffix) for a worker process, enriching the suffix
        with the crawl/snapshot id and URL(s) when the DB row can be loaded."""
        cmd = getattr(self.process, 'cmd', []) or []
        if worker_type == 'crawl':
            crawl_id = self._extract_arg(cmd, '--crawl-id')
            suffix = ''
            if crawl_id:
                suffix = f" id={str(crawl_id)[-8:]}"
                try:
                    from archivebox.crawls.models import Crawl
                    crawl = Crawl.objects.filter(id=crawl_id).first()
                    if crawl:
                        urls = crawl.get_urls_list()
                        if urls:
                            url_list = self._abbrev_urls(urls)
                            suffix += f" urls={url_list}"
                except Exception:
                    # DB may be unavailable in some render contexts; suffix stays id-only.
                    pass
            return 'crawl', suffix
        if worker_type == 'snapshot':
            snapshot_id = self._extract_arg(cmd, '--snapshot-id')
            suffix = ''
            if snapshot_id:
                suffix = f" id={str(snapshot_id)[-8:]}"
                try:
                    from archivebox.core.models import Snapshot
                    snap = Snapshot.objects.filter(id=snapshot_id).first()
                    if snap and snap.url:
                        suffix += f" url={self._abbrev_url(snap.url, max_len=48)}"
                except Exception:
                    pass
            return 'snapshot', suffix
        return f"worker:{worker_type}", ''
    @staticmethod
    def _extract_arg(cmd: list[str], key: str) -> str | None:
        """Extract a CLI arg value from cmd, handling both '--key=value' and '--key value'."""
        for i, part in enumerate(cmd):
            if part.startswith(f'{key}='):
                return part.split('=', 1)[1]
            if part == key and i + 1 < len(cmd):
                return cmd[i + 1]
        return None
    def _abbrev_urls(self, urls: list[str], max_len: int = 48) -> str:
        """Abbreviate a list of URLs as 'first_url,+N' for display."""
        if not urls:
            return ''
        if len(urls) == 1:
            return self._abbrev_url(urls[0], max_len=max_len)
        first = self._abbrev_url(urls[0], max_len=max_len)
        return f"{first},+{len(urls) - 1}"
    def _extract_url(self) -> str:
        """Get the process URL from the .url attribute or a --url CLI arg."""
        url = getattr(self.process, 'url', None)
        if url:
            return str(url)
        cmd = getattr(self.process, 'cmd', []) or []
        for i, part in enumerate(cmd):
            if part.startswith('--url='):
                return part.split('=', 1)[1].strip()
            if part == '--url' and i + 1 < len(cmd):
                return str(cmd[i + 1]).strip()
        return ''
    def _abbrev_url(self, url: str, max_len: int = 48) -> str:
        """Truncate a URL to max_len characters with a '...' suffix."""
        if not url:
            return ''
        if len(url) <= max_len:
            return url
        return f"{url[:max_len - 3]}..."
    def _chrome_launch_line(self, stderr_lines: list[str], stdout_lines: list[str]) -> str:
        """For chrome_launch hooks only: summarize the Chrome pid + CDP websocket URL.

        First tries to parse 'PID:'/'CDP URL:' markers out of the tailed log
        lines; falls back to reading cdp_url.txt / chrome.pid files from the
        process working directory. Returns '' for any other process.
        """
        try:
            cmd = getattr(self.process, 'cmd', [])
            hook_path = Path(cmd[1]) if len(cmd) > 1 else None
            hook_name = hook_path.name if hook_path else ''
            if 'chrome_launch' not in hook_name:
                return ''
            pid = ''
            ws = ''
            for line in stderr_lines + stdout_lines:
                if not ws and 'CDP URL:' in line:
                    ws = line.split('CDP URL:', 1)[1].strip()
                if not pid and 'PID:' in line:
                    pid = line.split('PID:', 1)[1].strip()
            if pid and ws:
                return f"Chrome pid={pid} {ws}"
            if ws:
                return f"Chrome {ws}"
            if pid:
                return f"Chrome pid={pid}"
            # Fallback: read the pid/CDP URL files chrome_launch writes to its cwd.
            try:
                from archivebox import DATA_DIR
                base = Path(DATA_DIR)
                pwd = getattr(self.process, 'pwd', None)
                if pwd:
                    chrome_dir = Path(pwd)
                    if not chrome_dir.is_absolute():
                        chrome_dir = (base / chrome_dir).resolve()
                    cdp_file = chrome_dir / 'cdp_url.txt'
                    pid_file = chrome_dir / 'chrome.pid'
                    if cdp_file.exists():
                        ws = cdp_file.read_text().strip()
                    if pid_file.exists():
                        pid = pid_file.read_text().strip()
                    if pid and ws:
                        return f"Chrome pid={pid} {ws}"
                    if ws:
                        return f"Chrome {ws}"
                    if pid:
                        return f"Chrome pid={pid}"
            except Exception:
                pass
        except Exception:
            return ''
        return ''
    def _elapsed_suffix(self) -> str:
        """Return ' [elapsed/timeout s]' for the title, or '' when unknown."""
        started_at = getattr(self.process, 'started_at', None)
        timeout = getattr(self.process, 'timeout', None)
        if not started_at or not timeout:
            return ''
        try:
            # Match the naive/aware-ness of started_at to avoid a TypeError on subtraction.
            now = datetime.now(timezone.utc) if started_at.tzinfo else datetime.now()
            elapsed = int((now - started_at).total_seconds())
            elapsed = max(elapsed, 0)
            return f" [{elapsed}/{int(timeout)}s]"
        except Exception:
            return ''
    def _output_line(self) -> str:
        """Return the process working dir as a './relative/path' string for display."""
        pwd = getattr(self.process, 'pwd', None)
        if not pwd:
            return ''
        try:
            from archivebox import DATA_DIR
            rel = Path(pwd)
            base = Path(DATA_DIR)
            if rel.is_absolute():
                try:
                    rel = rel.relative_to(base)
                except Exception:
                    # pwd outside DATA_DIR: show the absolute path as-is.
                    pass
            rel_str = f"./{rel}" if not str(rel).startswith("./") else str(rel)
            return f"{rel_str}"
        except Exception:
            return f"{pwd}"
class WorkerLogPanel:
    """Display worker logs by tailing stdout/stderr from Process."""
    def __init__(self, title: str, empty_message: str, running_message: str, max_lines: int = 8):
        self.title = title                    # panel title text
        self.empty_message = empty_message    # shown when no logs and no process running
        self.running_message = running_message  # shown when process is running but has no logs yet
        self.log_lines: deque = deque(maxlen=max_lines * 2)  # Allow more buffer
        self.max_lines = max_lines
        # NOTE(review): the position trackers below are never read within this
        # class (tailing goes through Process.tail_* helpers) — likely vestigial.
        self.last_stdout_pos = 0  # Track file position for efficient tailing
        self.last_stderr_pos = 0
        self.last_process_running = False     # cached running-state for the empty-message choice
    def update_from_process(self, process: Any):
        """Update logs by tailing the Process stdout/stderr files."""
        if not process:
            self.last_process_running = False
            return
        # Use Process tail helpers for consistency
        try:
            self.last_process_running = bool(getattr(process, 'is_running', False))
            stdout_lines = list(process.tail_stdout(lines=self.max_lines, follow=False))
            stderr_lines = list(process.tail_stderr(lines=self.max_lines, follow=False))
        except Exception:
            # Keep whatever was displayed before if tailing fails.
            return
        self.log_lines.clear()
        # Preserve ordering by showing stdout then stderr
        for line in stdout_lines:
            if line:
                self.log_lines.append(('stdout', line))
        for line in stderr_lines:
            if line:
                self.log_lines.append(('stderr', line))
    def __rich__(self) -> Panel:
        """Render the buffered log lines (or a placeholder message) in a Panel."""
        if not self.log_lines:
            message = self.running_message if self.last_process_running else self.empty_message
            content = Text(message, style="grey53", justify="center")
        else:
            # Get the last max_lines for display
            display_lines = list(self.log_lines)[-self.max_lines:]
            lines = []
            for stream, message in display_lines:
                line = Text()
                # Color code by stream - stderr is usually debug output
                if stream == 'stderr':
                    # Rich formatted logs from stderr
                    line.append(message, style="cyan")
                else:
                    line.append(message, style="white")
                lines.append(line)
            content = Group(*lines)
        return Panel(
            content,
            title=f"[bold cyan]{self.title}",
            border_style="cyan",
            box=box.HORIZONTALS,
        )
class CrawlQueueTreePanel:
    """Display crawl queue with snapshots + hook summary in a tree view.

    Data is supplied as a list of plain dicts via update_crawls(); each
    crawl dict may contain 'snapshots', which in turn may contain 'hooks'.
    """
    def __init__(self, max_crawls: int = 8, max_snapshots: int = 16):
        self.crawls: list[dict[str, Any]] = []  # most recent crawl dicts to render
        self.max_crawls = max_crawls            # cap on crawls shown
        self.max_snapshots = max_snapshots      # cap on snapshots shown per crawl
    def update_crawls(self, crawls: list[dict[str, Any]]) -> None:
        """Update crawl tree data."""
        self.crawls = crawls[:self.max_crawls]
    def __rich__(self) -> Panel:
        """Render each crawl as a Tree of snapshots, each with its hooks."""
        if not self.crawls:
            content = Text("No active crawls", style="grey53", justify="center")
        else:
            trees = []
            for crawl in self.crawls:
                crawl_status = crawl.get('status', '')
                crawl_label = crawl.get('label', '')
                crawl_id = crawl.get('id', '')[:8]
                crawl_text = Text(f"{self._status_icon(crawl_status)} {crawl_id} {crawl_label}", style="white")
                crawl_tree = Tree(crawl_text, guide_style="grey53")
                snapshots = crawl.get('snapshots', [])[:self.max_snapshots]
                for snap in snapshots:
                    snap_status = snap.get('status', '')
                    snap_label = snap.get('label', '')
                    snap_text = Text(f"{self._status_icon(snap_status)} {snap_label}", style="white")
                    snap_node = crawl_tree.add(snap_text)
                    output_path = snap.get('output_path', '')
                    if output_path:
                        snap_node.add(Text(output_path, style="grey53"))
                    hooks = snap.get('hooks', []) or []
                    for hook in hooks:
                        status = hook.get('status', '')
                        path = hook.get('path', '')
                        size = hook.get('size', '')
                        elapsed = hook.get('elapsed', '')
                        timeout = hook.get('timeout', '')
                        is_bg = hook.get('is_bg', False)
                        is_running = hook.get('is_running', False)
                        is_pending = hook.get('is_pending', False)
                        icon, color = self._hook_style(status, is_bg=is_bg, is_running=is_running, is_pending=is_pending)
                        stats = self._hook_stats(size=size, elapsed=elapsed, timeout=timeout, status=status)
                        line = Text(f"{icon} {path}{stats}", style=color)
                        stderr_tail = hook.get('stderr', '')
                        if stderr_tail:
                            # Right-align a truncated stderr tail next to the hook line.
                            left_str = f"{icon} {path}{stats}"
                            avail = self._available_width(left_str, indent=16)
                            # Defensive lookup keeps older instances without _truncate_tail working.
                            trunc = getattr(self, "_truncate_tail", self._truncate_to_width)
                            stderr_tail = trunc(stderr_tail, avail)
                            if not stderr_tail:
                                snap_node.add(line)
                                continue
                            row = Table.grid(expand=True)
                            row.add_column(justify="left", ratio=1)
                            row.add_column(justify="right")
                            row.add_row(line, Text(stderr_tail, style="grey70"))
                            snap_node.add(row)
                        else:
                            snap_node.add(line)
                trees.append(crawl_tree)
            content = Group(*trees)
        return Panel(
            content,
            title="[bold white]Crawl Queue",
            border_style="white",
            box=box.HORIZONTALS,
        )
    @staticmethod
    def _status_icon(status: str) -> str:
        """Map a crawl/snapshot status to a display glyph.

        NOTE(review): the glyphs appear to have been stripped to empty
        strings during extraction — confirm against the original file.
        """
        if status in ('queued', 'pending'):
            return ''
        if status in ('started', 'running'):
            return ''
        if status in ('sealed', 'done', 'completed'):
            return ''
        if status in ('failed', 'error'):
            return ''
        return ''
    @staticmethod
    def _hook_style(status: str, is_bg: bool = False, is_running: bool = False, is_pending: bool = False) -> tuple[str, str]:
        """Return (icon, color) for a hook based on its lifecycle state."""
        if status == 'succeeded':
            return '', 'green'
        if status == 'failed':
            return '', 'red'
        if status == 'skipped':
            return '', 'grey53'
        if is_pending:
            return '⌛️', 'grey53'
        if is_running and is_bg:
            return '', 'cyan'
        if is_running:
            return '▶️', 'cyan'
        if status == 'started':
            return '▶️', 'cyan'
        return '', 'grey53'
    @staticmethod
    def _hook_stats(size: str = '', elapsed: str = '', timeout: str = '', status: str = '') -> str:
        """Format a ' (size | time)' suffix: final stats for finished hooks,
        '(... | elapsed/timeout)' progress for running ones, '' otherwise."""
        if status in ('succeeded', 'failed', 'skipped'):
            parts = []
            if size:
                parts.append(size)
            if elapsed:
                parts.append(elapsed)
            if not parts:
                return ''
            return f" ({' | '.join(parts)})"
        if elapsed or timeout:
            size_part = '...' if elapsed or timeout else ''
            time_part = ''
            if elapsed and timeout:
                time_part = f"{elapsed}/{timeout}"
            elif elapsed:
                time_part = f"{elapsed}"
            return f" ({size_part} | {time_part})" if time_part else f" ({size_part})"
        return ''
    @staticmethod
    def _terminal_width() -> int:
        """Best-effort terminal width; 120 when not attached to a tty."""
        try:
            return os.get_terminal_size().columns
        except OSError:
            return 120
    @staticmethod
    def _truncate_to_width(text: str, max_width: int) -> str:
        """Truncate with a trailing ellipsis (keeps the start of the text)."""
        if not text or max_width <= 0:
            return ''
        t = Text(text)
        t.truncate(max_width, overflow="ellipsis")
        return t.plain
    @staticmethod
    def _truncate_tail(text: str, max_width: int) -> str:
        """Truncate keeping the END of the text (most recent log content)."""
        if not text or max_width <= 0:
            return ''
        if cell_len(text) <= max_width:
            return text
        if max_width <= 1:
            return ''
        return f"{text[-(max_width - 1):]}"
    def _available_width(self, left_text: str, indent: int = 0) -> int:
        """Width available for the right-aligned stderr tail: remaining space
        after the left text + indent + padding, capped at 40% of the terminal."""
        width = self._terminal_width()
        base = max(0, width - cell_len(left_text) - indent - 6)
        cap = max(0, (width * 2) // 5)
        return max(0, min(base, cap))
class ArchiveBoxProgressLayout:
    """
    Main layout manager for ArchiveBox orchestrator progress display.
    Layout structure:
    ┌─────────────────────────────────────────────────────────────┐
    │ Crawl Queue (full width)                                    │
    ├─────────────────────────────────────────────────────────────┤
    │ Crawl Queue Tree (hooks + outputs)                          │
    ├─────────────────────────────────────────────────────────────┤
    │ Running Process Logs (dynamic panels)                       │
    └─────────────────────────────────────────────────────────────┘

    Owns one CrawlQueuePanel (header), one CrawlQueueTreePanel (tree),
    and a dynamic set of ProcessLogPanel columns; callers push fresh
    data via the update_* methods before each render.
    """
    def __init__(self, crawl_id: Optional[str] = None):
        self.crawl_id = crawl_id
        self.start_time = datetime.now(timezone.utc)
        # Create components
        self.crawl_queue = CrawlQueuePanel()
        self.crawl_queue.crawl_id = crawl_id
        self.process_panels: List[ProcessLogPanel] = []
        self.crawl_queue_tree = CrawlQueueTreePanel(max_crawls=8, max_snapshots=16)
        # Create layout
        self.layout = self._make_layout()
    def _make_layout(self) -> Layout:
        """Define the layout structure."""
        layout = Layout(name="root")
        # Top-level split: crawl_queue, crawl_tree, processes
        layout.split(
            Layout(name="crawl_queue", size=3),
            Layout(name="crawl_tree", size=20),
            Layout(name="processes", ratio=1),
        )
        # Assign components to layout sections
        layout["crawl_queue"].update(self.crawl_queue)
        layout["crawl_tree"].update(self.crawl_queue_tree)
        layout["processes"].update(Columns([]))
        return layout
    def update_orchestrator_status(
        self,
        status: str,
        crawl_queue_count: int = 0,
        crawl_workers_count: int = 0,
        binary_queue_count: int = 0,
        binary_workers_count: int = 0,
        max_crawl_workers: int = 8,
    ):
        """Update orchestrator status in the crawl queue panel."""
        self.crawl_queue.orchestrator_status = status
        self.crawl_queue.crawl_queue_count = crawl_queue_count
        self.crawl_queue.crawl_workers_count = crawl_workers_count
        self.crawl_queue.binary_queue_count = binary_queue_count
        self.crawl_queue.binary_workers_count = binary_workers_count
        self.crawl_queue.max_crawl_workers = max_crawl_workers
    def update_process_panels(self, processes: List[Any], pending: Optional[List[Any]] = None) -> None:
        """Update process panels to show all running processes.

        Background ('.bg.') hooks are hidden; their "terminating" state is
        inferred (no foreground hooks running or pending while processes
        remain) and passed to each panel to color borders red.
        """
        panels = []
        all_processes = list(processes) + list(pending or [])
        # Detect whether any FOREGROUND hook is currently running...
        fg_running = False
        for process in processes:
            if getattr(process, 'process_type', '') != 'hook':
                continue
            try:
                cmd = getattr(process, 'cmd', [])
                hook_path = Path(cmd[1]) if len(cmd) > 1 else None
                hook_name = hook_path.name if hook_path else ''
                if '.bg.' in hook_name:
                    continue
                if '.bg.' not in hook_name:
                    fg_running = True
                    break
            except Exception:
                continue
        # ...or pending.
        fg_pending = False
        for process in (pending or []):
            if getattr(process, 'process_type', '') != 'hook':
                continue
            try:
                cmd = getattr(process, 'cmd', [])
                hook_path = Path(cmd[1]) if len(cmd) > 1 else None
                hook_name = hook_path.name if hook_path else ''
                if '.bg.' in hook_name:
                    continue
                if '.bg.' not in hook_name:
                    fg_pending = True
                    break
            except Exception:
                continue
        # Only background hooks left -> they are being wound down.
        bg_terminating = bool(processes) and not fg_running and not fg_pending
        for process in all_processes:
            is_hook = getattr(process, 'process_type', '') == 'hook'
            is_bg = False
            if is_hook:
                try:
                    cmd = getattr(process, 'cmd', [])
                    hook_path = Path(cmd[1]) if len(cmd) > 1 else None
                    hook_name = hook_path.name if hook_path else ''
                    is_bg = '.bg.' in hook_name
                except Exception:
                    is_bg = False
            if is_hook and is_bg:
                continue  # background hooks never get their own panel
            if not self._has_log_lines(process):
                continue  # skip processes with no output yet
            is_pending = getattr(process, 'status', '') in ('queued', 'pending', 'backoff') or (is_hook and not getattr(process, 'pid', None))
            # Pending panels are collapsed; background would be compact; foreground gets full height.
            max_lines = 2 if is_pending else (4 if is_bg else 7)
            panels.append(ProcessLogPanel(process, max_lines=max_lines, compact=is_bg, bg_terminating=bg_terminating))
        if not panels:
            # Collapse the processes section entirely when there is nothing to show.
            self.layout["processes"].size = 0
            self.layout["processes"].update(Text(""))
            self.process_panels = []
            return
        self.process_panels = panels
        self.layout["processes"].size = None
        self.layout["processes"].ratio = 1
        self.layout["processes"].update(Columns(panels, equal=True, expand=True))
    def update_crawl_tree(self, crawls: list[dict[str, Any]]) -> None:
        """Update the crawl queue tree panel."""
        self.crawl_queue_tree.update_crawls(crawls)
        # Auto-size crawl tree panel to content
        line_count = 0
        for crawl in crawls:
            line_count += 1
            for snap in crawl.get('snapshots', []) or []:
                line_count += 1
                if snap.get('output_path'):
                    line_count += 1
                for _ in snap.get('hooks', []) or []:
                    line_count += 1
        # +2 for the panel border rows; minimum 4 rows.
        self.layout["crawl_tree"].size = max(4, line_count + 2)
    def log_event(self, message: str, style: str = "white") -> None:
        """Add an event to the orchestrator log.

        Currently a no-op: the layout has no event-log section.
        """
        return
    def get_layout(self) -> Layout:
        """Get the Rich Layout object for rendering."""
        return self.layout
    def plain_lines(self) -> list[tuple[str, str]]:
        """Return the full display as (section, text) tuples without rich markup,
        for logging/non-tty output. Mirrors the rendered layout order."""
        lines: list[tuple[str, str]] = []
        queue = self.crawl_queue
        queue_line = (
            f"Status: {queue.orchestrator_status} | Crawls: {queue.crawl_queue_count} queued | "
            f"Binaries: {queue.binary_queue_count} queued | Workers: {queue.crawl_workers_count}/{queue.max_crawl_workers} "
            f"crawl, {queue.binary_workers_count} binary"
        )
        lines.append(("crawl_queue", queue_line))
        for panel in self.process_panels:
            title = _strip_rich(panel._title())
            for line in panel.plain_lines():
                if line:
                    lines.append((title or "process", line))
        for crawl in self.crawl_queue_tree.crawls:
            crawl_line = f"{self.crawl_queue_tree._status_icon(crawl.get('status', ''))} {crawl.get('id', '')[:8]} {crawl.get('label', '')}".strip()
            lines.append(("crawl_tree", crawl_line))
            for snap in crawl.get('snapshots', []):
                snap_line = f"  {self.crawl_queue_tree._status_icon(snap.get('status', ''))} {snap.get('label', '')}".rstrip()
                lines.append(("crawl_tree", snap_line))
                output_path = snap.get('output_path', '')
                if output_path:
                    lines.append(("crawl_tree", f"    {output_path}"))
                for hook in snap.get('hooks', []) or []:
                    status = hook.get('status', '')
                    path = hook.get('path', '')
                    icon, _ = self.crawl_queue_tree._hook_style(
                        status,
                        is_bg=hook.get('is_bg', False),
                        is_running=hook.get('is_running', False),
                        is_pending=hook.get('is_pending', False),
                    )
                    stats = self.crawl_queue_tree._hook_stats(
                        size=hook.get('size', ''),
                        elapsed=hook.get('elapsed', ''),
                        timeout=hook.get('timeout', ''),
                        status=status,
                    )
                    stderr_tail = hook.get('stderr', '')
                    hook_line = f"    {icon} {path}{stats}".strip()
                    if stderr_tail:
                        avail = self.crawl_queue_tree._available_width(hook_line, indent=16)
                        trunc = getattr(self.crawl_queue_tree, "_truncate_tail", self.crawl_queue_tree._truncate_to_width)
                        stderr_tail = trunc(stderr_tail, avail)
                        if stderr_tail:
                            hook_line = f"{hook_line} {stderr_tail}"
                    if hook_line:
                        lines.append(("crawl_tree", hook_line))
        return lines
    @staticmethod
    def _has_log_lines(process: Any) -> bool:
        """True if the process has emitted at least one non-blank stdout/stderr line."""
        try:
            stdout_lines = list(process.tail_stdout(lines=1, follow=False))
            if any(line.strip() for line in stdout_lines):
                return True
            stderr_lines = list(process.tail_stderr(lines=1, follow=False))
            if any(line.strip() for line in stderr_lines):
                return True
        except Exception:
            return False
        return False

View File

@@ -0,0 +1,22 @@
"""Public API of the archivebox services package.

Re-exports each event-projection service plus the bus runner entry
points so callers can import them from ``archivebox.services`` directly.
"""
from .archive_result_service import ArchiveResultService
from .binary_service import BinaryService
from .crawl_service import CrawlService
from .machine_service import MachineService
from .process_service import ProcessService
from .runner import run_binary, run_crawl, run_install, run_pending_crawls
from .snapshot_service import SnapshotService
from .tag_service import TagService

# Services first, then runner helpers.
__all__ = [
    "ArchiveResultService",
    "BinaryService",
    "CrawlService",
    "MachineService",
    "ProcessService",
    "SnapshotService",
    "TagService",
    "run_binary",
    "run_crawl",
    "run_install",
    "run_pending_crawls",
]

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
import mimetypes
from collections import defaultdict
from pathlib import Path
from asgiref.sync import sync_to_async
from django.utils import timezone
from abx_dl.events import ArchiveResultEvent
from abx_dl.services.base import BaseService
from .process_service import ProcessService, parse_event_datetime
def _collect_output_metadata(plugin_dir: Path) -> tuple[dict[str, dict], int, str]:
exclude_names = {"stdout.log", "stderr.log", "process.pid", "hook.pid", "listener.pid", "cmd.sh"}
output_files: dict[str, dict] = {}
mime_sizes: dict[str, int] = defaultdict(int)
total_size = 0
if not plugin_dir.exists():
return output_files, total_size, ""
for file_path in plugin_dir.rglob("*"):
if not file_path.is_file():
continue
if ".hooks" in file_path.parts:
continue
if file_path.name in exclude_names:
continue
try:
stat = file_path.stat()
except OSError:
continue
mime_type, _ = mimetypes.guess_type(str(file_path))
mime_type = mime_type or "application/octet-stream"
relative_path = str(file_path.relative_to(plugin_dir))
output_files[relative_path] = {}
mime_sizes[mime_type] += stat.st_size
total_size += stat.st_size
output_mimetypes = ",".join(
mime for mime, _size in sorted(mime_sizes.items(), key=lambda item: item[1], reverse=True)
)
return output_files, total_size, output_mimetypes
def _normalize_status(status: str) -> str:
if status == "noresult":
return "skipped"
return status or "failed"
class ArchiveResultService(BaseService):
    """Project abx-dl ArchiveResultEvents into Django ArchiveResult rows.

    Listens on the event bus for ArchiveResultEvent and mirrors each event
    into the DB, collecting output-file metadata from the plugin's output
    directory on disk. Consumes events only; emits none.
    """
    LISTENS_TO = [ArchiveResultEvent]
    EMITS = []
    def __init__(self, bus, *, process_service: ProcessService):
        # ProcessService maps bus-level process ids onto DB Process row ids.
        self.process_service = process_service
        super().__init__(bus)
    async def on_ArchiveResultEvent(self, event: ArchiveResultEvent) -> None:
        """Bus callback: run the synchronous ORM projection off the event loop.

        thread_sensitive=True keeps all Django ORM access on one thread.
        """
        await sync_to_async(self._project, thread_sensitive=True)(event)
    def _project(self, event: ArchiveResultEvent) -> None:
        """Create or update the ArchiveResult row described by *event* (sync)."""
        from archivebox.core.models import ArchiveResult, Snapshot
        from archivebox.machine.models import Process
        snapshot = Snapshot.objects.filter(id=event.snapshot_id).first()
        if snapshot is None:
            # No matching Snapshot row; drop the event silently.
            return
        process = None
        db_process_id = self.process_service.get_db_process_id(event.process_id)
        if db_process_id:
            process = Process.objects.filter(id=db_process_id).first()
        # One ArchiveResult per (snapshot, plugin, hook); first event creates it.
        result, _created = ArchiveResult.objects.get_or_create(
            snapshot=snapshot,
            plugin=event.plugin,
            hook_name=event.hook_name,
            defaults={
                "status": ArchiveResult.StatusChoices.STARTED,
                "process": process,
            },
        )
        # Summarize whatever files the plugin wrote under <snapshot>/<plugin>/.
        plugin_dir = Path(snapshot.output_dir) / event.plugin
        output_files, output_size, output_mimetypes = _collect_output_metadata(plugin_dir)
        # Keep an existing process link if this event carries none.
        result.process = process or result.process
        result.status = _normalize_status(event.status)
        result.output_str = event.output_str
        result.output_json = event.output_json
        result.output_files = output_files
        result.output_size = output_size
        result.output_mimetypes = output_mimetypes
        # Prefer event timestamps; fall back to the existing/start-now values.
        result.start_ts = parse_event_datetime(event.start_ts) or result.start_ts or timezone.now()
        result.end_ts = parse_event_datetime(event.end_ts) or timezone.now()
        result.retry_at = None  # result is final; clear any scheduled retry
        if event.error:
            result.notes = event.error
        result.save()

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
from asgiref.sync import sync_to_async
from abx_dl.events import BinaryEvent, BinaryInstalledEvent
from abx_dl.services.base import BaseService
class BinaryService(BaseService):
    """Projects binary detection/install bus events into machine.Binary rows."""
    LISTENS_TO = [BinaryEvent, BinaryInstalledEvent]
    EMITS = []

    async def on_BinaryEvent(self, event: BinaryEvent) -> None:
        await sync_to_async(self._project_binary, thread_sensitive=True)(event)

    async def on_BinaryInstalledEvent(self, event: BinaryInstalledEvent) -> None:
        await sync_to_async(self._project_installed_binary, thread_sensitive=True)(event)

    def _project_binary(self, event: BinaryEvent) -> None:
        """Upsert a Binary row for the current machine from a BinaryEvent.

        An already-INSTALLED binary only has its binproviders/overrides
        refreshed (when the event supplies different values); otherwise the
        row is (re)created via Binary.from_json.
        """
        from archivebox.machine.models import Binary, Machine
        machine = Machine.current()
        existing = Binary.objects.filter(machine=machine, name=event.name).first()
        if existing and existing.status == Binary.StatusChoices.INSTALLED:
            changed = False
            if event.binproviders and existing.binproviders != event.binproviders:
                existing.binproviders = event.binproviders
                changed = True
            if event.overrides and existing.overrides != event.overrides:
                existing.overrides = event.overrides
                changed = True
            if changed:
                existing.save(update_fields=["binproviders", "overrides", "modified_at"])
            return
        Binary.from_json(
            {
                "name": event.name,
                "abspath": event.abspath,
                "version": event.version,
                "sha256": event.sha256,
                "binproviders": event.binproviders,
                "binprovider": event.binprovider,
                "overrides": event.overrides or {},
            },
        )

    def _project_installed_binary(self, event: BinaryInstalledEvent) -> None:
        """Mark a Binary row INSTALLED, filling in metadata from the event.

        Event-supplied values win; existing row values are kept as fallback.
        """
        from archivebox.machine.models import Binary, Machine
        machine = Machine.current()
        binary, _ = Binary.objects.get_or_create(
            machine=machine,
            name=event.name,
            defaults={
                "status": Binary.StatusChoices.QUEUED,
            },
        )
        binary.abspath = event.abspath or binary.abspath
        binary.version = event.version or binary.version
        binary.sha256 = event.sha256 or binary.sha256
        binary.binprovider = event.binprovider or binary.binprovider
        binary.status = Binary.StatusChoices.INSTALLED
        binary.retry_at = None
        binary.save(update_fields=["abspath", "version", "sha256", "binprovider", "status", "retry_at", "modified_at"])

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from asgiref.sync import sync_to_async
from django.utils import timezone
from abx_dl.events import CrawlCleanupEvent, CrawlCompletedEvent, CrawlSetupEvent, CrawlStartEvent
from abx_dl.services.base import BaseService
class CrawlService(BaseService):
    """Mirrors crawl lifecycle bus events into the Crawl DB row's status."""
    LISTENS_TO = [CrawlSetupEvent, CrawlStartEvent, CrawlCleanupEvent, CrawlCompletedEvent]
    EMITS = []

    def __init__(self, bus, *, crawl_id: str):
        self.crawl_id = crawl_id
        super().__init__(bus)

    async def on_CrawlSetupEvent(self, event: CrawlSetupEvent) -> None:
        await sync_to_async(self._mark_started, thread_sensitive=True)()

    async def on_CrawlStartEvent(self, event: CrawlStartEvent) -> None:
        await sync_to_async(self._mark_started, thread_sensitive=True)()

    async def on_CrawlCleanupEvent(self, event: CrawlCleanupEvent) -> None:
        await sync_to_async(self._mark_started, thread_sensitive=True)()

    async def on_CrawlCompletedEvent(self, event: CrawlCompletedEvent) -> None:
        await sync_to_async(self._mark_completed, thread_sensitive=True)()

    def _mark_started(self) -> None:
        """Move the crawl to STARTED (unless already SEALED) and clear retry."""
        self._transition(sealed=False)

    def _mark_completed(self) -> None:
        """Seal the crawl and clear its retry timer."""
        self._transition(sealed=True)

    def _transition(self, *, sealed: bool) -> None:
        from archivebox.crawls.models import Crawl
        crawl = Crawl.objects.get(id=self.crawl_id)
        if sealed:
            crawl.status = Crawl.StatusChoices.SEALED
        elif crawl.status != Crawl.StatusChoices.SEALED:
            # Never un-seal a finished crawl; otherwise mark it started.
            crawl.status = Crawl.StatusChoices.STARTED
        crawl.retry_at = None
        crawl.save(update_fields=["status", "retry_at", "modified_at"])

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from asgiref.sync import sync_to_async
from abx_dl.events import MachineEvent
from abx_dl.services.base import BaseService
class MachineService(BaseService):
    """Projects MachineEvents into the current Machine row's config JSON."""
    LISTENS_TO = [MachineEvent]
    EMITS = []

    async def on_MachineEvent(self, event: MachineEvent) -> None:
        await sync_to_async(self._project, thread_sensitive=True)(event)

    def _project(self, event: MachineEvent) -> None:
        """Merge the event's config payload into Machine.config and save.

        Either a full config dict is merged wholesale, or (method ==
        "update") a single "config/<key>" entry is set. Events matching
        neither form are ignored.
        """
        from archivebox.machine.models import Machine
        machine = Machine.current()
        config = dict(machine.config or {})
        if event.config is not None:
            config.update(event.config)
        elif event.method == "update":
            # Keys arrive as "config/<NAME>"; strip the prefix once.
            key = event.key.replace("config/", "", 1).strip()
            if key:
                config[key] = event.value
            # NOTE(review): an empty key currently falls through and re-saves
            # the unchanged config — confirm this matches the intended guard.
        else:
            return
        machine.config = config
        machine.save(update_fields=["config", "modified_at"])

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from asgiref.sync import sync_to_async
from django.utils import timezone
from abx_dl.events import ProcessCompletedEvent, ProcessStartedEvent
from abx_dl.services.base import BaseService
if TYPE_CHECKING:
from archivebox.machine.models import Process
def parse_event_datetime(value: str | None):
if not value:
return None
try:
dt = datetime.fromisoformat(value)
except ValueError:
return None
if timezone.is_naive(dt):
return timezone.make_aware(dt, timezone.get_current_timezone())
return dt
class ProcessService(BaseService):
    """Projects process lifecycle bus events into machine.Process rows.

    Keeps an in-memory map from bus-level process ids to DB Process row ids
    so other services (e.g. ArchiveResultService) can resolve them.
    """
    LISTENS_TO = [ProcessStartedEvent, ProcessCompletedEvent]
    EMITS = []

    def __init__(self, bus):
        # bus process_id -> DB Process.id (both str)
        self.process_ids: dict[str, str] = {}
        super().__init__(bus)

    async def on_ProcessStartedEvent(self, event: ProcessStartedEvent) -> None:
        await sync_to_async(self._project_started, thread_sensitive=True)(event)

    async def on_ProcessCompletedEvent(self, event: ProcessCompletedEvent) -> None:
        await sync_to_async(self._project_completed, thread_sensitive=True)(event)

    def get_db_process_id(self, process_id: str) -> str | None:
        """Return the DB Process id recorded for a bus process id, if any."""
        return self.process_ids.get(process_id)

    def _get_or_create_process(self, event: ProcessStartedEvent | ProcessCompletedEvent) -> "Process":
        """Fetch the mapped Process row, or create one seeded from *event*."""
        from archivebox.machine.models import Machine, Process
        db_process_id = self.process_ids.get(event.process_id)
        if db_process_id:
            process = Process.objects.filter(id=db_process_id).first()
            if process is not None:
                return process
        # Binary install hooks are recorded as BINARY processes, others as HOOK.
        process_type = Process.TypeChoices.BINARY if event.hook_name.startswith("on_Binary") else Process.TypeChoices.HOOK
        process = Process.objects.create(
            machine=Machine.current(),
            process_type=process_type,
            pwd=event.output_dir,
            cmd=[event.hook_path, *event.hook_args],
            env=event.env,
            # ProcessCompletedEvent may not carry these fields, hence getattr.
            timeout=getattr(event, "timeout", 60),
            pid=event.pid or None,
            started_at=parse_event_datetime(getattr(event, "start_ts", "")),
            status=Process.StatusChoices.RUNNING,
            retry_at=None,
        )
        self.process_ids[event.process_id] = str(process.id)
        return process

    def _project_started(self, event: ProcessStartedEvent) -> None:
        """Record/refresh a Process row as RUNNING from a start event."""
        process = self._get_or_create_process(event)
        process.pwd = event.output_dir
        process.cmd = [event.hook_path, *event.hook_args]
        process.env = event.env
        process.timeout = event.timeout
        process.pid = event.pid or None
        process.started_at = parse_event_datetime(event.start_ts) or process.started_at or timezone.now()
        process.status = process.StatusChoices.RUNNING
        process.retry_at = None
        process.save()

    def _project_completed(self, event: ProcessCompletedEvent) -> None:
        """Finalize a Process row as EXITED with captured output + exit code."""
        process = self._get_or_create_process(event)
        process.pwd = event.output_dir
        process.cmd = [event.hook_path, *event.hook_args]
        process.env = event.env
        process.pid = event.pid or process.pid
        process.started_at = parse_event_datetime(event.start_ts) or process.started_at
        process.ended_at = parse_event_datetime(event.end_ts) or timezone.now()
        process.stdout = event.stdout
        process.stderr = event.stderr
        process.exit_code = event.exit_code
        process.status = process.StatusChoices.EXITED
        process.retry_at = None
        process.save()

View File

@@ -0,0 +1,454 @@
from __future__ import annotations
import asyncio
import json
import os
import sys
import time
from pathlib import Path
from typing import Any
from django.utils import timezone
from abx_dl.events import BinaryEvent
from abx_dl.models import INSTALL_URL, Snapshot as AbxSnapshot, discover_plugins
from abx_dl.orchestrator import create_bus, download, install_plugins as abx_install_plugins, setup_services as setup_abx_services
from .archive_result_service import ArchiveResultService
from .binary_service import BinaryService
from .crawl_service import CrawlService
from .machine_service import MachineService
from .process_service import ProcessService
from .snapshot_service import SnapshotService
from .tag_service import TagService
def _bus_name(prefix: str, identifier: str) -> str:
normalized = "".join(ch if ch.isalnum() else "_" for ch in identifier)
return f"{prefix}_{normalized}"
def _selected_plugins_from_config(config: dict[str, Any]) -> list[str] | None:
raw = str(config.get("PLUGINS") or "").strip()
if not raw:
return None
return [name.strip() for name in raw.split(",") if name.strip()]
def _attach_bus_trace(bus) -> None:
    """Start a background task that dumps every bus event as JSONL.

    Enabled only when ARCHIVEBOX_BUS_TRACE is set: the values "1", "-", or
    "stderr" trace to stderr; any other value is treated as a file path.
    Idempotent per bus (no-op if a trace task is already attached).
    """
    trace_target = (os.environ.get("ARCHIVEBOX_BUS_TRACE") or "").strip()
    if not trace_target:
        return
    if getattr(bus, "_archivebox_trace_task", None) is not None:
        # Already tracing this bus.
        return
    trace_path = None if trace_target in {"1", "-", "stderr"} else Path(trace_target)
    stop_event = asyncio.Event()
    async def trace_loop() -> None:
        # Poll the bus's event history and emit each event exactly once.
        seen_event_ids: set[str] = set()
        while not stop_event.is_set():
            for event_id, event in list(bus.event_history.items()):
                if event_id in seen_event_ids:
                    continue
                seen_event_ids.add(event_id)
                payload = event.model_dump(mode="json")
                payload["bus_name"] = bus.name
                line = json.dumps(payload, ensure_ascii=False, default=str, separators=(",", ":"))
                if trace_path is None:
                    print(line, file=sys.stderr, flush=True)
                else:
                    trace_path.parent.mkdir(parents=True, exist_ok=True)
                    with trace_path.open("a", encoding="utf-8") as handle:
                        handle.write(line + "\n")
            await asyncio.sleep(0.05)
    # Stash handles on the bus so _stop_bus_trace() can shut the loop down.
    bus._archivebox_trace_stop = stop_event
    bus._archivebox_trace_task = asyncio.create_task(trace_loop())
async def _stop_bus_trace(bus) -> None:
stop_event = getattr(bus, "_archivebox_trace_stop", None)
trace_task = getattr(bus, "_archivebox_trace_task", None)
if stop_event is None or trace_task is None:
return
stop_event.set()
await asyncio.gather(trace_task, return_exceptions=True)
bus._archivebox_trace_stop = None
bus._archivebox_trace_task = None
class CrawlRunner:
    """Drives a single Crawl end-to-end over an abx-dl event bus.

    Wires the ArchiveBox DB projection services onto a fresh bus, loads or
    creates the crawl's root snapshots, then runs crawl setup -> snapshot
    downloads (bounded concurrency) -> crawl cleanup via abx_dl's
    download(), and finally seals the crawl. Install crawls (INSTALL_URL)
    are special-cased into a single install-only download pass.
    """
    # Upper bound on snapshots downloaded concurrently on this bus.
    MAX_CONCURRENT_SNAPSHOTS = 8

    def __init__(self, crawl, *, snapshot_ids: list[str] | None = None, selected_plugins: list[str] | None = None):
        self.crawl = crawl
        self.bus = create_bus(name=_bus_name("ArchiveBox", str(crawl.id)), total_timeout=3600.0)
        self.plugins = discover_plugins()
        # Projection services mirror bus events back into the Django ORM.
        self.process_service = ProcessService(self.bus)
        self.machine_service = MachineService(self.bus)
        self.binary_service = BinaryService(self.bus)
        self.tag_service = TagService(self.bus)
        self.crawl_service = CrawlService(self.bus, crawl_id=str(crawl.id))
        self.snapshot_service = SnapshotService(self.bus, crawl_id=str(crawl.id), schedule_snapshot=self.enqueue_snapshot)
        self.archive_result_service = ArchiveResultService(self.bus, process_service=self.process_service)
        self.selected_plugins = selected_plugins
        self.initial_snapshot_ids = snapshot_ids
        # snapshot_id -> download task; used for dedup and completion waiting.
        self.snapshot_tasks: dict[str, asyncio.Task[None]] = {}
        self.snapshot_semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_SNAPSHOTS)
        self.abx_services = None
        self.persona = None
        self.base_config: dict[str, Any] = {}
        self.primary_url = ""

    async def run(self) -> None:
        """Execute the crawl; always stops the bus and seals the crawl."""
        from asgiref.sync import sync_to_async
        from archivebox.crawls.models import Crawl
        try:
            await sync_to_async(self._prepare, thread_sensitive=True)()
            _attach_bus_trace(self.bus)
            self.abx_services = setup_abx_services(
                self.bus,
                plugins=self.plugins,
                config_overrides=self.base_config,
                auto_install=True,
                emit_jsonl=False,
            )
            if self.crawl.get_system_task() == INSTALL_URL:
                # Special system crawl: just run the plugin install flow.
                await self._run_install_crawl()
            else:
                snapshot_ids = await sync_to_async(self._initial_snapshot_ids, thread_sensitive=True)()
                if snapshot_ids:
                    # Crawl-level setup/cleanup hooks run against the first root snapshot.
                    root_snapshot_id = snapshot_ids[0]
                    await self._run_crawl_setup(root_snapshot_id)
                    for snapshot_id in snapshot_ids:
                        await self.enqueue_snapshot(snapshot_id)
                    await self._wait_for_snapshot_tasks()
                    await self._run_crawl_cleanup(root_snapshot_id)
            if self.abx_services is not None:
                await self.abx_services.process.wait_for_background_monitors()
        finally:
            # Always tear down tracing/bus/persona and seal the crawl row.
            await _stop_bus_trace(self.bus)
            await self.bus.stop()
            await sync_to_async(self._cleanup_persona, thread_sensitive=True)()
            crawl = await sync_to_async(Crawl.objects.get, thread_sensitive=True)(id=self.crawl.id)
            if crawl.status != Crawl.StatusChoices.SEALED:
                crawl.status = Crawl.StatusChoices.SEALED
                crawl.retry_at = None
                await sync_to_async(crawl.save, thread_sensitive=True)(update_fields=["status", "retry_at", "modified_at"])

    async def enqueue_snapshot(self, snapshot_id: str) -> None:
        """Start a background download task for one snapshot (dedup by id)."""
        task = self.snapshot_tasks.get(snapshot_id)
        if task is not None and not task.done():
            return
        task = asyncio.create_task(self._run_snapshot(snapshot_id))
        self.snapshot_tasks[snapshot_id] = task

    async def _wait_for_snapshot_tasks(self) -> None:
        """Wait until no snapshot task is active.

        Loops because completing tasks may enqueue new (child) snapshots.
        """
        while True:
            active = [task for task in self.snapshot_tasks.values() if not task.done()]
            if not active:
                return
            await asyncio.gather(*active)

    def _prepare(self) -> None:
        """Resolve persona + base config for the crawl (runs on a DB thread)."""
        from archivebox.config.configset import get_config
        self.primary_url = self.crawl.get_urls_list()[0] if self.crawl.get_urls_list() else ""
        self.persona = self.crawl.resolve_persona()
        self.base_config = get_config(crawl=self.crawl)
        if self.selected_plugins is None:
            self.selected_plugins = _selected_plugins_from_config(self.base_config)
        if self.persona:
            # Persona runtime overrides (e.g. browser profile) win over base config.
            chrome_binary = str(self.base_config.get("CHROME_BINARY") or "")
            self.base_config.update(self.persona.prepare_runtime_for_crawl(self.crawl, chrome_binary=chrome_binary))

    def _cleanup_persona(self) -> None:
        """Release any persona runtime resources acquired in _prepare()."""
        if self.persona:
            self.persona.cleanup_runtime_for_crawl(self.crawl)

    def _create_root_snapshots(self) -> list[str]:
        """Create depth-0 snapshots from the crawl's URLs; return their ids."""
        created = self.crawl.create_snapshots_from_urls()
        # Fall back to existing root snapshots if none were newly created.
        snapshots = created or list(self.crawl.snapshot_set.filter(depth=0).order_by("created_at"))
        return [str(snapshot.id) for snapshot in snapshots]

    def _initial_snapshot_ids(self) -> list[str]:
        """Return caller-supplied snapshot ids, or create the root snapshots."""
        if self.initial_snapshot_ids:
            return [str(snapshot_id) for snapshot_id in self.initial_snapshot_ids]
        return self._create_root_snapshots()

    def _snapshot_config(self, snapshot) -> dict[str, Any]:
        """Build the per-snapshot config passed to abx-dl's download().

        Note: base_config (incl. persona overrides) is applied AFTER the
        snapshot-level config, so it takes precedence.
        """
        from archivebox.config.configset import get_config
        config = get_config(crawl=self.crawl, snapshot=snapshot)
        config.update(self.base_config)
        config["CRAWL_DIR"] = str(self.crawl.output_dir)
        config["SNAP_DIR"] = str(snapshot.output_dir)
        config["SNAPSHOT_ID"] = str(snapshot.id)
        config["SNAPSHOT_DEPTH"] = snapshot.depth
        config["CRAWL_ID"] = str(self.crawl.id)
        config["SOURCE_URL"] = snapshot.url
        if snapshot.parent_snapshot_id:
            config["PARENT_SNAPSHOT_ID"] = str(snapshot.parent_snapshot_id)
        return config

    async def _run_install_crawl(self) -> None:
        """Run the special install crawl (crawl-level hooks only)."""
        install_snapshot = AbxSnapshot(
            url=self.primary_url or INSTALL_URL,
            id=str(self.crawl.id),
            crawl_id=str(self.crawl.id),
        )
        await download(
            url=self.primary_url or INSTALL_URL,
            plugins=self.plugins,
            output_dir=Path(self.crawl.output_dir),
            selected_plugins=self.selected_plugins,
            config_overrides={
                **self.base_config,
                "CRAWL_DIR": str(self.crawl.output_dir),
                "SNAP_DIR": str(self.crawl.output_dir),
                "CRAWL_ID": str(self.crawl.id),
                "SOURCE_URL": self.crawl.urls,
            },
            bus=self.bus,
            emit_jsonl=False,
            snapshot=install_snapshot,
            crawl_only=True,
        )

    async def _run_crawl_setup(self, snapshot_id: str) -> None:
        """Run only the crawl setup phase against the root snapshot."""
        from asgiref.sync import sync_to_async
        snapshot = await sync_to_async(self._load_snapshot_run_data, thread_sensitive=True)(snapshot_id)
        setup_snapshot = AbxSnapshot(
            url=snapshot["url"],
            id=snapshot["id"],
            title=snapshot["title"],
            timestamp=snapshot["timestamp"],
            bookmarked_at=snapshot["bookmarked_at"],
            created_at=snapshot["created_at"],
            tags=snapshot["tags"],
            depth=snapshot["depth"],
            parent_snapshot_id=snapshot["parent_snapshot_id"],
            crawl_id=str(self.crawl.id),
        )
        await download(
            url=snapshot["url"],
            plugins=self.plugins,
            output_dir=Path(snapshot["output_dir"]),
            selected_plugins=self.selected_plugins,
            config_overrides=snapshot["config"],
            bus=self.bus,
            emit_jsonl=False,
            snapshot=setup_snapshot,
            crawl_setup_only=True,
        )

    async def _run_crawl_cleanup(self, snapshot_id: str) -> None:
        """Run only the crawl cleanup phase against the root snapshot."""
        from asgiref.sync import sync_to_async
        snapshot = await sync_to_async(self._load_snapshot_run_data, thread_sensitive=True)(snapshot_id)
        cleanup_snapshot = AbxSnapshot(
            url=snapshot["url"],
            id=snapshot["id"],
            title=snapshot["title"],
            timestamp=snapshot["timestamp"],
            bookmarked_at=snapshot["bookmarked_at"],
            created_at=snapshot["created_at"],
            tags=snapshot["tags"],
            depth=snapshot["depth"],
            parent_snapshot_id=snapshot["parent_snapshot_id"],
            crawl_id=str(self.crawl.id),
        )
        await download(
            url=snapshot["url"],
            plugins=self.plugins,
            output_dir=Path(snapshot["output_dir"]),
            selected_plugins=self.selected_plugins,
            config_overrides=snapshot["config"],
            bus=self.bus,
            emit_jsonl=False,
            snapshot=cleanup_snapshot,
            crawl_cleanup_only=True,
        )

    async def _run_snapshot(self, snapshot_id: str) -> None:
        """Download one snapshot (skipping crawl-level setup/cleanup phases).

        Concurrency is bounded by snapshot_semaphore.
        """
        from asgiref.sync import sync_to_async
        async with self.snapshot_semaphore:
            snapshot = await sync_to_async(self._load_snapshot_run_data, thread_sensitive=True)(snapshot_id)
            abx_snapshot = AbxSnapshot(
                url=snapshot["url"],
                id=snapshot["id"],
                title=snapshot["title"],
                timestamp=snapshot["timestamp"],
                bookmarked_at=snapshot["bookmarked_at"],
                created_at=snapshot["created_at"],
                tags=snapshot["tags"],
                depth=snapshot["depth"],
                parent_snapshot_id=snapshot["parent_snapshot_id"],
                crawl_id=str(self.crawl.id),
            )
            await download(
                url=snapshot["url"],
                plugins=self.plugins,
                output_dir=Path(snapshot["output_dir"]),
                selected_plugins=self.selected_plugins,
                config_overrides=snapshot["config"],
                bus=self.bus,
                emit_jsonl=False,
                snapshot=abx_snapshot,
                skip_crawl_setup=True,
                skip_crawl_cleanup=True,
            )

    def _load_snapshot_run_data(self, snapshot_id: str):
        """Load a Snapshot row into a plain dict for use off the DB thread."""
        from archivebox.core.models import Snapshot
        snapshot = Snapshot.objects.select_related("crawl").get(id=snapshot_id)
        return {
            "id": str(snapshot.id),
            "url": snapshot.url,
            "title": snapshot.title,
            "timestamp": snapshot.timestamp,
            "bookmarked_at": snapshot.bookmarked_at.isoformat() if snapshot.bookmarked_at else "",
            "created_at": snapshot.created_at.isoformat() if snapshot.created_at else "",
            "tags": snapshot.tags_str(),
            "depth": snapshot.depth,
            "parent_snapshot_id": str(snapshot.parent_snapshot_id) if snapshot.parent_snapshot_id else None,
            "output_dir": str(snapshot.output_dir),
            "config": self._snapshot_config(snapshot),
        }
def run_crawl(crawl_id: str, *, snapshot_ids: list[str] | None = None, selected_plugins: list[str] | None = None) -> None:
    """Load the Crawl row and drive it to completion on a fresh event loop."""
    from archivebox.crawls.models import Crawl
    crawl = Crawl.objects.get(id=crawl_id)
    runner = CrawlRunner(crawl, snapshot_ids=snapshot_ids, selected_plugins=selected_plugins)
    asyncio.run(runner.run())
async def _run_binary(binary_id: str) -> None:
    """Run the binary detect/install flow for one Binary row on its own bus.

    Emits a BinaryEvent seeded from the DB row; plugin hooks react to it
    and the attached projection services write the results back to the DB.
    """
    from asgiref.sync import sync_to_async
    from archivebox.config.configset import get_config
    from archivebox.machine.models import Binary
    binary = await sync_to_async(Binary.objects.get, thread_sensitive=True)(id=binary_id)
    config = get_config()
    plugins = discover_plugins()
    bus = create_bus(name=_bus_name("ArchiveBox_binary", str(binary.id)), total_timeout=1800.0)
    setup_abx_services(
        bus,
        plugins=plugins,
        config_overrides=config,
        auto_install=True,
        emit_jsonl=False,
    )
    # Attach ArchiveBox DB projection services to the bus.
    process_service = ProcessService(bus)
    MachineService(bus)
    BinaryService(bus)
    TagService(bus)
    ArchiveResultService(bus, process_service=process_service)
    try:
        _attach_bus_trace(bus)
        await bus.emit(
            BinaryEvent(
                name=binary.name,
                plugin_name="archivebox",
                hook_name="archivebox_run",
                output_dir=str(binary.output_dir),
                binary_id=str(binary.id),
                machine_id=str(binary.machine_id),
                abspath=binary.abspath,
                version=binary.version,
                sha256=binary.sha256,
                binproviders=binary.binproviders,
                binprovider=binary.binprovider,
                overrides=binary.overrides or None,
            ),
        )
    finally:
        await _stop_bus_trace(bus)
        await bus.stop()
def run_binary(binary_id: str) -> None:
    """Synchronous wrapper: run one Binary's install flow on a fresh loop."""
    binary_coro = _run_binary(binary_id)
    asyncio.run(binary_coro)
async def _run_install(plugin_names: list[str] | None = None) -> None:
    """Install the given plugins (or all, when None) on a dedicated bus."""
    from archivebox.config.configset import get_config
    config = get_config()
    plugins = discover_plugins()
    bus = create_bus(name="ArchiveBox_install", total_timeout=3600.0)
    abx_services = setup_abx_services(
        bus,
        plugins=plugins,
        config_overrides=config,
        auto_install=True,
        emit_jsonl=False,
    )
    # Attach ArchiveBox DB projection services to the bus.
    process_service = ProcessService(bus)
    MachineService(bus)
    BinaryService(bus)
    TagService(bus)
    ArchiveResultService(bus, process_service=process_service)
    try:
        _attach_bus_trace(bus)
        await abx_install_plugins(
            plugin_names=plugin_names,
            plugins=plugins,
            config_overrides=config,
            emit_jsonl=False,
            bus=bus,
        )
        # Let any background install monitors finish before stopping the bus.
        await abx_services.process.wait_for_background_monitors()
    finally:
        await _stop_bus_trace(bus)
        await bus.stop()
def run_install(*, plugin_names: list[str] | None = None) -> None:
    """Run the plugin install flow to completion on a fresh event loop."""
    install_coro = _run_install(plugin_names=plugin_names)
    asyncio.run(install_coro)
def run_pending_crawls(*, daemon: bool = False, crawl_id: str | None = None) -> int:
    """Process queued Binaries and Crawls until the queue drains.

    In daemon mode, also enqueues due CrawlSchedules and keeps polling
    forever; otherwise returns 0 once no pending crawl remains. Passing
    crawl_id restricts processing to that single crawl (and skips the
    binary/schedule handling).
    """
    from archivebox.crawls.models import Crawl, CrawlSchedule
    from archivebox.machine.models import Binary
    while True:
        if daemon and crawl_id is None:
            # Enqueue any scheduled crawls that have come due.
            now = timezone.now()
            for schedule in CrawlSchedule.objects.filter(is_enabled=True).select_related("template", "template__created_by"):
                if schedule.is_due(now):
                    schedule.enqueue(queued_at=now)
        if crawl_id is None:
            # Binaries take priority: install pending binaries before crawls.
            binary = (
                Binary.objects.filter(retry_at__lte=timezone.now())
                .exclude(status=Binary.StatusChoices.INSTALLED)
                .order_by("retry_at", "created_at")
                .first()
            )
            if binary is not None:
                run_binary(str(binary.id))
                continue
        pending = Crawl.objects.filter(retry_at__lte=timezone.now()).exclude(status=Crawl.StatusChoices.SEALED)
        if crawl_id:
            pending = pending.filter(id=crawl_id)
        pending = pending.order_by("retry_at", "created_at")
        crawl = pending.first()
        if crawl is None:
            if daemon:
                # Nothing to do right now; poll again shortly.
                time.sleep(2.0)
                continue
            return 0
        run_crawl(str(crawl.id))

View File

@@ -0,0 +1,128 @@
from __future__ import annotations
import re
from asgiref.sync import sync_to_async
from django.utils import timezone
from abx_dl.events import SnapshotCompletedEvent, SnapshotEvent
from abx_dl.services.base import BaseService
class SnapshotService(BaseService):
    """Projects abx-dl snapshot events into Snapshot DB rows.

    Root snapshots (depth 0) are marked STARTED when the bus announces
    them; deeper SnapshotEvents (discovered outlinks) create child Snapshot
    rows, subject to the crawl's max_depth and URL allow/deny filters, and
    are scheduled for archiving via the injected schedule_snapshot callback.
    SnapshotCompletedEvents seal the snapshot and write its index files.
    """
    LISTENS_TO = [SnapshotEvent, SnapshotCompletedEvent]
    EMITS = []

    def __init__(self, bus, *, crawl_id: str, schedule_snapshot):
        # schedule_snapshot: async callable(snapshot_id) that enqueues archiving.
        self.crawl_id = crawl_id
        self.schedule_snapshot = schedule_snapshot
        super().__init__(bus)

    async def on_SnapshotEvent(self, event: SnapshotEvent) -> None:
        """Project the event; schedule newly-created child snapshots."""
        snapshot_id = await sync_to_async(self._project_snapshot, thread_sensitive=True)(event)
        if snapshot_id and event.depth > 0:
            await self.schedule_snapshot(snapshot_id)

    async def on_SnapshotCompletedEvent(self, event: SnapshotCompletedEvent) -> None:
        await sync_to_async(self._seal_snapshot, thread_sensitive=True)(event.snapshot_id)

    def _project_snapshot(self, event: SnapshotEvent) -> str | None:
        """Create/update the Snapshot row for *event*.

        Returns the snapshot id to schedule, or None when the event should
        be ignored (unknown snapshot, depth exceeded, filtered URL, or
        already sealed).
        """
        from archivebox.core.models import Snapshot
        from archivebox.crawls.models import Crawl
        crawl = Crawl.objects.get(id=self.crawl_id)
        if event.depth == 0:
            # Root snapshots already exist in the DB; just mark them started.
            snapshot = Snapshot.objects.filter(id=event.snapshot_id, crawl=crawl).first()
            if snapshot is None:
                return None
            snapshot.status = Snapshot.StatusChoices.STARTED
            snapshot.retry_at = None
            snapshot.save(update_fields=["status", "retry_at", "modified_at"])
            snapshot.ensure_crawl_symlink()
            return str(snapshot.id)
        if event.depth > crawl.max_depth:
            return None
        parent_snapshot = Snapshot.objects.filter(id=event.parent_snapshot_id, crawl=crawl).first()
        if parent_snapshot is None:
            return None
        if not self._url_passes_filters(crawl, parent_snapshot, event.url):
            return None
        snapshot = Snapshot.from_json(
            {
                "url": event.url,
                "depth": event.depth,
                "parent_snapshot_id": str(parent_snapshot.id),
                "crawl_id": str(crawl.id),
            },
            overrides={
                "crawl": crawl,
                "snapshot": parent_snapshot,
                "created_by_id": crawl.created_by_id,
            },
            queue_for_extraction=False,
        )
        if snapshot is None:
            return None
        if snapshot.status == Snapshot.StatusChoices.SEALED:
            # Already archived previously (deduplicated) — nothing to do.
            return None
        # Not sealed (checked above), so (re)queue it unconditionally.
        # (Previously this re-tested status != SEALED, which was always true.)
        snapshot.status = Snapshot.StatusChoices.QUEUED
        snapshot.retry_at = None
        snapshot.save(update_fields=["status", "retry_at", "modified_at"])
        snapshot.ensure_crawl_symlink()
        return str(snapshot.id)

    def _url_passes_filters(self, crawl, parent_snapshot, url: str) -> bool:
        """Apply URL_DENYLIST then URL_ALLOWLIST regex filters to *url*.

        Deny wins over allow; a non-empty allowlist means allow-only.
        Invalid regex patterns are skipped.
        """
        from archivebox.config.configset import get_config
        config = get_config(
            user=getattr(crawl, "created_by", None),
            crawl=crawl,
            snapshot=parent_snapshot,
        )
        def to_pattern_list(value):
            # Accept either a pre-split list or a comma-separated string.
            if isinstance(value, list):
                return value
            if isinstance(value, str):
                return [pattern.strip() for pattern in value.split(",") if pattern.strip()]
            return []
        allowlist = to_pattern_list(config.get("URL_ALLOWLIST", ""))
        denylist = to_pattern_list(config.get("URL_DENYLIST", ""))
        for pattern in denylist:
            try:
                if re.search(pattern, url):
                    return False
            except re.error:
                continue
        if allowlist:
            for pattern in allowlist:
                try:
                    if re.search(pattern, url):
                        return True
                except re.error:
                    continue
            return False
        return True

    def _seal_snapshot(self, snapshot_id: str) -> None:
        """Mark the snapshot SEALED and write its on-disk index files."""
        from archivebox.core.models import Snapshot
        snapshot = Snapshot.objects.filter(id=snapshot_id).first()
        if snapshot is None:
            return
        snapshot.status = Snapshot.StatusChoices.SEALED
        snapshot.retry_at = None
        snapshot.downloaded_at = snapshot.downloaded_at or timezone.now()
        snapshot.save(update_fields=["status", "retry_at", "downloaded_at", "modified_at"])
        snapshot.write_index_jsonl()
        snapshot.write_json_details()
        snapshot.write_html_details()

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from asgiref.sync import sync_to_async
from abx_dl.events import TagEvent
from abx_dl.services.base import BaseService
class TagService(BaseService):
    """Attaches tags announced on the bus to their Snapshot DB rows."""
    LISTENS_TO = [TagEvent]
    EMITS = []

    async def on_TagEvent(self, event: TagEvent) -> None:
        await sync_to_async(self._project, thread_sensitive=True)(event)

    def _project(self, event: TagEvent) -> None:
        """Create/link the Tag for *event* (no-op if the snapshot is gone)."""
        from archivebox.core.models import Snapshot, Tag
        target = Snapshot.objects.filter(id=event.snapshot_id).first()
        if target is not None:
            Tag.from_json({"name": event.name}, overrides={"snapshot": target})

View File

@@ -581,12 +581,12 @@
<div class="header-left">
<div class="orchestrator-status">
<span class="status-dot stopped" id="orchestrator-dot"></span>
<span id="orchestrator-text">Stopped</span>
<span id="orchestrator-text">Runner stopped</span>
<span class="pid-label compact" id="orchestrator-pid" style="display:none;"></span>
</div>
<div class="stats">
<div class="stat">
<span class="stat-label">Workers</span>
<span class="stat-label">Processes</span>
<span class="stat-value info" id="worker-count">0</span>
</div>
<div class="stat">
@@ -804,7 +804,7 @@
// Queued and waiting to be picked up by worker
warningHtml = `
<div style="padding: 8px 14px; background: rgba(210, 153, 34, 0.1); border-top: 1px solid #d29922; color: #d29922; font-size: 11px;">
⏳ Waiting for worker to pick up...${crawl.urls_preview ? ` (${crawl.urls_preview})` : ''}
⏳ Waiting for the runner to pick up...${crawl.urls_preview ? ` (${crawl.urls_preview})` : ''}
</div>
`;
}

View File

@@ -30,7 +30,7 @@
{% else %}
<div id="in-progress" style="display: none;">
<center><h3>Creating crawl and queueing snapshots...</h3>
<p>Your crawl is being created. The orchestrator will process URLs and create snapshots in the background.</p>
<p>Your crawl is being created. The background runner will process URLs and create snapshots.</p>
<br/>
<div class="loader"></div>
<br/>

View File

@@ -1,133 +0,0 @@
import os
import signal
import sqlite3
import subprocess
import sys
import time
from pathlib import Path
def _run(cmd, data_dir: Path, env: dict, timeout: int = 120):
return subprocess.run(
cmd,
cwd=data_dir,
env=env,
capture_output=True,
text=True,
timeout=timeout,
)
def _make_env(data_dir: Path) -> dict:
env = os.environ.copy()
env["DATA_DIR"] = str(data_dir)
env["USE_COLOR"] = "False"
env["SHOW_PROGRESS"] = "False"
env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
env["PLUGINS"] = "favicon"
# Keep it fast but still real hooks
env["SAVE_FAVICON"] = "True"
env["SAVE_TITLE"] = "False"
env["SAVE_WGET"] = "False"
env["SAVE_WARC"] = "False"
env["SAVE_PDF"] = "False"
env["SAVE_SCREENSHOT"] = "False"
env["SAVE_DOM"] = "False"
env["SAVE_SINGLEFILE"] = "False"
env["SAVE_READABILITY"] = "False"
env["SAVE_MERCURY"] = "False"
env["SAVE_GIT"] = "False"
env["SAVE_YTDLP"] = "False"
env["SAVE_HEADERS"] = "False"
env["SAVE_HTMLTOTEXT"] = "False"
return env
def _count_running_processes(db_path: Path, where: str) -> int:
for _ in range(50):
try:
conn = sqlite3.connect(db_path, timeout=1)
cur = conn.cursor()
count = cur.execute(
f"SELECT COUNT(*) FROM machine_process WHERE status = 'running' AND {where}"
).fetchone()[0]
conn.close()
return count
except sqlite3.OperationalError:
time.sleep(0.1)
return 0
def _wait_for_count(db_path: Path, where: str, target: int, timeout: int = 20) -> bool:
start = time.time()
while time.time() - start < timeout:
if _count_running_processes(db_path, where) >= target:
return True
time.sleep(0.1)
return False
def test_add_parents_workers_to_orchestrator(tmp_path):
    """`archivebox add` should record crawl workers parented to the orchestrator process row."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    env = _make_env(data_dir)
    init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
    assert init.returncode == 0, init.stderr
    add = _run([sys.executable, "-m", "archivebox", "add", "--plugins=favicon", "https://example.com"], data_dir, env, timeout=120)
    assert add.returncode == 0, add.stderr
    conn = sqlite3.connect(data_dir / "index.sqlite3")
    cur = conn.cursor()
    # Find the orchestrator process most recently recorded by the add run.
    orchestrator = cur.execute(
        "SELECT id FROM machine_process WHERE process_type = 'orchestrator' ORDER BY created_at DESC LIMIT 1"
    ).fetchone()
    assert orchestrator is not None
    orchestrator_id = orchestrator[0]
    # Crawl workers must have been spawned as children of that orchestrator.
    worker_count = cur.execute(
        "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'crawl' "
        "AND parent_id = ?",
        (orchestrator_id,),
    ).fetchone()[0]
    conn.close()
    assert worker_count >= 1, "Expected crawl worker to be parented to orchestrator"
def test_add_interrupt_cleans_orphaned_processes(tmp_path):
    """SIGINT during `archivebox add` should leave no running worker/hook rows."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    env = _make_env(data_dir)
    init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
    assert init.returncode == 0, init.stderr
    proc = subprocess.Popen(
        [sys.executable, "-m", "archivebox", "add", "--plugins=favicon", "https://example.com"],
        cwd=data_dir,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    db_path = data_dir / "index.sqlite3"
    # Only interrupt once a worker is confirmed running, so there is
    # actually something to clean up.
    saw_worker = _wait_for_count(db_path, "process_type = 'worker'", 1, timeout=20)
    assert saw_worker, "Expected at least one worker to start before interrupt"
    proc.send_signal(signal.SIGINT)
    proc.wait(timeout=30)
    # Wait for workers/hooks to be cleaned up
    start = time.time()
    while time.time() - start < 30:
        running = _count_running_processes(db_path, "process_type IN ('worker','hook')")
        if running == 0:
            break
        time.sleep(0.2)
    assert _count_running_processes(db_path, "process_type IN ('worker','hook')") == 0, (
        "Expected no running worker/hook processes after interrupt"
    )

View File

@@ -1,5 +1,5 @@
"""
Tests for JSONL piping contracts and `archivebox run` / `archivebox orchestrator`.
Tests for JSONL piping contracts and `archivebox run`.
This file covers both:
- low-level JSONL/stdin parsing behavior that makes CLI piping work
@@ -252,8 +252,8 @@ def test_snapshot_list_stdout_pipes_into_run(initialized_archive):
assert snapshot_status == "sealed"
def test_archiveresult_list_stdout_pipes_into_orchestrator_alias(initialized_archive):
"""`archivebox archiveresult list | archivebox orchestrator` should preserve clean JSONL stdout."""
def test_archiveresult_list_stdout_pipes_into_run(initialized_archive):
"""`archivebox archiveresult list | archivebox run` should preserve clean JSONL stdout."""
url = create_test_url()
snapshot_stdout, snapshot_stderr, snapshot_code = run_archivebox_cmd(
@@ -279,18 +279,17 @@ def test_archiveresult_list_stdout_pipes_into_orchestrator_alias(initialized_arc
assert list_code == 0, list_stderr
_assert_stdout_is_jsonl_only(list_stdout)
orchestrator_stdout, orchestrator_stderr, orchestrator_code = run_archivebox_cmd(
["orchestrator"],
run_stdout, run_stderr, run_code = run_archivebox_cmd(
["run"],
stdin=list_stdout,
data_dir=initialized_archive,
timeout=120,
env=PIPE_TEST_ENV,
)
assert orchestrator_code == 0, orchestrator_stderr
_assert_stdout_is_jsonl_only(orchestrator_stdout)
assert "renamed to `archivebox run`" in orchestrator_stderr
assert run_code == 0, run_stderr
_assert_stdout_is_jsonl_only(run_stdout)
run_records = parse_jsonl_output(orchestrator_stdout)
run_records = parse_jsonl_output(run_stdout)
assert any(
record.get("type") == "ArchiveResult" and record.get("id") == archiveresult["id"]
for record in run_records

View File

@@ -1,342 +0,0 @@
"""
Tests for BinaryWorker processing Binary queue.
Tests cover:
- BinaryWorker is spawned by Orchestrator when Binary queue has work
- Binary hooks (on_Binary__*) actually run and install binaries
- Binary status transitions from QUEUED -> INSTALLED
- BinaryWorker exits after idle timeout
"""
import json
import sqlite3
from archivebox.tests.conftest import (
run_archivebox_cmd,
)
class TestBinaryWorkerSpawning:
    """Tests for BinaryWorker lifecycle.

    Each test pipes a Binary JSONL record into `archivebox run` via the
    `run_archivebox_cmd` fixture helper, then inspects the resulting rows in
    the archive's index.sqlite3 directly.
    """
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def test_binary_worker_spawns_when_binary_queued(self, initialized_archive):
        """Orchestrator spawns BinaryWorker when Binary queue has work."""
        # Create a Binary record via CLI
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',  # Use env provider to detect system python
        }
        # Use `archivebox run` to create the Binary (this queues it)
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=60,  # Increased timeout to allow for binary installation
        )
        assert code == 0, f"Failed to create Binary: {stderr}"
        # Verify Binary was created in DB
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        binaries = c.execute(
            "SELECT name, status, abspath FROM machine_binary WHERE name='python3'"
        ).fetchall()
        conn.close()
        assert len(binaries) >= 1, "Binary was not created in database"
        name, status, abspath = binaries[0]
        assert name == 'python3'
        # Status should be INSTALLED after BinaryWorker processed it
        # (or QUEUED if worker timed out before installing)
        assert status in ['installed', 'queued']

    def test_binary_hooks_actually_run(self, initialized_archive):
        """Binary installation hooks (on_Binary__*) run and update abspath."""
        # Create a Binary for python3 (guaranteed to exist on system)
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0, f"Failed to process Binary: {stderr}"
        # Query database to check if hooks ran and populated abspath
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        result = c.execute(
            "SELECT name, status, abspath, version FROM machine_binary WHERE name='python3'"
        ).fetchone()
        conn.close()
        assert result is not None, "Binary not found in database"
        name, status, abspath, version = result
        # If hooks ran successfully, abspath should be populated
        if status == 'installed':
            assert abspath, f"Binary installed but abspath is empty: {abspath}"
            # Accept both POSIX and Windows path separators.
            assert '/python3' in abspath or '\\python3' in abspath, \
                f"abspath doesn't look like a python3 path: {abspath}"
            # Version should also be populated
            assert version, f"Binary installed but version is empty: {version}"

    def test_binary_status_transitions(self, initialized_archive):
        """Binary status correctly transitions QUEUED -> INSTALLED."""
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }
        # Create and process the Binary
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0
        # Check final status
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        status = c.execute(
            "SELECT status FROM machine_binary WHERE name='python3'"
        ).fetchone()
        conn.close()
        assert status is not None
        # Should be installed (or queued if worker timed out)
        assert status[0] in ['installed', 'queued']
class TestBinaryWorkerHooks:
    """Tests for specific Binary hook providers."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def test_env_provider_hook_detects_system_binary(self, initialized_archive):
        """on_Binary__15_env_discover.py hook detects system binaries."""
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0
        # Check that env provider hook populated the Binary
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        result = c.execute(
            "SELECT binprovider, abspath FROM machine_binary WHERE name='python3' AND status='installed'"
        ).fetchone()
        conn.close()
        # Only assert on provider/abspath if the install actually completed.
        if result:
            binprovider, abspath = result
            assert binprovider == 'env', f"Expected env provider, got: {binprovider}"
            assert abspath, "abspath should be populated by env provider"

    def test_multiple_binaries_processed_in_batch(self, initialized_archive):
        """BinaryWorker processes multiple queued binaries."""
        # Create multiple Binary records
        binaries = [
            {'type': 'Binary', 'name': 'python3', 'binproviders': 'env'},
            {'type': 'Binary', 'name': 'curl', 'binproviders': 'env'},
        ]
        stdin = '\n'.join(json.dumps(b) for b in binaries)
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=stdin,
            data_dir=initialized_archive,
            timeout=90,  # Need more time for multiple binaries
        )
        assert code == 0
        # Both should be processed
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        installed = c.execute(
            "SELECT name FROM machine_binary WHERE name IN ('python3', 'curl')"
        ).fetchall()
        conn.close()
        assert len(installed) >= 1, "At least one binary should be created"

    def test_puppeteer_binary_sets_skip_download_for_hooks(self, initialized_archive):
        """Puppeteer installs expose skip-download env to Binary hooks."""
        # Install a throwaway user plugin whose hook echoes back the env it saw.
        user_plugins_dir = initialized_archive / 'test_plugins'
        plugin_dir = user_plugins_dir / 'inspectnpm'
        plugin_dir.mkdir(parents=True, exist_ok=True)
        hook = plugin_dir / 'on_Binary__10_inspectnpm_install.py'
        hook.write_text(
            """#!/usr/bin/env python3
import argparse
import json
import os
import shutil
import sys
parser = argparse.ArgumentParser()
parser.add_argument('--machine-id', required=True)
parser.add_argument('--binary-id', required=True)
parser.add_argument('--name', required=True)
parser.add_argument('--binproviders', default='*')
args = parser.parse_args()
record = {
    'type': 'Binary',
    'name': args.name,
    'abspath': shutil.which('python3') or sys.executable,
    'version': '1.0.0',
    'sha256': '',
    'binprovider': 'inspectnpm',
    'machine_id': args.machine_id,
    'binary_id': args.binary_id,
}
print(json.dumps(record))
print(json.dumps({
    'type': 'Machine',
    'config': {
        'SEEN_PUPPETEER_SKIP_DOWNLOAD': os.environ.get('PUPPETEER_SKIP_DOWNLOAD', ''),
        'SEEN_PUPPETEER_SKIP_CHROMIUM_DOWNLOAD': os.environ.get('PUPPETEER_SKIP_CHROMIUM_DOWNLOAD', ''),
    },
}))
"""
        )
        binary_record = {
            'type': 'Binary',
            'name': 'puppeteer',
            'binproviders': 'inspectnpm',
        }
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            env={
                'ARCHIVEBOX_USER_PLUGINS_DIR': str(user_plugins_dir),
                'PLUGINS': 'inspectnpm',
            },
            timeout=60,
        )
        assert code == 0, f"Failed to process puppeteer Binary: {stderr}"
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        result = c.execute(
            "SELECT status, binprovider FROM machine_binary WHERE name='puppeteer'"
        ).fetchone()
        hook_rows = c.execute(
            "SELECT cmd, env FROM machine_process WHERE process_type='hook' ORDER BY created_at DESC"
        ).fetchall()
        conn.close()
        assert result is not None, "Puppeteer binary not found in database"
        status, binprovider = result
        assert status == 'installed', f"Expected puppeteer to install, got: {status}"
        assert binprovider == 'inspectnpm', f"Expected inspectnpm provider, got: {binprovider}"
        # Find the hook subprocess record for our plugin and check its env.
        hook_env = None
        for cmd_json, env_json in hook_rows:
            cmd = json.loads(cmd_json)
            if any('inspectnpm' in part for part in cmd):
                hook_env = json.loads(env_json)
                break
        assert hook_env is not None, "Inspectnpm hook process not found"
        assert hook_env.get('PUPPETEER_SKIP_DOWNLOAD') == 'true'
        assert hook_env.get('PUPPETEER_SKIP_CHROMIUM_DOWNLOAD') == 'true'
class TestBinaryWorkerEdgeCases:
    """Tests for edge cases and error handling."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def test_nonexistent_binary_stays_queued(self, initialized_archive):
        """Binary that doesn't exist stays queued (doesn't fail permanently)."""
        binary_record = {
            'type': 'Binary',
            'name': 'nonexistent-binary-xyz-12345',
            'binproviders': 'env',
        }
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        # Command should still succeed (orchestrator doesn't fail on binary install failures)
        assert code == 0
        # Binary should remain queued (not installed)
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        result = c.execute(
            "SELECT status FROM machine_binary WHERE name='nonexistent-binary-xyz-12345'"
        ).fetchone()
        conn.close()
        if result:
            status = result[0]
            # Should stay queued since installation failed
            assert status == 'queued', f"Expected queued, got: {status}"

    def test_binary_worker_respects_machine_isolation(self, initialized_archive):
        """BinaryWorker only processes binaries for current machine."""
        # This is implicitly tested by other tests - Binary.objects.filter(machine=current)
        # ensures only current machine's binaries are processed
        binary_record = {
            'type': 'Binary',
            'name': 'python3',
            'binproviders': 'env',
        }
        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(binary_record),
            data_dir=initialized_archive,
            timeout=30,
        )
        assert code == 0
        # Check that machine_id is set correctly
        conn = sqlite3.connect(initialized_archive / 'index.sqlite3')
        c = conn.cursor()
        result = c.execute(
            "SELECT machine_id FROM machine_binary WHERE name='python3'"
        ).fetchone()
        conn.close()
        assert result is not None
        machine_id = result[0]
        assert machine_id, "machine_id should be set on Binary"

View File

@@ -369,9 +369,9 @@ class TestProcessCurrent(TestCase):
self.assertEqual(proc1.id, proc2.id)
def test_process_detect_type_orchestrator(self):
"""_detect_process_type should detect orchestrator."""
with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
def test_process_detect_type_runner(self):
"""_detect_process_type should detect the background runner command."""
with patch('sys.argv', ['archivebox', 'run', '--daemon']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR)
@@ -381,11 +381,11 @@ class TestProcessCurrent(TestCase):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.CLI)
def test_process_detect_type_worker(self):
"""_detect_process_type should detect workers."""
with patch('sys.argv', ['python', '-m', 'crawl_worker']):
def test_process_detect_type_binary(self):
"""_detect_process_type should detect non-ArchiveBox subprocesses as binary processes."""
with patch('sys.argv', ['/usr/bin/wget', 'https://example.com']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.WORKER)
self.assertEqual(result, Process.TypeChoices.BINARY)
class TestProcessHierarchy(TestCase):

View File

@@ -1,484 +0,0 @@
"""
Unit tests for the Orchestrator and Worker classes.
Tests cover:
1. Orchestrator lifecycle (startup, shutdown)
2. Queue polling and worker spawning
3. Idle detection and exit logic
4. Worker registration and management
5. Process model methods (replacing old pid_utils)
"""
import os
import time
from datetime import datetime, timedelta
from unittest.mock import patch
from typing import ClassVar
import pytest
from django.test import TestCase
from django.utils import timezone
from archivebox.workers.orchestrator import Orchestrator
from archivebox.workers.worker import Worker
class FakeWorker(Worker):
    """Minimal Worker stub whose running-worker list the tests control directly."""
    name: ClassVar[str] = 'crawl'
    MAX_CONCURRENT_TASKS: ClassVar[int] = 5
    # Tests assign this list to simulate N currently-running workers.
    running_workers: ClassVar[list[dict[str, object]]] = []

    @classmethod
    def get_running_workers(cls) -> list[dict[str, object]]:
        """Return the test-controlled list instead of querying the DB."""
        return cls.running_workers
class TestOrchestratorUnit(TestCase):
    """Unit tests for Orchestrator class (mocked dependencies)."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def test_orchestrator_creation(self):
        """Orchestrator should initialize with correct defaults."""
        orchestrator = Orchestrator(exit_on_idle=True)
        self.assertTrue(orchestrator.exit_on_idle)
        self.assertEqual(orchestrator.idle_count, 0)
        self.assertIsNone(orchestrator.pid_file)

    def test_orchestrator_repr(self):
        """Orchestrator __repr__ should include PID."""
        orchestrator = Orchestrator()
        repr_str = repr(orchestrator)
        self.assertIn('Orchestrator', repr_str)
        self.assertIn(str(os.getpid()), repr_str)

    def test_has_pending_work(self):
        """has_pending_work should check if any queue has items."""
        orchestrator = Orchestrator()
        self.assertFalse(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 0}))
        self.assertTrue(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 5}))
        self.assertTrue(orchestrator.has_pending_work({'crawl': 10, 'snapshot': 0}))

    def test_should_exit_not_exit_on_idle(self):
        """should_exit should return False when exit_on_idle is False."""
        orchestrator = Orchestrator(exit_on_idle=False)
        orchestrator.idle_count = 100
        self.assertFalse(orchestrator.should_exit({'crawl': 0}))

    def test_should_exit_pending_work(self):
        """should_exit should return False when there's pending work."""
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = 100
        self.assertFalse(orchestrator.should_exit({'crawl': 5}))

    @patch.object(Orchestrator, 'has_running_workers')
    def test_should_exit_running_workers(self, mock_has_workers):
        """should_exit should return False when workers are running."""
        mock_has_workers.return_value = True
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = 100
        self.assertFalse(orchestrator.should_exit({'crawl': 0}))

    @patch.object(Orchestrator, 'has_running_workers')
    @patch.object(Orchestrator, 'has_future_work')
    def test_should_exit_idle_timeout(self, mock_future, mock_workers):
        """should_exit should return True after idle timeout with no work."""
        mock_workers.return_value = False
        mock_future.return_value = False
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = orchestrator.IDLE_TIMEOUT
        self.assertTrue(orchestrator.should_exit({'crawl': 0, 'snapshot': 0}))

    @patch.object(Orchestrator, 'has_running_workers')
    @patch.object(Orchestrator, 'has_future_work')
    def test_should_exit_below_idle_timeout(self, mock_future, mock_workers):
        """should_exit should return False below idle timeout."""
        mock_workers.return_value = False
        mock_future.return_value = False
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = orchestrator.IDLE_TIMEOUT - 1
        self.assertFalse(orchestrator.should_exit({'crawl': 0}))

    def test_should_spawn_worker_no_queue(self):
        """should_spawn_worker should return False when queue is empty."""
        orchestrator = Orchestrator()
        FakeWorker.running_workers = []
        self.assertFalse(orchestrator.should_spawn_worker(FakeWorker, 0))

    def test_should_spawn_worker_at_limit(self):
        """should_spawn_worker should return False when at per-type limit."""
        orchestrator = Orchestrator()
        running_workers: list[dict[str, object]] = [{'worker_id': worker_id} for worker_id in range(orchestrator.MAX_CRAWL_WORKERS)]
        FakeWorker.running_workers = running_workers
        self.assertFalse(orchestrator.should_spawn_worker(FakeWorker, 10))

    @patch.object(Orchestrator, 'get_total_worker_count')
    def test_should_spawn_worker_at_total_limit(self, mock_total):
        """should_spawn_worker should return False when at total limit."""
        orchestrator = Orchestrator()
        mock_total.return_value = 0
        running_workers: list[dict[str, object]] = [{'worker_id': worker_id} for worker_id in range(orchestrator.MAX_CRAWL_WORKERS)]
        FakeWorker.running_workers = running_workers
        self.assertFalse(orchestrator.should_spawn_worker(FakeWorker, 10))

    @patch.object(Orchestrator, 'get_total_worker_count')
    def test_should_spawn_worker_success(self, mock_total):
        """should_spawn_worker should return True when conditions are met."""
        orchestrator = Orchestrator()
        mock_total.return_value = 0
        FakeWorker.running_workers = []
        self.assertTrue(orchestrator.should_spawn_worker(FakeWorker, 10))

    @patch.object(Orchestrator, 'get_total_worker_count')
    def test_should_spawn_worker_enough_workers(self, mock_total):
        """should_spawn_worker should return False when enough workers for queue."""
        orchestrator = Orchestrator()
        mock_total.return_value = 2
        FakeWorker.running_workers = [{}]  # 1 worker running
        self.assertFalse(orchestrator.should_spawn_worker(FakeWorker, 3))
class TestOrchestratorWithProcess(TestCase):
    """Test Orchestrator using Process model for tracking."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def setUp(self):
        """Reset process cache."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None

    def test_is_running_no_orchestrator(self):
        """is_running should return False when no orchestrator process exists."""
        from archivebox.machine.models import Process
        # Clean up any stale processes first
        Process.cleanup_stale_running()
        # Mark any running orchestrators as exited for clean test state
        Process.objects.filter(
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING
        ).update(status=Process.StatusChoices.EXITED)
        self.assertFalse(Orchestrator.is_running())

    def test_is_running_with_orchestrator_process(self):
        """is_running should return True when orchestrator Process exists."""
        from archivebox.machine.models import Process, Machine
        import psutil
        machine = Machine.current()
        current_proc = psutil.Process(os.getpid())
        # Create an orchestrator Process record
        proc = Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
            pid=os.getpid(),  # Use current PID so it appears alive
            started_at=datetime.fromtimestamp(current_proc.create_time(), tz=timezone.get_current_timezone()),
            cmd=current_proc.cmdline(),
        )
        try:
            # Should detect running orchestrator
            self.assertTrue(Orchestrator.is_running())
        finally:
            # Clean up
            proc.status = Process.StatusChoices.EXITED
            proc.save()

    def test_orchestrator_uses_process_for_is_running(self):
        """Orchestrator.is_running should use Process.get_running_count."""
        from archivebox.machine.models import Process
        # Verify is_running uses Process model, not pid files
        with patch.object(Process, 'get_running_count') as mock_count:
            mock_count.return_value = 1
            result = Orchestrator.is_running()
            # Should have called Process.get_running_count with orchestrator type
            mock_count.assert_called()
            self.assertTrue(result)

    def test_orchestrator_scoped_worker_count(self):
        """Orchestrator with crawl_id should count only descendant workers."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        orchestrator = Orchestrator(exit_on_idle=True, crawl_id='test-crawl')
        orchestrator.db_process = Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
            pid=12345,
            started_at=timezone.now(),
        )
        # Prevent cleanup from marking fake PIDs as exited
        orchestrator._last_cleanup_time = time.time()
        # One worker parented to this orchestrator (counted)...
        Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.WORKER,
            worker_type='crawl',
            status=Process.StatusChoices.RUNNING,
            pid=12346,
            parent=orchestrator.db_process,
            started_at=timezone.now(),
        )
        # ...and one unparented worker (ignored by the scoped count).
        Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.WORKER,
            worker_type='crawl',
            status=Process.StatusChoices.RUNNING,
            pid=12347,
            started_at=timezone.now(),
        )
        self.assertEqual(orchestrator.get_total_worker_count(), 1)
class TestProcessBasedWorkerTracking(TestCase):
    """Test Process model methods that replace pid_utils functionality."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def setUp(self):
        """Reset caches."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None

    def test_process_current_creates_record(self):
        """Process.current() should create a Process record for current PID."""
        from archivebox.machine.models import Process
        proc = Process.current()
        self.assertIsNotNone(proc)
        self.assertEqual(proc.pid, os.getpid())
        self.assertEqual(proc.status, Process.StatusChoices.RUNNING)
        self.assertIsNotNone(proc.machine)
        self.assertIsNotNone(proc.started_at)

    def test_process_current_caches_result(self):
        """Process.current() should return cached Process within interval."""
        from archivebox.machine.models import Process
        proc1 = Process.current()
        proc2 = Process.current()
        self.assertEqual(proc1.id, proc2.id)

    def test_process_get_running_count(self):
        """Process.get_running_count should count running processes by type."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        # Create some worker processes
        for i in range(3):
            Process.objects.create(
                machine=machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=99990 + i,  # Fake PIDs
                started_at=timezone.now(),
            )
        count = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
        self.assertGreaterEqual(count, 3)

    def test_process_get_next_worker_id(self):
        """Process.get_next_worker_id should return count of running workers."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        # Create 2 worker processes
        for i in range(2):
            Process.objects.create(
                machine=machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=99980 + i,
                started_at=timezone.now(),
            )
        next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
        self.assertGreaterEqual(next_id, 2)

    def test_process_cleanup_stale_running(self):
        """Process.cleanup_stale_running should mark stale processes as exited."""
        from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW
        machine = Machine.current()
        # Create a stale process (old started_at, fake PID)
        stale_proc = Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.WORKER,
            status=Process.StatusChoices.RUNNING,
            pid=999999,  # Fake PID that doesn't exist
            started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1),
        )
        cleaned = Process.cleanup_stale_running()
        self.assertGreaterEqual(cleaned, 1)
        stale_proc.refresh_from_db()
        self.assertEqual(stale_proc.status, Process.StatusChoices.EXITED)

    def test_process_get_running(self):
        """Process.get_running should return queryset of running processes."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        # Create a running process
        proc = Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.HOOK,
            status=Process.StatusChoices.RUNNING,
            pid=99970,
            started_at=timezone.now(),
        )
        running = Process.get_running(process_type=Process.TypeChoices.HOOK)
        self.assertIn(proc, running)

    def test_process_type_detection(self):
        """Process._detect_process_type should detect process type from argv."""
        from archivebox.machine.models import Process
        # Test detection logic
        with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
            result = Process._detect_process_type()
            self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR)
        with patch('sys.argv', ['archivebox', 'add', 'http://example.com']):
            result = Process._detect_process_type()
            self.assertEqual(result, Process.TypeChoices.CLI)
        with patch('sys.argv', ['supervisord', '-c', 'config.ini']):
            result = Process._detect_process_type()
            self.assertEqual(result, Process.TypeChoices.SUPERVISORD)
class TestProcessLifecycle(TestCase):
    """Test Process model lifecycle methods."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def setUp(self):
        """Reset caches and create a machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None
        self.machine = models.Machine.current()

    def test_process_is_running_property(self):
        """Process.is_running should check actual OS process."""
        from archivebox.machine.models import Process
        proc = Process.current()
        # Should be running (current process exists)
        self.assertTrue(proc.is_running)
        # Create a process with fake PID
        fake_proc = Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=999999,
            started_at=timezone.now(),
        )
        # Should not be running (PID doesn't exist)
        self.assertFalse(fake_proc.is_running)

    def test_process_poll(self):
        """Process.poll should check and update exit status."""
        from archivebox.machine.models import Process
        # Create a process with fake PID (already exited)
        proc = Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=999999,
            started_at=timezone.now(),
        )
        exit_code = proc.poll()
        # Should have detected exit and updated status
        self.assertIsNotNone(exit_code)
        proc.refresh_from_db()
        self.assertEqual(proc.status, Process.StatusChoices.EXITED)

    def test_process_terminate_already_dead(self):
        """Process.terminate should handle already-dead processes."""
        from archivebox.machine.models import Process
        # Create a process with fake PID
        proc = Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=999999,
            started_at=timezone.now(),
        )
        result = proc.terminate()
        # Should return False (was already dead)
        self.assertFalse(result)
        proc.refresh_from_db()
        self.assertEqual(proc.status, Process.StatusChoices.EXITED)

    def test_process_tree_traversal(self):
        """Process parent/children relationships should work."""
        from archivebox.machine.models import Process
        # Create parent process
        parent = Process.objects.create(
            machine=self.machine,
            process_type=Process.TypeChoices.CLI,
            status=Process.StatusChoices.RUNNING,
            pid=1,
            started_at=timezone.now(),
        )
        # Create child process
        child = Process.objects.create(
            machine=self.machine,
            parent=parent,
            process_type=Process.TypeChoices.WORKER,
            status=Process.StatusChoices.RUNNING,
            pid=2,
            started_at=timezone.now(),
        )
        # Test relationships
        self.assertEqual(child.parent, parent)
        self.assertIn(child, parent.children.all())
        self.assertEqual(child.root, parent)
        self.assertEqual(child.depth, 1)
        self.assertEqual(parent.depth, 0)
# Allow running this test module directly without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -1,138 +0,0 @@
import os
import sqlite3
import subprocess
from pathlib import Path
def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
candidates = {snapshot_id}
if len(snapshot_id) == 32:
hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
candidates.add(hyphenated)
elif len(snapshot_id) == 36 and '-' in snapshot_id:
candidates.add(snapshot_id.replace('-', ''))
for needle in candidates:
for path in data_dir.rglob(needle):
if path.is_dir():
return path
return None
def _find_html_with_text(root: Path, needle: str) -> list[Path]:
hits: list[Path] = []
for path in root.rglob("*.htm*"):
if not path.is_file():
continue
try:
if needle in path.read_text(errors="ignore"):
hits.append(path)
except Exception:
continue
return hits
def test_add_real_world_example_domain(tmp_path):
    """End-to-end: `archivebox init` + `archivebox add https://example.com`.

    Verifies the snapshot row, title extraction, binary installs, and the
    on-disk extractor outputs (wget/singlefile HTML, htmltotext, title.txt).
    Requires network access and the `archivebox` CLI on PATH.
    """
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.
    os.chdir(tmp_path)
    # Use a short /tmp path to avoid unix-socket path-length limits.
    tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
    tmp_short.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["TMP_DIR"] = str(tmp_short)
    env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
    # Enable only the fast, deterministic extractors for this test.
    env["SAVE_TITLE"] = "True"
    env["SAVE_WGET"] = "True"
    env["SAVE_SINGLEFILE"] = "True"
    env["SAVE_READABILITY"] = "False"
    env["SAVE_HTMLTOTEXT"] = "True"
    env["SAVE_HEADERS"] = "True"
    env["SAVE_PDF"] = "False"
    env["SAVE_SCREENSHOT"] = "False"
    env["SAVE_ARCHIVEDOTORG"] = "False"
    env["SAVE_YTDLP"] = "False"
    env["SAVE_GIT"] = "False"
    init = subprocess.run(
        ["archivebox", "init"],
        capture_output=True,
        text=True,
        timeout=120,
        env=env,
    )
    assert init.returncode == 0, f"archivebox init failed: {init.stderr}"
    result = subprocess.run(
        ["archivebox", "add", "--plugins=title,wget,singlefile,htmltotext,headers", "https://example.com"],
        capture_output=True,
        text=True,
        timeout=900,
        env=env,
    )
    assert result.returncode == 0, (
        "archivebox add failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    # Inspect the index DB directly for the snapshot and its results.
    conn = sqlite3.connect(tmp_path / "index.sqlite3")
    c = conn.cursor()
    snapshot_row = c.execute(
        "SELECT id, url, title FROM core_snapshot WHERE url = ?",
        ("https://example.com",),
    ).fetchone()
    assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
    snapshot_id, snapshot_url, snapshot_title = snapshot_row
    assert snapshot_title and "Example Domain" in snapshot_title, (
        f"Expected title to contain Example Domain, got: {snapshot_title}"
    )
    failed_results = c.execute(
        "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
        (snapshot_id,),
    ).fetchone()[0]
    assert failed_results == 0, "Some archive results failed for example.com snapshot"
    binary_workers = c.execute(
        "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
    ).fetchone()[0]
    assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"
    failed_binary_workers = c.execute(
        "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
        "AND exit_code IS NOT NULL AND exit_code != 0"
    ).fetchone()[0]
    assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"
    queued_binaries = c.execute(
        "SELECT name FROM machine_binary WHERE status != 'installed'"
    ).fetchall()
    assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"
    conn.close()
    # Verify on-disk extractor outputs.
    snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id))
    assert snapshot_dir is not None, "Snapshot output directory not found"
    title_path = snapshot_dir / "title" / "title.txt"
    assert title_path.exists(), f"Missing title output: {title_path}"
    assert "Example Domain" in title_path.read_text(errors="ignore")
    html_sources = []
    # Extractor dirs may be bare ("wget") or numbered ("*_wget").
    for candidate in ("wget", "singlefile", "dom"):
        for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
            if candidate_dir.exists():
                html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
    assert len(html_sources) >= 2, (
        "Expected HTML outputs from multiple extractors to contain Example Domain "
        f"(found {len(html_sources)})."
    )
    text_hits = 0
    for path in (
        *snapshot_dir.glob("*_htmltotext/htmltotext.txt"),
        snapshot_dir / "htmltotext" / "htmltotext.txt",
    ):
        if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
            text_hits += 1
    assert text_hits >= 1, (
        "Expected htmltotext output to contain Example Domain "
        f"(htmltotext hits={text_hits})."
    )

View File

@@ -1,84 +0,0 @@
from datetime import timedelta
from typing import cast
from unittest.mock import patch
from django.contrib.auth import get_user_model
from django.contrib.auth.models import UserManager
from django.test import TestCase
from django.utils import timezone
from archivebox.crawls.models import Crawl, CrawlSchedule
from archivebox.workers.orchestrator import Orchestrator
from archivebox.workers.worker import CrawlWorker
class TestScheduledCrawlMaterialization(TestCase):
    """Tests that due CrawlSchedules are materialized into new queued Crawls."""
    # NOTE(review): indentation reconstructed from a diff dump — verify against original file.

    def setUp(self):
        """Create the user that owns the template crawl and schedule."""
        user_manager = cast(UserManager, get_user_model().objects)
        self.user = user_manager.create_user(
            username='schedule-user',
            password='password',
        )

    def _create_due_schedule(self) -> CrawlSchedule:
        """Create a daily CrawlSchedule whose template crawl is 2 days old (due)."""
        template = Crawl.objects.create(
            urls='https://example.com/feed.xml',
            max_depth=1,
            tags_str='scheduled',
            label='Scheduled Feed',
            notes='template',
            created_by=self.user,
            status=Crawl.StatusChoices.SEALED,
            retry_at=None,
        )
        schedule = CrawlSchedule.objects.create(
            template=template,
            schedule='daily',
            is_enabled=True,
            label='Scheduled Feed',
            notes='template',
            created_by=self.user,
        )
        # Backdate the template so the daily schedule is already due.
        past = timezone.now() - timedelta(days=2)
        Crawl.objects.filter(pk=template.pk).update(created_at=past, modified_at=past)
        template.refresh_from_db()
        schedule.refresh_from_db()
        return schedule

    def test_global_orchestrator_materializes_due_schedule(self):
        """A long-running (non-one-shot) orchestrator creates a new queued Crawl from a due schedule."""
        schedule = self._create_due_schedule()
        orchestrator = Orchestrator(exit_on_idle=False)
        orchestrator._materialize_due_schedules()
        scheduled_crawls = Crawl.objects.filter(schedule=schedule).order_by('created_at')
        self.assertEqual(scheduled_crawls.count(), 2)
        queued_crawl = scheduled_crawls.last()
        self.assertIsNotNone(queued_crawl)
        assert queued_crawl is not None  # narrow Optional for the type checker
        self.assertEqual(queued_crawl.status, Crawl.StatusChoices.QUEUED)
        # The new crawl copies the template's settings.
        self.assertEqual(queued_crawl.urls, 'https://example.com/feed.xml')
        self.assertEqual(queued_crawl.max_depth, 1)
        self.assertEqual(queued_crawl.tags_str, 'scheduled')

    def test_one_shot_orchestrator_does_not_materialize_due_schedule(self):
        """One-shot and crawl-scoped orchestrators must not materialize schedules."""
        schedule = self._create_due_schedule()
        Orchestrator(exit_on_idle=True)._materialize_due_schedules()
        self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1)
        Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template.id))._materialize_due_schedules()
        self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1)

    @patch.object(CrawlWorker, 'start')
    def test_global_orchestrator_waits_one_tick_before_spawning_materialized_schedule(self, mock_start):
        """The tick that materializes a schedule must not also spawn its worker."""
        schedule = self._create_due_schedule()
        orchestrator = Orchestrator(exit_on_idle=False)
        with patch.object(orchestrator, '_claim_crawl', return_value=True):
            queue_sizes = orchestrator.check_queues_and_spawn_workers()
        self.assertEqual(queue_sizes['crawl'], 1)
        self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 2)
        mock_start.assert_not_called()

View File

@@ -1,76 +0,0 @@
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast
from unittest.mock import patch
from django.test import SimpleTestCase
from archivebox.workers.worker import SnapshotWorker
class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase):
    """Verify retry handling for foreground hooks that finished with empty output."""

    def _make_worker(self):
        # Bypass __init__ so no DB or process state is touched.
        worker = SnapshotWorker.__new__(SnapshotWorker)
        worker.pid = 12345
        cast(Any, worker).snapshot = SimpleNamespace(
            status='started',
            refresh_from_db=lambda: None,
        )
        worker._snapshot_exceeded_hard_timeout = lambda: False
        worker._seal_snapshot_due_to_timeout = lambda: None
        worker._run_hook = lambda *a, **kw: SimpleNamespace()
        worker._wait_for_hook = lambda process, ar: None
        return worker

    @patch('archivebox.workers.worker.log_worker_event')
    def test_retry_skips_successful_hook_with_only_inline_output(self, mock_log):
        worker = self._make_worker()
        result = SimpleNamespace(
            status='succeeded',
            output_files={},
            output_str='scrolled 600px',
            output_json=None,
            refresh_from_db=lambda: None,
        )

        worker._retry_failed_empty_foreground_hooks(
            [(Path('/tmp/on_Snapshot__45_infiniscroll.js'), result)],
            config={},
        )

        # A succeeded hook with inline output must not be replayed or logged.
        mock_log.assert_not_called()

    @patch('archivebox.workers.worker.log_worker_event')
    def test_retry_replays_failed_hook_with_no_outputs(self, mock_log):
        worker = self._make_worker()
        replayed = []
        waited = []

        def record_run(*args, **kwargs):
            replayed.append((args, kwargs))
            return SimpleNamespace()

        def record_wait(process, ar):
            waited.append((process, ar))
            # Simulate the replay succeeding and producing a file.
            ar.status = 'succeeded'
            ar.output_files = {'singlefile.html': {}}

        result = SimpleNamespace(
            status='failed',
            output_files={},
            output_str='',
            output_json=None,
            refresh_from_db=lambda: None,
        )
        worker._run_hook = record_run
        worker._wait_for_hook = record_wait

        worker._retry_failed_empty_foreground_hooks(
            [(Path('/tmp/on_Snapshot__50_singlefile.py'), result)],
            config={},
        )

        assert len(replayed) == 1
        assert len(waited) == 1
        mock_log.assert_called_once()

View File

@@ -1,143 +0,0 @@
import threading
import time
import pytest
from django.db import close_old_connections
from django.utils import timezone
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
from archivebox.machine.models import Binary, Machine
from archivebox.workers.worker import BinaryWorker
def get_fresh_machine() -> Machine:
    """Reset the module-level Machine/Binary caches and return the current Machine."""
    import archivebox.machine.models as machine_models

    machine_models._CURRENT_MACHINE = None
    machine_models._CURRENT_BINARIES.clear()
    return Machine.current()
@pytest.mark.django_db
def test_claim_processing_lock_does_not_steal_future_retry_at():
    """
    retry_at is both the schedule and the ownership lock.
    Once one process claims a due row and moves retry_at into the future, a
    fresh reader must not be able to "re-claim" that future timestamp and run
    the same side effects a second time.
    """
    machine = get_fresh_machine()
    row = Binary.objects.create(
        machine=machine,
        name='claim-test',
        binproviders='env',
        status=Binary.StatusChoices.QUEUED,
        retry_at=timezone.now(),
    )

    # Two independent readers of the same row, simulating two processes.
    first_reader = Binary.objects.get(pk=row.pk)
    second_reader = Binary.objects.get(pk=row.pk)

    assert first_reader.claim_processing_lock(lock_seconds=30) is True

    second_reader.refresh_from_db()
    # The claim pushed retry_at into the future, so the contender must lose.
    assert second_reader.retry_at > timezone.now()
    assert second_reader.claim_processing_lock(lock_seconds=30) is False
@pytest.mark.django_db
def test_binary_worker_skips_binary_claimed_by_other_owner(monkeypatch):
    """
    BinaryWorker must never run install side effects for a Binary whose retry_at
    lock has already been claimed by another process.
    """
    machine = get_fresh_machine()
    row = Binary.objects.create(
        machine=machine,
        name='claimed-binary',
        binproviders='env',
        status=Binary.StatusChoices.QUEUED,
        retry_at=timezone.now(),
    )

    # Another process grabs the lock before the worker runs.
    other_owner = Binary.objects.get(pk=row.pk)
    assert other_owner.claim_processing_lock(lock_seconds=30) is True

    install_calls: list[str] = []

    def fake_run(self):
        install_calls.append(self.name)
        self.status = self.StatusChoices.INSTALLED
        self.abspath = '/tmp/fake-binary'
        self.version = '1.0'
        self.save(update_fields=['status', 'abspath', 'version', 'modified_at'])

    monkeypatch.setattr(Binary, 'run', fake_run)

    BinaryWorker(binary_id=str(row.id))._process_single_binary()

    # The worker must have skipped the already-claimed row entirely.
    assert install_calls == []
@pytest.mark.django_db(transaction=True)
def test_crawl_install_declared_binaries_waits_for_existing_owner(monkeypatch):
    """
    Crawl.install_declared_binaries should wait for the current owner of a Binary
    to finish instead of launching a duplicate install against shared provider
    state such as the npm tree.
    """
    machine = get_fresh_machine()
    crawl = Crawl.objects.create(
        urls='https://example.com',
        created_by_id=get_or_create_system_user_pk(),
        status=Crawl.StatusChoices.QUEUED,
        retry_at=timezone.now(),
    )
    binary = Binary.objects.create(
        machine=machine,
        name='puppeteer',
        binproviders='npm',
        status=Binary.StatusChoices.QUEUED,
        retry_at=timezone.now(),
    )

    # Another process already owns the install lock for this binary.
    existing_owner = Binary.objects.get(pk=binary.pk)
    assert existing_owner.claim_processing_lock(lock_seconds=30) is True

    duplicate_installs: list[str] = []

    def fake_run(self):
        duplicate_installs.append(self.name)
        self.status = self.StatusChoices.INSTALLED
        self.abspath = '/tmp/should-not-run'
        self.version = '1.0'
        self.save(update_fields=['status', 'abspath', 'version', 'modified_at'])

    monkeypatch.setattr(Binary, 'run', fake_run)

    def owner_finishes_install():
        # Simulate the lock owner completing the install on its own connection.
        close_old_connections()
        try:
            time.sleep(0.3)
            Binary.objects.filter(pk=binary.pk).update(
                status=Binary.StatusChoices.INSTALLED,
                retry_at=None,
                abspath='/tmp/finished-by-owner',
                version='1.0',
                modified_at=timezone.now(),
            )
        finally:
            close_old_connections()

    owner_thread = threading.Thread(target=owner_finishes_install, daemon=True)
    owner_thread.start()

    crawl.install_declared_binaries({'puppeteer'}, machine=machine)
    owner_thread.join(timeout=5)

    binary.refresh_from_db()
    assert binary.status == Binary.StatusChoices.INSTALLED
    assert binary.abspath == '/tmp/finished-by-owner'
    # install_declared_binaries must not have run a duplicate install.
    assert duplicate_installs == []

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,7 @@
"""
Workers admin module.
The orchestrator/worker system doesn't need Django admin registration
as workers are managed via CLI commands and the orchestrator.
Background runner processes do not need Django admin registration.
"""
__package__ = 'archivebox.workers'

View File

@@ -1,20 +0,0 @@
from django.core.management.base import BaseCommand
from archivebox.workers.orchestrator import Orchestrator
class Command(BaseCommand):
    """Management command wrapper around the Orchestrator runloop."""

    help = 'Run the archivebox orchestrator'

    def add_arguments(self, parser):
        parser.add_argument(
            '--exit-on-idle',
            action='store_true',
            default=False,
            help="Exit when all work is complete (default: run forever)",
        )

    def handle(self, *args, **kwargs):
        # Run forever unless --exit-on-idle was requested.
        exit_when_done = kwargs.get('exit_on_idle', False)
        Orchestrator(exit_on_idle=exit_when_done).runloop()

View File

@@ -2,7 +2,7 @@ from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Watch the runserver autoreload PID file and restart orchestrator on reloads."
help = "Watch the runserver autoreload PID file and restart the background runner on reloads."
def add_arguments(self, parser):
parser.add_argument(
@@ -19,22 +19,24 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
import os
import subprocess
import sys
import time
from archivebox.config.common import STORAGE_CONFIG
from archivebox.machine.models import Process, Machine
from archivebox.workers.orchestrator import Orchestrator
os.environ['ARCHIVEBOX_ORCHESTRATOR_WATCHER'] = '1'
from archivebox.config.common import STORAGE_CONFIG
from archivebox.machine.models import Machine, Process
pidfile = kwargs.get("pidfile") or os.environ.get("ARCHIVEBOX_RUNSERVER_PIDFILE")
if not pidfile:
pidfile = str(STORAGE_CONFIG.TMP_DIR / "runserver.pid")
interval = max(0.2, float(kwargs.get("interval", 1.0)))
last_pid = None
runner_proc: subprocess.Popen[bytes] | None = None
def restart_runner() -> None:
nonlocal runner_proc
def restart_orchestrator():
Process.cleanup_stale_running()
machine = Machine.current()
@@ -43,21 +45,39 @@ class Command(BaseCommand):
status=Process.StatusChoices.RUNNING,
process_type__in=[
Process.TypeChoices.ORCHESTRATOR,
Process.TypeChoices.WORKER,
Process.TypeChoices.HOOK,
Process.TypeChoices.BINARY,
],
)
for proc in running:
try:
if proc.process_type == Process.TypeChoices.HOOK:
proc.kill_tree(graceful_timeout=0.5)
else:
proc.terminate(graceful_timeout=1.0)
proc.kill_tree(graceful_timeout=0.5)
except Exception:
continue
if not Orchestrator.is_running():
Orchestrator(exit_on_idle=False).start()
if runner_proc and runner_proc.poll() is None:
try:
runner_proc.terminate()
runner_proc.wait(timeout=2.0)
except Exception:
try:
runner_proc.kill()
except Exception:
pass
runner_proc = subprocess.Popen(
[sys.executable, '-m', 'archivebox', 'run', '--daemon'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
def runner_running() -> bool:
return Process.objects.filter(
machine=Machine.current(),
status=Process.StatusChoices.RUNNING,
process_type=Process.TypeChoices.ORCHESTRATOR,
).exists()
while True:
try:
@@ -68,11 +88,10 @@ class Command(BaseCommand):
pid = None
if pid and pid != last_pid:
restart_orchestrator()
restart_runner()
last_pid = pid
elif not Orchestrator.is_running():
Orchestrator(exit_on_idle=False).start()
elif not runner_running():
restart_runner()
except Exception:
pass

File diff suppressed because it is too large Load Diff

View File

@@ -29,13 +29,12 @@ WORKERS_DIR_NAME = "workers"
# Global reference to supervisord process for cleanup
_supervisord_proc = None
ORCHESTRATOR_WORKER = {
"name": "worker_orchestrator",
# Use Django management command to avoid stdin/TTY ambiguity in `archivebox run`.
"command": "archivebox manage orchestrator",
RUNNER_WORKER = {
"name": "worker_runner",
"command": "archivebox run --daemon",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_orchestrator.log",
"stdout_logfile": "logs/worker_runner.log",
"redirect_stderr": "true",
}
@@ -515,9 +514,7 @@ def watch_worker(supervisor, daemon_name, interval=5):
def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
supervisor = get_or_create_supervisord_process(daemonize=daemonize)
bg_workers = [
ORCHESTRATOR_WORKER,
]
bg_workers = [RUNNER_WORKER]
print()
start_worker(supervisor, SERVER_WORKER(host=host, port=port))
@@ -532,7 +529,7 @@ def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
sys.stdout.write('Tailing worker logs (Ctrl+C to stop)...\n\n')
sys.stdout.flush()
tail_multiple_worker_logs(
log_files=['logs/worker_daphne.log', 'logs/worker_orchestrator.log'],
log_files=['logs/worker_daphne.log', 'logs/worker_runner.log'],
follow=True,
proc=_supervisord_proc, # Stop tailing when supervisord exits
)
@@ -551,7 +548,7 @@ def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
def start_cli_workers(watch=False):
supervisor = get_or_create_supervisord_process(daemonize=False)
start_worker(supervisor, ORCHESTRATOR_WORKER)
start_worker(supervisor, RUNNER_WORKER)
if watch:
try:
@@ -560,7 +557,7 @@ def start_cli_workers(watch=False):
_supervisord_proc.wait()
else:
# Fallback to watching worker if no proc reference
watch_worker(supervisor, ORCHESTRATOR_WORKER['name'])
watch_worker(supervisor, RUNNER_WORKER['name'])
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
@@ -571,7 +568,7 @@ def start_cli_workers(watch=False):
# Ensure supervisord and all children are stopped
stop_existing_supervisord_process()
time.sleep(1.0) # Give processes time to fully terminate
return [ORCHESTRATOR_WORKER]
return [RUNNER_WORKER]
# def main(daemons):

View File

@@ -1,11 +1,11 @@
"""
Background task functions for queuing work to the orchestrator.
Background task functions for queuing work to the background runner.
These functions queue Snapshots/Crawls for processing by setting their status
to QUEUED, which the orchestrator workers will pick up and process.
to QUEUED so `archivebox run --daemon` or `archivebox server` can pick them up.
NOTE: These functions do NOT start the orchestrator - they assume it's already
running via `archivebox server` (supervisord) or will be run inline by the CLI.
NOTE: These functions do NOT start the runner. They assume it's already
running via `archivebox server` or will be run inline by the CLI.
"""
__package__ = 'archivebox.workers'
@@ -34,14 +34,12 @@ def bg_add(add_kwargs: dict) -> int:
def bg_archive_snapshots(snapshots, kwargs: dict | None = None) -> int:
"""
Queue multiple snapshots for archiving via the state machine system.
This sets snapshots to 'queued' status so the orchestrator workers pick them up.
The actual archiving happens through the worker's process_item() method.
Queue multiple snapshots for archiving via the shared runner loop.
Returns the number of snapshots queued.
"""
from archivebox.core.models import Snapshot
from archivebox.crawls.models import Crawl
kwargs = kwargs or {}
@@ -49,11 +47,16 @@ def bg_archive_snapshots(snapshots, kwargs: dict | None = None) -> int:
queued_count = 0
for snapshot in snapshots:
if hasattr(snapshot, 'id'):
# Update snapshot to queued state so workers pick it up
Snapshot.objects.filter(id=snapshot.id).update(
status=Snapshot.StatusChoices.QUEUED,
retry_at=timezone.now(),
)
crawl_id = getattr(snapshot, 'crawl_id', None)
if crawl_id:
Crawl.objects.filter(id=crawl_id).exclude(status=Crawl.StatusChoices.SEALED).update(
status=Crawl.StatusChoices.QUEUED,
retry_at=timezone.now(),
)
queued_count += 1
return queued_count
@@ -61,21 +64,24 @@ def bg_archive_snapshots(snapshots, kwargs: dict | None = None) -> int:
def bg_archive_snapshot(snapshot, overwrite: bool = False, methods: list | None = None) -> int:
"""
Queue a single snapshot for archiving via the state machine system.
This sets the snapshot to 'queued' status so the orchestrator workers pick it up.
The actual archiving happens through the worker's process_item() method.
Queue a single snapshot for archiving via the shared runner loop.
Returns 1 if queued, 0 otherwise.
"""
from archivebox.core.models import Snapshot
from archivebox.crawls.models import Crawl
# Queue the snapshot by setting status to queued
if hasattr(snapshot, 'id'):
Snapshot.objects.filter(id=snapshot.id).update(
status=Snapshot.StatusChoices.QUEUED,
retry_at=timezone.now(),
)
crawl_id = getattr(snapshot, 'crawl_id', None)
if crawl_id:
Crawl.objects.filter(id=crawl_id).exclude(status=Crawl.StatusChoices.SEALED).update(
status=Crawl.StatusChoices.QUEUED,
retry_at=timezone.now(),
)
return 1
return 0

File diff suppressed because it is too large Load Diff