From f3622d8cd38ce208546cde84c99940fe4ff465db Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Wed, 25 Mar 2026 05:36:07 -0700 Subject: [PATCH] update working changes --- archivebox/cli/archivebox_pluginmap.py | 6 +- archivebox/config/configset.py | 10 +- archivebox/core/admin_archiveresults.py | 4 +- archivebox/core/views.py | 73 ++- archivebox/crawls/models.py | 2 +- archivebox/hooks.py | 25 +- archivebox/machine/models.py | 35 +- archivebox/services/archive_result_service.py | 163 +++-- archivebox/services/binary_service.py | 174 ++---- archivebox/services/crawl_service.py | 48 +- archivebox/services/db.py | 16 - archivebox/services/machine_service.py | 15 +- archivebox/services/process_service.py | 389 ++++-------- archivebox/services/runner.py | 560 +++++++----------- archivebox/services/snapshot_service.py | 176 +++--- archivebox/services/tag_service.py | 16 +- archivebox/tests/migrations_helpers.py | 3 +- archivebox/tests/test_admin_links.py | 4 +- archivebox/tests/test_admin_views.py | 25 +- .../tests/test_archive_result_service.py | 339 +++++------ archivebox/tests/test_cli_run.py | 5 +- archivebox/tests/test_hooks.py | 19 +- archivebox/tests/test_machine_models.py | 6 +- archivebox/tests/test_process_service.py | 65 -- archivebox/tests/test_runner.py | 414 ++++--------- archivebox/tests/test_tag_service.py | 48 ++ docs | 2 +- old/TODO_hook_architecture.md | 4 +- pyproject.toml | 5 + 29 files changed, 985 insertions(+), 1666 deletions(-) delete mode 100644 archivebox/services/db.py create mode 100644 archivebox/tests/test_tag_service.py diff --git a/archivebox/cli/archivebox_pluginmap.py b/archivebox/cli/archivebox_pluginmap.py index f17ddbb9..547d05e3 100644 --- a/archivebox/cli/archivebox_pluginmap.py +++ b/archivebox/cli/archivebox_pluginmap.py @@ -26,7 +26,7 @@ EVENT_FLOW_DIAGRAM = """ │ CrawlStartEvent │ │ └─ SnapshotEvent │ │ └─ on_Snapshot__* │ -│ └─ Snapshot / ArchiveResult / Tag / Machine / BinaryRequest │ +│ └─ ArchiveResult / Snapshot / Tag │ │ │ │ SnapshotCleanupEvent -> internal cleanup, no direct hook family │ │ CrawlCleanupEvent -> internal cleanup, no direct hook family │ @@ -89,8 +89,8 @@ def pluginmap( "emits": ["ProcessEvent"], }, "SnapshotEvent": { - "description": "Per-snapshot extraction phase. on_Snapshot hooks emit ArchiveResult, Snapshot, Tag, and BinaryRequest records.", - "emits": ["ArchiveResultEvent", "SnapshotEvent", "TagEvent", "BinaryRequestEvent", "ProcessEvent"], + "description": "Per-snapshot extraction phase. on_Snapshot hooks emit ArchiveResult, Snapshot, and Tag records.", + "emits": ["ArchiveResultEvent", "SnapshotEvent", "TagEvent", "ProcessEvent"], }, "SnapshotCleanupEvent": { "description": "Internal snapshot cleanup phase.", diff --git a/archivebox/config/configset.py b/archivebox/config/configset.py index d14e1bfc..da055632 100644 --- a/archivebox/config/configset.py +++ b/archivebox/config/configset.py @@ -267,19 +267,13 @@ def get_config( if crawl and hasattr(crawl, "output_dir"): config["CRAWL_OUTPUT_DIR"] = str(crawl.output_dir) config["CRAWL_DIR"] = str(crawl.output_dir) - config["CRAWL_ID"] = str(getattr(crawl, "id", "")) if getattr(crawl, "id", None) else config.get("CRAWL_ID") # Apply snapshot config overrides (highest priority) if snapshot and hasattr(snapshot, "config") and snapshot.config: config.update(snapshot.config) - if snapshot: - config["SNAPSHOT_ID"] = str(getattr(snapshot, "id", "")) if getattr(snapshot, "id", None) else config.get("SNAPSHOT_ID") - config["SNAPSHOT_DEPTH"] = int(getattr(snapshot, "depth", 0) or 0) - if hasattr(snapshot, "output_dir"): - config["SNAP_DIR"] = str(snapshot.output_dir) - if getattr(snapshot, "crawl_id", None): - config["CRAWL_ID"] = str(snapshot.crawl_id) + if snapshot and hasattr(snapshot, "output_dir"): + config["SNAP_DIR"] = str(snapshot.output_dir) # Normalize all aliases to canonical names (after all sources merged) # This handles aliases that came from user/crawl/snapshot configs, not just env diff --git a/archivebox/core/admin_archiveresults.py b/archivebox/core/admin_archiveresults.py index 8c4bc602..ba8affbb 100644 --- a/archivebox/core/admin_archiveresults.py +++ b/archivebox/core/admin_archiveresults.py @@ -38,8 +38,8 @@ def _quote_shell_string(value: str) -> str: def _get_replay_source_url(result: ArchiveResult) -> str: - process_env = getattr(getattr(result, "process", None), "env", None) or {} - return str(process_env.get("SOURCE_URL") or result.snapshot.url or "") + process = getattr(result, "process", None) + return str(getattr(process, "url", None) or result.snapshot.url or "") def build_abx_dl_display_command(result: ArchiveResult) -> str: diff --git a/archivebox/core/views.py b/archivebox/core/views.py index e356fea4..999ea5d6 100644 --- a/archivebox/core/views.py +++ b/archivebox/core/views.py @@ -1322,6 +1322,17 @@ def live_progress_view(request): # Build hierarchical active crawls with nested snapshots and archive results + active_crawls_qs = ( + Crawl.objects.filter(status__in=[Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]) + .prefetch_related( + "snapshot_set", + "snapshot_set__archiveresult_set", + "snapshot_set__archiveresult_set__process", + ) + .distinct() + .order_by("-modified_at")[:10] + ) + running_processes = Process.objects.filter( machine=machine, status=Process.StatusChoices.RUNNING, @@ -1343,28 +1354,45 @@ def live_progress_view(request): process_records_by_crawl: dict[str, list[tuple[dict[str, object], object | None]]] = {} process_records_by_snapshot: dict[str, list[tuple[dict[str, object], object | None]]] = {} seen_process_records: set[str] = set() + snapshots = [snapshot for crawl in active_crawls_qs for snapshot in crawl.snapshot_set.all()] for proc in running_processes: - env = proc.env or {} - if not isinstance(env, dict): - env = {} - - crawl_id = env.get("CRAWL_ID") - snapshot_id = env.get("SNAPSHOT_ID") + if not proc.pwd: + continue + proc_pwd = Path(proc.pwd) + matched_snapshot = None + for snapshot in snapshots: + try: + proc_pwd.relative_to(snapshot.output_dir) + matched_snapshot = snapshot + break + except ValueError: + continue + if matched_snapshot is None: + continue + crawl_id = str(matched_snapshot.crawl_id) + snapshot_id = str(matched_snapshot.id) _plugin, _label, phase, _hook_name = process_label(proc.cmd) if crawl_id and proc.pid: - crawl_process_pids.setdefault(str(crawl_id), proc.pid) + crawl_process_pids.setdefault(crawl_id, proc.pid) if phase == "snapshot" and snapshot_id and proc.pid: - snapshot_process_pids.setdefault(str(snapshot_id), proc.pid) + snapshot_process_pids.setdefault(snapshot_id, proc.pid) for proc in recent_processes: - env = proc.env or {} - if not isinstance(env, dict): - env = {} - - crawl_id = env.get("CRAWL_ID") - snapshot_id = env.get("SNAPSHOT_ID") - if not crawl_id and not snapshot_id: + if not proc.pwd: continue + proc_pwd = Path(proc.pwd) + matched_snapshot = None + for snapshot in snapshots: + try: + proc_pwd.relative_to(snapshot.output_dir) + matched_snapshot = snapshot + break + except ValueError: + continue + if matched_snapshot is None: + continue + crawl_id = str(matched_snapshot.crawl_id) + snapshot_id = str(matched_snapshot.id) plugin, label, phase, hook_name = process_label(proc.cmd) @@ -1393,20 +1421,9 @@ def live_progress_view(request): payload["pid"] = proc.pid proc_started_at = proc.started_at or proc.modified_at if phase == "snapshot" and snapshot_id: - process_records_by_snapshot.setdefault(str(snapshot_id), []).append((payload, proc_started_at)) + process_records_by_snapshot.setdefault(snapshot_id, []).append((payload, proc_started_at)) elif crawl_id: - process_records_by_crawl.setdefault(str(crawl_id), []).append((payload, proc_started_at)) - - active_crawls_qs = ( - Crawl.objects.filter(status__in=[Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]) - .prefetch_related( - "snapshot_set", - "snapshot_set__archiveresult_set", - "snapshot_set__archiveresult_set__process", - ) - .distinct() - .order_by("-modified_at")[:10] - ) + process_records_by_crawl.setdefault(crawl_id, []).append((payload, proc_started_at)) active_crawls = [] total_workers = 0 diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index d3487b89..4b5e58d5 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -827,7 +827,7 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith for record in records[:3]: print(f" Record: type={record.get('type')}, keys={list(record.keys())[:5]}") if system_task: - records = [record for record in records if record.get("type") in ("BinaryRequest", "Binary", "Machine")] + records = [record for record in records if record.get("type") in ("BinaryRequest", "Binary")] overrides = {"crawl": self} stats = process_hook_records(records, overrides=overrides) if stats: diff --git a/archivebox/hooks.py b/archivebox/hooks.py index 2ff94ba2..9817e268 100644 --- a/archivebox/hooks.py +++ b/archivebox/hooks.py @@ -13,13 +13,9 @@ Hook-backed event families are discovered from filenames like: on_CrawlSetup__* on_Snapshot__* -InstallEvent itself is still part of the runtime lifecycle, but it has no -corresponding hook family. Its dependency declarations come directly from each -plugin's `config.json > required_binaries`. - -Lifecycle event names like `InstallEvent` or `SnapshotCleanupEvent` are -normalized to the corresponding `on_{EventFamily}__*` prefix by a simple -string transform. If no scripts exist for that prefix, discovery returns `[]`. +Internal bus event names are normalized to the corresponding +`on_{EventFamily}__*` prefix by a simple string transform. If no scripts exist +for that prefix, discovery returns `[]`. Directory structure: abx_plugins/plugins//on___. (built-in package) @@ -120,7 +116,6 @@ def normalize_hook_event_name(event_name: str) -> str | None: Normalize a hook event family or event class name to its on_* prefix. Examples: - InstallEvent -> Install BinaryRequestEvent -> BinaryRequest CrawlSetupEvent -> CrawlSetup SnapshotEvent -> Snapshot @@ -171,7 +166,7 @@ def discover_hooks( Args: event_name: Hook event family or event class name. - Examples: 'Install', 'InstallEvent', 'BinaryRequestEvent', 'Snapshot'. + Examples: 'BinaryRequestEvent', 'Snapshot'. Event names are normalized by stripping a trailing `Event`. If no matching `on_{EventFamily}__*` scripts exist, returns []. filter_disabled: If True, skip hooks from disabled plugins (default: True) @@ -1070,9 +1065,8 @@ def process_hook_records(records: list[dict[str, Any]], overrides: dict[str, Any Process JSONL records emitted by hook stdout. This handles hook-emitted record types such as Snapshot, Tag, BinaryRequest, - Binary, and Machine. It does not process bus lifecycle events like - InstallEvent, CrawlEvent, CrawlCleanupEvent, or SnapshotCleanupEvent, since - those are not emitted as JSONL records by hook subprocesses. + and Binary. It does not process internal bus lifecycle events, since those + are not emitted as JSONL records by hook subprocesses. Args: records: List of JSONL record dicts from result['records'] @@ -1131,13 +1125,6 @@ def process_hook_records(records: list[dict[str, Any]], overrides: dict[str, Any if obj: stats[record_type] = stats.get(record_type, 0) + 1 - elif record_type == "Machine": - from archivebox.machine.models import Machine - - obj = Machine.from_json(record.copy(), overrides) - if obj: - stats["Machine"] = stats.get("Machine", 0) + 1 - else: import sys diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 3479114d..1ffae6c9 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -566,33 +566,6 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine): return None return {provider.strip() for provider in providers.split(",") if provider.strip()} - def _get_custom_install_command(self) -> str | None: - """Extract a custom install command from overrides when the custom provider is used.""" - import shlex - - if not isinstance(self.overrides, dict): - return None - - for key in ("custom_cmd", "cmd", "command"): - value = self.overrides.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - - custom_overrides = self.overrides.get("custom") - if isinstance(custom_overrides, dict): - for key in ("custom_cmd", "cmd", "command"): - value = custom_overrides.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - - install_args = custom_overrides.get("install_args") - if isinstance(install_args, str) and install_args.strip(): - return install_args.strip() - if isinstance(install_args, list) and install_args: - return " ".join(shlex.quote(str(arg)) for arg in install_args if str(arg).strip()) - - return None - def run(self): """ Execute binary installation by running on_BinaryRequest__* hooks. @@ -637,13 +610,8 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine): plugin_output_dir = output_dir / plugin_name plugin_output_dir.mkdir(parents=True, exist_ok=True) - custom_cmd = None overrides_json = None - if plugin_name == "custom": - custom_cmd = self._get_custom_install_command() - if not custom_cmd: - continue - elif self.overrides: + if self.overrides: overrides_json = json.dumps(self.overrides) # Run the hook @@ -656,7 +624,6 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine): machine_id=str(self.machine_id), name=self.name, binproviders=self.binproviders, - custom_cmd=custom_cmd, overrides=overrides_json, ) diff --git a/archivebox/services/archive_result_service.py b/archivebox/services/archive_result_service.py index cc0b8c10..2fe41359 100644 --- a/archivebox/services/archive_result_service.py +++ b/archivebox/services/archive_result_service.py @@ -9,12 +9,11 @@ from typing import Any from asgiref.sync import sync_to_async from django.utils import timezone -from abx_dl.events import ArchiveResultEvent, ProcessCompletedEvent +from abx_dl.events import ArchiveResultEvent, ProcessCompletedEvent, ProcessStartedEvent, SnapshotEvent from abx_dl.output_files import guess_mimetype from abx_dl.services.base import BaseService -from .db import run_db_op -from .process_service import ProcessService, parse_event_datetime +from .process_service import parse_event_datetime def _collect_output_metadata(plugin_dir: Path) -> tuple[dict[str, dict], int, str]: @@ -209,79 +208,41 @@ class ArchiveResultService(BaseService): LISTENS_TO = [ArchiveResultEvent, ProcessCompletedEvent] EMITS = [] - def __init__(self, bus, *, process_service: ProcessService): - self.process_service = process_service + def __init__(self, bus): super().__init__(bus) + self.bus.on(ArchiveResultEvent, self.on_ArchiveResultEvent__save_to_db) + self.bus.on(ProcessCompletedEvent, self.on_ProcessCompletedEvent__save_to_db) - async def on_ArchiveResultEvent__Outer(self, event: ArchiveResultEvent) -> None: - snapshot_output_dir = await run_db_op(self._get_snapshot_output_dir, event.snapshot_id) - if snapshot_output_dir is None: - return - plugin_dir = Path(snapshot_output_dir) / event.plugin - output_files, output_size, output_mimetypes = await sync_to_async(_resolve_output_metadata)(event.output_files, plugin_dir) - await run_db_op(self._project, event, output_files, output_size, output_mimetypes) - - async def on_ProcessCompletedEvent__Outer(self, event: ProcessCompletedEvent) -> None: - if not event.snapshot_id or not event.hook_name.startswith("on_Snapshot"): - return - - plugin_dir = Path(event.output_dir) - output_files, output_size, output_mimetypes = await sync_to_async(_resolve_output_metadata)(event.output_files, plugin_dir) - records = _iter_archiveresult_records(event.stdout) - if records: - for record in records: - await run_db_op( - self._project_from_process_completed, - event, - record, - output_files, - output_size, - output_mimetypes, - ) - return - - synthetic_record = { - "plugin": event.plugin_name, - "hook_name": event.hook_name, - "status": "failed" if event.exit_code != 0 else ("succeeded" if _has_content_files(event.output_files) else "skipped"), - "output_str": event.stderr if event.exit_code != 0 else "", - "error": event.stderr if event.exit_code != 0 else "", - } - await run_db_op( - self._project_from_process_completed, - event, - synthetic_record, - output_files, - output_size, - output_mimetypes, - ) - - def _get_snapshot_output_dir(self, snapshot_id: str) -> str | None: - from archivebox.core.models import Snapshot - - snapshot = Snapshot.objects.filter(id=snapshot_id).only("output_dir").first() - return str(snapshot.output_dir) if snapshot is not None else None - - def _project( - self, - event: ArchiveResultEvent, - output_files: dict[str, dict], - output_size: int, - output_mimetypes: str, - ) -> None: + async def on_ArchiveResultEvent__save_to_db(self, event: ArchiveResultEvent) -> None: from archivebox.core.models import ArchiveResult, Snapshot from archivebox.machine.models import Process - snapshot = Snapshot.objects.filter(id=event.snapshot_id).first() + snapshot = await Snapshot.objects.filter(id=event.snapshot_id).select_related("crawl", "crawl__created_by").afirst() if snapshot is None: return - + plugin_dir = Path(snapshot.output_dir) / event.plugin + output_files, output_size, output_mimetypes = await sync_to_async(_resolve_output_metadata)(event.output_files, plugin_dir) + process_started = await self.bus.find( + ProcessStartedEvent, + past=True, + future=False, + where=lambda candidate: self.bus.event_is_child_of(event, candidate), + ) process = None - db_process_id = self.process_service.get_db_process_id(event.process_id) - if db_process_id: - process = Process.objects.filter(id=db_process_id).first() + if process_started is not None: + started_at = parse_event_datetime(process_started.start_ts) + if started_at is None: + raise ValueError("ProcessStartedEvent.start_ts is required") + process_query = Process.objects.filter( + pwd=process_started.output_dir, + cmd=[process_started.hook_path, *process_started.hook_args], + started_at=started_at, + ) + if process_started.pid: + process_query = process_query.filter(pid=process_started.pid) + process = await process_query.order_by("-modified_at").afirst() - result, _created = ArchiveResult.objects.get_or_create( + result, _created = await ArchiveResult.objects.aget_or_create( snapshot=snapshot, plugin=event.plugin, hook_name=event.hook_name, @@ -302,32 +263,54 @@ class ArchiveResultService(BaseService): result.end_ts = parse_event_datetime(event.end_ts) or timezone.now() if event.error: result.notes = event.error - result.save() + await result.asave() next_title = _extract_snapshot_title(str(snapshot.output_dir), event.plugin, result.output_str, snapshot_url=snapshot.url) if next_title and _should_update_snapshot_title(snapshot.title or "", next_title, snapshot_url=snapshot.url): snapshot.title = next_title - snapshot.save(update_fields=["title", "modified_at"]) + await snapshot.asave(update_fields=["title", "modified_at"]) - def _project_from_process_completed( - self, - event: ProcessCompletedEvent, - record: dict, - output_files: dict[str, dict], - output_size: int, - output_mimetypes: str, - ) -> None: - archive_result_event = ArchiveResultEvent( - snapshot_id=record.get("snapshot_id") or event.snapshot_id, - plugin=record.get("plugin") or event.plugin_name, - hook_name=record.get("hook_name") or event.hook_name, - status=record.get("status") or "", - process_id=event.process_id, - output_str=record.get("output_str") or "", - output_json=record.get("output_json") if isinstance(record.get("output_json"), dict) else None, - output_files=event.output_files, - start_ts=event.start_ts, - end_ts=event.end_ts, - error=record.get("error") or (event.stderr if event.exit_code != 0 else ""), + async def on_ProcessCompletedEvent__save_to_db(self, event: ProcessCompletedEvent) -> None: + if not event.hook_name.startswith("on_Snapshot"): + return + snapshot_event = await self.bus.find( + SnapshotEvent, + past=True, + future=False, + where=lambda candidate: self.bus.event_is_child_of(event, candidate), + ) + if snapshot_event is None: + return + + records = _iter_archiveresult_records(event.stdout) + if records: + for record in records: + await self.bus.emit( + ArchiveResultEvent( + snapshot_id=record.get("snapshot_id") or snapshot_event.snapshot_id, + plugin=record.get("plugin") or event.plugin_name, + hook_name=record.get("hook_name") or event.hook_name, + status=record.get("status") or "", + output_str=record.get("output_str") or "", + output_json=record.get("output_json") if isinstance(record.get("output_json"), dict) else None, + output_files=event.output_files, + start_ts=event.start_ts, + end_ts=event.end_ts, + error=record.get("error") or (event.stderr if event.exit_code != 0 else ""), + ), + ) + return + + await self.bus.emit( + ArchiveResultEvent( + snapshot_id=snapshot_event.snapshot_id, + plugin=event.plugin_name, + hook_name=event.hook_name, + status="failed" if event.exit_code != 0 else ("succeeded" if _has_content_files(event.output_files) else "skipped"), + output_str=event.stderr if event.exit_code != 0 else "", + output_files=event.output_files, + start_ts=event.start_ts, + end_ts=event.end_ts, + error=event.stderr if event.exit_code != 0 else "", + ), ) - self._project(archive_result_event, output_files, output_size, output_mimetypes) diff --git a/archivebox/services/binary_service.py b/archivebox/services/binary_service.py index 184c2b9f..5b18e391 100644 --- a/archivebox/services/binary_service.py +++ b/archivebox/services/binary_service.py @@ -1,20 +1,62 @@ from __future__ import annotations -import asyncio +from asgiref.sync import sync_to_async from abx_dl.events import BinaryRequestEvent, BinaryEvent from abx_dl.services.base import BaseService -from .db import run_db_op - class BinaryService(BaseService): LISTENS_TO = [BinaryRequestEvent, BinaryEvent] EMITS = [] - async def on_BinaryRequestEvent__Outer(self, event: BinaryRequestEvent) -> None: - await run_db_op(self._project_binary, event) - cached = await run_db_op(self._load_cached_binary, event) + def __init__(self, bus): + super().__init__(bus) + self.bus.on(BinaryRequestEvent, self.on_BinaryRequestEvent) + self.bus.on(BinaryEvent, self.on_BinaryEvent) + + async def on_BinaryRequestEvent(self, event: BinaryRequestEvent) -> None: + from archivebox.machine.models import Binary, Machine + + machine = await sync_to_async(Machine.current, thread_sensitive=True)() + existing = await Binary.objects.filter(machine=machine, name=event.name).afirst() + if existing and existing.status == Binary.StatusChoices.INSTALLED: + changed = False + if event.binproviders and existing.binproviders != event.binproviders: + existing.binproviders = event.binproviders + changed = True + if event.overrides and existing.overrides != event.overrides: + existing.overrides = event.overrides + changed = True + if changed: + await existing.asave(update_fields=["binproviders", "overrides", "modified_at"]) + elif existing is None: + await Binary.objects.acreate( + machine=machine, + name=event.name, + binproviders=event.binproviders, + overrides=event.overrides or {}, + status=Binary.StatusChoices.QUEUED, + ) + + installed = ( + await Binary.objects.filter(machine=machine, name=event.name, status=Binary.StatusChoices.INSTALLED) + .exclude(abspath="") + .exclude(abspath__isnull=True) + .order_by("-modified_at") + .afirst() + ) + cached = None + if installed is not None: + cached = { + "abspath": installed.abspath, + "version": installed.version or "", + "sha256": installed.sha256 or "", + "binproviders": installed.binproviders or "", + "binprovider": installed.binprovider or "", + "machine_id": str(installed.machine_id), + "overrides": installed.overrides or {}, + } if cached is not None: await self.bus.emit( BinaryEvent( @@ -28,126 +70,34 @@ class BinaryService(BaseService): binprovider=cached["binprovider"], overrides=event.overrides or cached["overrides"], binary_id=event.binary_id, - machine_id=event.machine_id or cached["machine_id"], + machine_id=cached["machine_id"], ), ) - async def on_BinaryEvent__Outer(self, event: BinaryEvent) -> None: - resolved = await asyncio.to_thread(self._resolve_installed_binary_metadata, event) - await run_db_op(self._project_installed_binary, event, resolved) - - def _project_binary(self, event: BinaryRequestEvent) -> None: + async def on_BinaryEvent(self, event: BinaryEvent) -> None: from archivebox.machine.models import Binary, Machine - machine = Machine.current() - existing = Binary.objects.filter(machine=machine, name=event.name).first() - if existing and existing.status == Binary.StatusChoices.INSTALLED: - changed = False - if event.binproviders and existing.binproviders != event.binproviders: - existing.binproviders = event.binproviders - changed = True - if event.overrides and existing.overrides != event.overrides: - existing.overrides = event.overrides - changed = True - if changed: - existing.save(update_fields=["binproviders", "overrides", "modified_at"]) - return - - Binary.from_json( - { - "name": event.name, - "binproviders": event.binproviders, - "overrides": event.overrides or {}, - }, - ) - - def _load_cached_binary(self, event: BinaryRequestEvent) -> dict[str, str] | None: - from archivebox.machine.models import Binary, Machine - - machine = Machine.current() - installed = ( - Binary.objects.filter(machine=machine, name=event.name, status=Binary.StatusChoices.INSTALLED) - .exclude(abspath="") - .exclude(abspath__isnull=True) - .order_by("-modified_at") - .first() - ) - if installed is None: - return None - return { - "abspath": installed.abspath, - "version": installed.version or "", - "sha256": installed.sha256 or "", - "binproviders": installed.binproviders or "", - "binprovider": installed.binprovider or "", - "machine_id": str(installed.machine_id), - "overrides": installed.overrides or {}, - } - - def _resolve_installed_binary_metadata(self, event: BinaryEvent) -> dict[str, str]: - resolved = { - "abspath": event.abspath or "", - "version": event.version or "", - "sha256": event.sha256 or "", - "binproviders": event.binproviders or "", - "binprovider": event.binprovider or "", - } - if resolved["abspath"] and resolved["version"] and resolved["binprovider"]: - return resolved - - if resolved["abspath"] and not resolved["version"]: - try: - from abx_pkg.semver import bin_version - - detected_version = bin_version(resolved["abspath"]) - except Exception: - detected_version = None - if detected_version: - resolved["version"] = str(detected_version) - if resolved["version"] and resolved["binprovider"]: - return resolved - - try: - from abx_dl.dependencies import load_binary - - allowed_providers = resolved["binproviders"] or resolved["binprovider"] or "env,pip,npm,brew,apt" - spec = { - "name": event.name, - "binproviders": allowed_providers, - "overrides": event.overrides or {}, - } - binary = load_binary(spec) - resolved["abspath"] = str(binary.abspath or resolved["abspath"] or "") - resolved["version"] = str(binary.version or resolved["version"] or "") - resolved["sha256"] = str(binary.sha256 or resolved["sha256"] or "") - if binary.loaded_binprovider is not None and binary.loaded_binprovider.name: - resolved["binprovider"] = str(binary.loaded_binprovider.name) - except Exception: - pass - - return resolved - - def _project_installed_binary(self, event: BinaryEvent, resolved: dict[str, str]) -> None: - from archivebox.machine.models import Binary, Machine - - machine = Machine.current() - binary, _ = Binary.objects.get_or_create( + machine = await sync_to_async(Machine.current, thread_sensitive=True)() + binary, _ = await Binary.objects.aget_or_create( machine=machine, name=event.name, defaults={ "status": Binary.StatusChoices.QUEUED, }, ) - binary.abspath = resolved["abspath"] or binary.abspath - binary.version = resolved["version"] or binary.version - binary.sha256 = resolved["sha256"] or binary.sha256 - if resolved["binproviders"]: - binary.binproviders = resolved["binproviders"] - binary.binprovider = resolved["binprovider"] or binary.binprovider + binary.abspath = event.abspath + if event.version: + binary.version = event.version + if event.sha256: + binary.sha256 = event.sha256 + if event.binproviders: + binary.binproviders = event.binproviders + if event.binprovider: + binary.binprovider = event.binprovider if event.overrides and binary.overrides != event.overrides: binary.overrides = event.overrides binary.status = Binary.StatusChoices.INSTALLED binary.retry_at = None - binary.save( + await binary.asave( update_fields=["abspath", "version", "sha256", "binproviders", "binprovider", "overrides", "status", "retry_at", "modified_at"], ) diff --git a/archivebox/services/crawl_service.py b/archivebox/services/crawl_service.py index 1b5e314b..fd81f7e6 100644 --- a/archivebox/services/crawl_service.py +++ b/archivebox/services/crawl_service.py @@ -3,8 +3,6 @@ from __future__ import annotations from abx_dl.events import CrawlCleanupEvent, CrawlCompletedEvent, CrawlSetupEvent, CrawlStartEvent from abx_dl.services.base import BaseService -from .db import run_db_op - class CrawlService(BaseService): LISTENS_TO = [CrawlSetupEvent, CrawlStartEvent, CrawlCleanupEvent, CrawlCompletedEvent] @@ -13,32 +11,42 @@ class CrawlService(BaseService): def __init__(self, bus, *, crawl_id: str): self.crawl_id = crawl_id super().__init__(bus) + self.bus.on(CrawlSetupEvent, self.on_CrawlSetupEvent__save_to_db) + self.bus.on(CrawlStartEvent, self.on_CrawlStartEvent__save_to_db) + self.bus.on(CrawlCleanupEvent, self.on_CrawlCleanupEvent__save_to_db) + self.bus.on(CrawlCompletedEvent, self.on_CrawlCompletedEvent__save_to_db) - async def on_CrawlSetupEvent__Outer(self, event: CrawlSetupEvent) -> None: - await run_db_op(self._mark_started) - - async def on_CrawlStartEvent__Outer(self, event: CrawlStartEvent) -> None: - await run_db_op(self._mark_started) - - async def on_CrawlCleanupEvent__Outer(self, event: CrawlCleanupEvent) -> None: - await run_db_op(self._mark_started) - - async def on_CrawlCompletedEvent__Outer(self, event: CrawlCompletedEvent) -> None: - await run_db_op(self._mark_completed) - - def _mark_started(self) -> None: + async def on_CrawlSetupEvent__save_to_db(self, event: CrawlSetupEvent) -> None: from archivebox.crawls.models import Crawl - crawl = Crawl.objects.get(id=self.crawl_id) + crawl = await Crawl.objects.aget(id=self.crawl_id) if crawl.status != Crawl.StatusChoices.SEALED: crawl.status = Crawl.StatusChoices.STARTED crawl.retry_at = None - crawl.save(update_fields=["status", "retry_at", "modified_at"]) + await crawl.asave(update_fields=["status", "retry_at", "modified_at"]) - def _mark_completed(self) -> None: + async def on_CrawlStartEvent__save_to_db(self, event: CrawlStartEvent) -> None: from archivebox.crawls.models import Crawl - crawl = Crawl.objects.get(id=self.crawl_id) + crawl = await Crawl.objects.aget(id=self.crawl_id) + if crawl.status != Crawl.StatusChoices.SEALED: + crawl.status = Crawl.StatusChoices.STARTED + crawl.retry_at = None + await crawl.asave(update_fields=["status", "retry_at", "modified_at"]) + + async def on_CrawlCleanupEvent__save_to_db(self, event: CrawlCleanupEvent) -> None: + from archivebox.crawls.models import Crawl + + crawl = await Crawl.objects.aget(id=self.crawl_id) + if crawl.status != Crawl.StatusChoices.SEALED: + crawl.status = Crawl.StatusChoices.STARTED + crawl.retry_at = None + await crawl.asave(update_fields=["status", "retry_at", "modified_at"]) + + async def on_CrawlCompletedEvent__save_to_db(self, event: CrawlCompletedEvent) -> None: + from archivebox.crawls.models import Crawl + + crawl = await Crawl.objects.aget(id=self.crawl_id) crawl.status = Crawl.StatusChoices.SEALED crawl.retry_at = None - crawl.save(update_fields=["status", "retry_at", "modified_at"]) + await crawl.asave(update_fields=["status", "retry_at", "modified_at"]) diff --git a/archivebox/services/db.py b/archivebox/services/db.py deleted file mode 100644 index 0c8e542c..00000000 --- a/archivebox/services/db.py +++ /dev/null @@ -1,16 +0,0 @@ -from __future__ import annotations - -from asgiref.sync import sync_to_async -from django.db import close_old_connections - - -def _run_db_op(func, *args, **kwargs): - close_old_connections() - try: - return func(*args, **kwargs) - finally: - close_old_connections() - - -async def run_db_op(func, *args, **kwargs): - return await sync_to_async(_run_db_op, thread_sensitive=True)(func, *args, **kwargs) diff --git a/archivebox/services/machine_service.py b/archivebox/services/machine_service.py index e90975ad..f451ab36 100644 --- a/archivebox/services/machine_service.py +++ b/archivebox/services/machine_service.py @@ -1,22 +1,23 @@ from __future__ import annotations +from asgiref.sync import sync_to_async + from abx_dl.events import MachineEvent from abx_dl.services.base import BaseService -from .db import run_db_op - class MachineService(BaseService): LISTENS_TO = [MachineEvent] EMITS = [] - async def on_MachineEvent__Outer(self, event: MachineEvent) -> None: - await run_db_op(self._project, event) + def __init__(self, bus): + super().__init__(bus) + self.bus.on(MachineEvent, self.on_MachineEvent__save_to_db) - def _project(self, event: MachineEvent) -> None: + async def on_MachineEvent__save_to_db(self, event: MachineEvent) -> None: from archivebox.machine.models import Machine, _sanitize_machine_config - machine = Machine.current() + machine = await sync_to_async(Machine.current, thread_sensitive=True)() config = dict(machine.config or {}) if event.config is not None: @@ -29,4 +30,4 @@ class MachineService(BaseService): return machine.config = _sanitize_machine_config(config) - machine.save(update_fields=["config", "modified_at"]) + await machine.asave(update_fields=["config", "modified_at"]) diff --git a/archivebox/services/process_service.py b/archivebox/services/process_service.py index 21547b19..cdcb9bbf 100644 --- a/archivebox/services/process_service.py +++ b/archivebox/services/process_service.py @@ -1,29 +1,15 @@ from __future__ import annotations -import asyncio -from datetime import datetime, timezone as datetime_timezone -import json -from pathlib import Path -import shlex -import socket -import time -from typing import TYPE_CHECKING, Any, ClassVar -from urllib.parse import urlparse +from datetime import datetime +from typing import ClassVar +from asgiref.sync import sync_to_async from django.utils import timezone from abxbus import BaseEvent -from abx_dl.events import ProcessCompletedEvent, ProcessEvent, ProcessStartedEvent, ProcessStdoutEvent +from abx_dl.events import ProcessCompletedEvent, ProcessStartedEvent from abx_dl.services.base import BaseService -from .db import run_db_op - -if TYPE_CHECKING: - from archivebox.machine.models import Process - - -WORKER_READY_TIMEOUT = 10.0 - def parse_event_datetime(value: str | None): if not value: @@ -37,308 +23,133 @@ def parse_event_datetime(value: str | None): return dt -def _is_port_listening(host: str, port: int) -> bool: - if not host or not port: - return False - try: - with socket.create_connection((host, port), timeout=0.5): - return True - except OSError: - return False - - -def _worker_socket_from_url(url: str) -> tuple[str, int] | None: - if not url: - return None - parsed = urlparse(url) - if parsed.scheme != "tcp" or not parsed.hostname or not parsed.port: - return None - return parsed.hostname, parsed.port - - -def _supervisor_env(env: dict[str, str]) -> str: - pairs = [] - for key, value in env.items(): - escaped = value.replace('"', '\\"') - pairs.append(f'{key}="{escaped}"') - return ",".join(pairs) - - -def _iso_from_epoch(value: object) -> str: - if not isinstance(value, (int, float)) or value <= 0: - return "" - return datetime.fromtimestamp(value, tz=datetime_timezone.utc).isoformat() - - -def _int_from_object(value: object) -> int: - if isinstance(value, bool): - return int(value) - if isinstance(value, int): - return value - if isinstance(value, float): - return int(value) - if isinstance(value, str): - try: - return int(value) - except ValueError: - return 0 - return 0 - - -def _ensure_worker(process_event: ProcessEvent) -> dict[str, object]: - from archivebox.workers.supervisord_util import get_or_create_supervisord_process, get_worker, start_worker - - output_dir = Path(process_event.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - worker_name = process_event.hook_name - supervisor = get_or_create_supervisord_process(daemonize=True) - worker_socket = _worker_socket_from_url(getattr(process_event, "url", "")) - - existing = get_worker(supervisor, worker_name) - if ( - isinstance(existing, dict) - and existing.get("statename") == "RUNNING" - and (worker_socket is None or _is_port_listening(*worker_socket)) - ): - return existing - - daemon = { - "name": worker_name, - "command": shlex.join([process_event.hook_path, *process_event.hook_args]), - "directory": str(output_dir), - "autostart": "false", - "autorestart": "true", - "stdout_logfile": str(output_dir / f"{worker_name}.stdout.log"), - "redirect_stderr": "true", - } - if process_event.env: - daemon["environment"] = _supervisor_env(process_event.env) - - proc = start_worker(supervisor, daemon) - deadline = time.monotonic() + WORKER_READY_TIMEOUT - while time.monotonic() < deadline: - current = get_worker(supervisor, worker_name) - if isinstance(current, dict) and current.get("statename") == "RUNNING": - if worker_socket is None or _is_port_listening(*worker_socket): - return current - time.sleep(0.1) - return proc if isinstance(proc, dict) else {} - - class ProcessService(BaseService): - LISTENS_TO: ClassVar[list[type[BaseEvent]]] = [ProcessStdoutEvent, ProcessStartedEvent, ProcessCompletedEvent] - EMITS: ClassVar[list[type[BaseEvent]]] = [ProcessEvent, ProcessStartedEvent, ProcessCompletedEvent] + LISTENS_TO: ClassVar[list[type[BaseEvent]]] = [ProcessStartedEvent, ProcessCompletedEvent] + EMITS: ClassVar[list[type[BaseEvent]]] = [] def __init__(self, bus): - self.process_ids: dict[str, str] = {} super().__init__(bus) + self.bus.on(ProcessStartedEvent, self.on_ProcessStartedEvent__save_to_db) + self.bus.on(ProcessCompletedEvent, self.on_ProcessCompletedEvent__save_to_db) - async def on_ProcessStdoutEvent(self, event: ProcessStdoutEvent) -> None: - try: - record = json.loads(event.line) - except (json.JSONDecodeError, ValueError): - return - if not isinstance(record, dict) or record.get("type") != "ProcessEvent": - return - - passthrough_fields: dict[str, Any] = { - key: value - for key, value in record.items() - if key - not in { - "type", - "plugin_name", - "hook_name", - "hook_path", - "hook_args", - "is_background", - "output_dir", - "env", - "snapshot_id", - "process_id", - "url", - "timeout", - "daemon", - "process_type", - "worker_type", - "event_timeout", - "event_handler_timeout", - } - } - process_event = ProcessEvent( - plugin_name=record.get("plugin_name") or event.plugin_name, - hook_name=record.get("hook_name") or "process", - hook_path=record["hook_path"], - hook_args=[str(arg) for arg in record.get("hook_args", [])], - is_background=bool(record.get("is_background", True)), - output_dir=record.get("output_dir") or event.output_dir, - env={str(key): str(value) for key, value in (record.get("env") or {}).items()}, - snapshot_id=record.get("snapshot_id") or event.snapshot_id, - timeout=int(record.get("timeout") or 60), - daemon=bool(record.get("daemon", False)), - url=str(record.get("url") or ""), - process_type=str(record.get("process_type") or ""), - worker_type=str(record.get("worker_type") or ""), - event_timeout=float(record.get("event_timeout") or 360.0), - event_handler_timeout=float(record.get("event_handler_timeout") or 390.0), - **passthrough_fields, - ) - if not process_event.daemon: - await self.bus.emit(process_event) - return - - proc = await asyncio.to_thread(_ensure_worker, process_event) - process_id = str(record.get("process_id") or f"worker:{process_event.hook_name}") - start_ts = _iso_from_epoch(proc.get("start")) - pid = _int_from_object(proc.get("pid")) - statename = str(proc.get("statename") or "") - exitstatus = _int_from_object(proc.get("exitstatus")) - process_type = process_event.process_type or "worker" - worker_type = process_event.worker_type or process_event.plugin_name - - if statename == "RUNNING" and pid: - await self.bus.emit( - ProcessStartedEvent( - plugin_name=process_event.plugin_name, - hook_name=process_event.hook_name, - hook_path=process_event.hook_path, - hook_args=process_event.hook_args, - output_dir=process_event.output_dir, - env=process_event.env, - timeout=process_event.timeout, - pid=pid, - process_id=process_id, - snapshot_id=process_event.snapshot_id, - is_background=True, - url=process_event.url, - process_type=process_type, - worker_type=worker_type, - start_ts=start_ts, - **passthrough_fields, - ), - ) - return - - stderr = ( - f"Worker {process_event.hook_name} failed to start" - if not statename - else f"Worker {process_event.hook_name} state={statename} exitstatus={exitstatus}" - ) - await self.bus.emit( - ProcessCompletedEvent( - plugin_name=process_event.plugin_name, - hook_name=process_event.hook_name, - hook_path=process_event.hook_path, - hook_args=process_event.hook_args, - env=process_event.env, - stdout="", - stderr=stderr, - exit_code=exitstatus or 1, - output_dir=process_event.output_dir, - is_background=True, - process_id=process_id, - snapshot_id=process_event.snapshot_id, - pid=pid, - url=process_event.url, - process_type=process_type, - worker_type=worker_type, - start_ts=start_ts, - end_ts=datetime.now(tz=datetime_timezone.utc).isoformat(), - **passthrough_fields, - ), - ) - raise RuntimeError(stderr) - - async def on_ProcessStartedEvent__Outer(self, event: ProcessStartedEvent) -> None: - await run_db_op(self._project_started, event) - - async def on_ProcessCompletedEvent__Outer(self, event: ProcessCompletedEvent) -> None: - await run_db_op(self._project_completed, event) - - def get_db_process_id(self, process_id: str) -> str | None: - return self.process_ids.get(process_id) - - def _get_or_create_process(self, event: ProcessStartedEvent | ProcessCompletedEvent) -> Process: + async def on_ProcessStartedEvent__save_to_db(self, event: ProcessStartedEvent) -> None: from archivebox.machine.models import NetworkInterface, Process - db_process_id = self.process_ids.get(event.process_id) - iface = NetworkInterface.current(refresh=True) - if db_process_id: - process = Process.objects.filter(id=db_process_id).first() - if process is not None: - if getattr(process, "iface_id", None) != iface.id or process.machine_id != iface.machine_id: - process.iface = iface - process.machine = iface.machine - process.save(update_fields=["iface", "machine", "modified_at"]) - return process - - process_type = getattr(event, "process_type", "") or ( + iface = await sync_to_async(NetworkInterface.current, thread_sensitive=True)(refresh=True) + process_type = event.process_type or ( Process.TypeChoices.BINARY if event.hook_name.startswith("on_BinaryRequest") else Process.TypeChoices.HOOK ) - worker_type = getattr(event, "worker_type", "") or "" - if process_type == Process.TypeChoices.WORKER and worker_type: - existing = ( - Process.objects.filter( - process_type=Process.TypeChoices.WORKER, - worker_type=worker_type, - pwd=event.output_dir, - ) - .order_by("-modified_at") - .first() - ) - if existing is not None: - self.process_ids[event.process_id] = str(existing.id) - return existing - process = Process.objects.create( - machine=iface.machine, - iface=iface, + worker_type = event.worker_type or "" + started_at = parse_event_datetime(event.start_ts) + if started_at is None: + raise ValueError("ProcessStartedEvent.start_ts is required") + process_query = Process.objects.filter( process_type=process_type, worker_type=worker_type, pwd=event.output_dir, cmd=[event.hook_path, *event.hook_args], - env=event.env, - timeout=getattr(event, "timeout", 60), - pid=event.pid or None, - url=getattr(event, "url", "") or None, - started_at=parse_event_datetime(getattr(event, "start_ts", "")), - status=Process.StatusChoices.RUNNING, - retry_at=None, + started_at=started_at, ) - self.process_ids[event.process_id] = str(process.id) - return process + if event.pid: + process_query = process_query.filter(pid=event.pid) + process = await process_query.order_by("-modified_at").afirst() + if process is None: + process = await Process.objects.acreate( + machine=iface.machine, + iface=iface, + process_type=process_type, + worker_type=worker_type, + pwd=event.output_dir, + cmd=[event.hook_path, *event.hook_args], + env=event.env, + timeout=event.timeout, + pid=event.pid or None, + url=event.url or None, + started_at=started_at, + status=Process.StatusChoices.RUNNING, + retry_at=None, + ) + elif process.iface_id != iface.id or process.machine_id != iface.machine_id: + process.iface = iface + process.machine = iface.machine + await process.asave(update_fields=["iface", "machine", "modified_at"]) - def _project_started(self, event: ProcessStartedEvent) -> None: - process = self._get_or_create_process(event) process.pwd = event.output_dir process.cmd = [event.hook_path, *event.hook_args] process.env = event.env process.timeout = event.timeout process.pid = event.pid or None - process.url = getattr(event, "url", "") or process.url - process.process_type = getattr(event, "process_type", "") or process.process_type - process.worker_type = getattr(event, "worker_type", "") or process.worker_type - process.started_at = parse_event_datetime(event.start_ts) or process.started_at or timezone.now() + process.url = event.url or process.url + process.process_type = process_type or process.process_type + process.worker_type = worker_type or process.worker_type + process.started_at = started_at process.status = process.StatusChoices.RUNNING process.retry_at = None - process.hydrate_binary_from_context(plugin_name=event.plugin_name, hook_path=event.hook_path) - process.save() + await sync_to_async(process.hydrate_binary_from_context, thread_sensitive=True)( + plugin_name=event.plugin_name, + hook_path=event.hook_path, + ) + await process.asave() + + async def on_ProcessCompletedEvent__save_to_db(self, event: ProcessCompletedEvent) -> None: + from archivebox.machine.models import NetworkInterface, Process + + iface = await sync_to_async(NetworkInterface.current, thread_sensitive=True)(refresh=True) + process_type = event.process_type or ( + Process.TypeChoices.BINARY if event.hook_name.startswith("on_BinaryRequest") else Process.TypeChoices.HOOK + ) + worker_type = event.worker_type or "" + started_at = parse_event_datetime(event.start_ts) + if started_at is None: + raise ValueError("ProcessCompletedEvent.start_ts is required") + process_query = Process.objects.filter( + process_type=process_type, + worker_type=worker_type, + pwd=event.output_dir, + cmd=[event.hook_path, *event.hook_args], + started_at=started_at, + ) + if event.pid: + process_query = process_query.filter(pid=event.pid) + process = await process_query.order_by("-modified_at").afirst() + if process is None: + process = await Process.objects.acreate( + machine=iface.machine, + iface=iface, + process_type=process_type, + worker_type=worker_type, + pwd=event.output_dir, + cmd=[event.hook_path, *event.hook_args], + env=event.env, + timeout=event.timeout, + pid=event.pid or None, + url=event.url or None, + started_at=started_at, + status=Process.StatusChoices.RUNNING, + retry_at=None, + ) + elif process.iface_id != iface.id or process.machine_id != iface.machine_id: + process.iface = iface + process.machine = iface.machine + await process.asave(update_fields=["iface", "machine", "modified_at"]) - def _project_completed(self, event: ProcessCompletedEvent) -> None: - process = self._get_or_create_process(event) process.pwd = event.output_dir if not process.cmd: process.cmd = [event.hook_path, *event.hook_args] process.env = event.env process.pid = event.pid or process.pid - process.url = getattr(event, "url", "") or process.url - process.process_type = getattr(event, "process_type", "") or process.process_type - process.worker_type = getattr(event, "worker_type", "") or process.worker_type - process.started_at = parse_event_datetime(event.start_ts) or process.started_at + process.url = event.url or process.url + process.process_type = process_type or process.process_type + process.worker_type = worker_type or process.worker_type + process.started_at = started_at process.ended_at = parse_event_datetime(event.end_ts) or timezone.now() process.stdout = event.stdout process.stderr = event.stderr process.exit_code = event.exit_code process.status = process.StatusChoices.EXITED process.retry_at = None - process.hydrate_binary_from_context(plugin_name=event.plugin_name, hook_path=event.hook_path) - process.save() + await sync_to_async(process.hydrate_binary_from_context, thread_sensitive=True)( + plugin_name=event.plugin_name, + hook_path=event.hook_path, + ) + await process.asave() diff --git a/archivebox/services/runner.py b/archivebox/services/runner.py index fdcb15cf..3db9f8f5 100644 --- a/archivebox/services/runner.py +++ b/archivebox/services/runner.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio import json import os -import re import shutil import subprocess import sys @@ -13,12 +12,13 @@ from pathlib import Path from tempfile import TemporaryDirectory from typing import Any +from asgiref.sync import sync_to_async from django.utils import timezone from rich.console import Console from abx_dl.events import BinaryRequestEvent from abx_dl.limits import CrawlLimitState -from abx_dl.models import Plugin, Snapshot as AbxSnapshot, discover_plugins, filter_plugins +from abx_dl.models import Plugin, discover_plugins, filter_plugins from abx_dl.orchestrator import ( create_bus, download, @@ -40,150 +40,9 @@ def _bus_name(prefix: str, identifier: str) -> str: return f"{prefix}_{normalized}" -def _selected_plugins_from_config(config: dict[str, Any]) -> list[str] | None: - raw = str(config.get("PLUGINS") or "").strip() - if not raw: - return None - return [name.strip() for name in raw.split(",") if name.strip()] - - def _count_selected_hooks(plugins: dict[str, Plugin], selected_plugins: list[str] | None) -> int: selected = filter_plugins(plugins, selected_plugins) if selected_plugins else plugins - return sum( - 1 - for plugin in selected.values() - for hook in plugin.hooks - if "Install" in hook.name or "CrawlSetup" in hook.name or "Snapshot" in hook.name - ) - - -_TEMPLATE_NAME_RE = re.compile(r"^\{([A-Z0-9_]+)\}$") - - -def _binary_config_keys_for_plugins(plugins: dict[str, Plugin], binary_name: str, config: dict[str, Any]) -> list[str]: - keys: list[str] = [] - - for plugin in plugins.values(): - for spec in plugin.binaries: - template_name = str(spec.get("name") or "").strip() - match = _TEMPLATE_NAME_RE.fullmatch(template_name) - if match is None: - continue - key = match.group(1) - configured_value = config.get(key) - if configured_value is not None and str(configured_value).strip() == binary_name: - keys.append(key) - for key, prop in plugin.config_schema.items(): - if key.endswith("_BINARY") and prop.get("default") == binary_name: - keys.append(key) - - return list(dict.fromkeys(keys)) - - -def _installed_binary_config_overrides(plugins: dict[str, Plugin], config: dict[str, Any] | None = None) -> dict[str, str]: - from archivebox.machine.models import Binary, Machine - - machine = Machine.current() - active_config = dict(config or {}) - overrides: dict[str, str] = {} - shared_lib_dir: Path | None = None - pip_home: Path | None = None - pip_bin_dir: Path | None = None - npm_home: Path | None = None - node_modules_dir: Path | None = None - npm_bin_dir: Path | None = None - binaries = ( - Binary.objects.filter(machine=machine, status=Binary.StatusChoices.INSTALLED).exclude(abspath="").exclude(abspath__isnull=True) - ) - - for binary in binaries: - try: - resolved_path = Path(binary.abspath).expanduser() - except (TypeError, ValueError): - continue - if not resolved_path.is_file() or not os.access(resolved_path, os.X_OK): - continue - for key in _binary_config_keys_for_plugins(plugins, binary.name, active_config): - overrides[key] = binary.abspath - - if resolved_path.parent.name == ".bin" and resolved_path.parent.parent.name == "node_modules": - npm_bin_dir = npm_bin_dir or resolved_path.parent - node_modules_dir = node_modules_dir or resolved_path.parent.parent - npm_home = npm_home or resolved_path.parent.parent.parent - shared_lib_dir = shared_lib_dir or resolved_path.parent.parent.parent.parent - elif ( - resolved_path.parent.name == "bin" - and resolved_path.parent.parent.name == "venv" - and resolved_path.parent.parent.parent.name == "pip" - ): - pip_bin_dir = pip_bin_dir or resolved_path.parent - pip_home = pip_home or resolved_path.parent.parent.parent - shared_lib_dir = shared_lib_dir or resolved_path.parent.parent.parent.parent - - if shared_lib_dir is not None: - overrides["LIB_DIR"] = str(shared_lib_dir) - overrides["LIB_BIN_DIR"] = str(shared_lib_dir / "bin") - if pip_home is not None: - overrides["PIP_HOME"] = str(pip_home) - if pip_bin_dir is not None: - overrides["PIP_BIN_DIR"] = str(pip_bin_dir) - if npm_home is not None: - overrides["NPM_HOME"] = str(npm_home) - if node_modules_dir is not None: - overrides["NODE_MODULES_DIR"] = str(node_modules_dir) - overrides["NODE_MODULE_DIR"] = str(node_modules_dir) - overrides["NODE_PATH"] = str(node_modules_dir) - if npm_bin_dir is not None: - overrides["NPM_BIN_DIR"] = str(npm_bin_dir) - - return overrides - - -def _limit_stop_reason(config: dict[str, Any]) -> str: - return CrawlLimitState.from_config(config).get_stop_reason() - - -def _attach_bus_trace(bus) -> None: - trace_target = (os.environ.get("ARCHIVEBOX_BUS_TRACE") or "").strip() - if not trace_target: - return - if getattr(bus, "_archivebox_trace_task", None) is not None: - return - - trace_path = None if trace_target in {"1", "-", "stderr"} else Path(trace_target) - stop_event = asyncio.Event() - - async def trace_loop() -> None: - seen_event_ids: set[str] = set() - while not stop_event.is_set(): - for event_id, event in list(bus.event_history.items()): - if event_id in seen_event_ids: - continue - seen_event_ids.add(event_id) - payload = event.model_dump(mode="json") - payload["bus_name"] = bus.name - line = json.dumps(payload, ensure_ascii=False, default=str, separators=(",", ":")) - if trace_path is None: - print(line, file=sys.stderr, flush=True) - else: - trace_path.parent.mkdir(parents=True, exist_ok=True) - with trace_path.open("a", encoding="utf-8") as handle: - handle.write(line + "\n") - await asyncio.sleep(0.05) - - bus._archivebox_trace_stop = stop_event - bus._archivebox_trace_task = asyncio.create_task(trace_loop()) - - -async def _stop_bus_trace(bus) -> None: - stop_event = getattr(bus, "_archivebox_trace_stop", None) - trace_task = getattr(bus, "_archivebox_trace_task", None) - if stop_event is None or trace_task is None: - return - stop_event.set() - await asyncio.gather(trace_task, return_exceptions=True) - bus._archivebox_trace_stop = None - bus._archivebox_trace_task = None + return sum(1 for plugin in selected.values() for hook in plugin.hooks if "CrawlSetup" in hook.name or "Snapshot" in hook.name) def ensure_background_runner(*, allow_under_pytest: bool = False) -> bool: @@ -235,22 +94,25 @@ class CrawlRunner: self.crawl = crawl self.bus = create_bus(name=_bus_name("ArchiveBox", str(crawl.id)), total_timeout=3600.0) self.plugins = discover_plugins() - self.process_service = ProcessService(self.bus) - self.binary_service = BinaryService(self.bus) - self.tag_service = TagService(self.bus) - self.crawl_service = CrawlService(self.bus, crawl_id=str(crawl.id)) + ProcessService(self.bus) + BinaryService(self.bus) + TagService(self.bus) + CrawlService(self.bus, crawl_id=str(crawl.id)) self.process_discovered_snapshots_inline = process_discovered_snapshots_inline - self.snapshot_service = SnapshotService( + + async def ignore_snapshot(_snapshot_id: str) -> None: + return None + + SnapshotService( self.bus, crawl_id=str(crawl.id), - schedule_snapshot=self.enqueue_snapshot if process_discovered_snapshots_inline else self.leave_snapshot_queued, + schedule_snapshot=self.enqueue_snapshot if process_discovered_snapshots_inline else ignore_snapshot, ) - self.archive_result_service = ArchiveResultService(self.bus, process_service=self.process_service) + ArchiveResultService(self.bus) self.selected_plugins = selected_plugins self.initial_snapshot_ids = snapshot_ids self.snapshot_tasks: dict[str, asyncio.Task[None]] = {} self.snapshot_semaphore = asyncio.Semaphore(self.MAX_CONCURRENT_SNAPSHOTS) - self.abx_services = None self.persona = None self.base_config: dict[str, Any] = {} self.derived_config: dict[str, Any] = {} @@ -258,15 +120,11 @@ class CrawlRunner: self._live_stream = None async def run(self) -> None: - from asgiref.sync import sync_to_async - from archivebox.crawls.models import Crawl - try: - await sync_to_async(self._prepare, thread_sensitive=True)() + snapshot_ids = await sync_to_async(self.load_run_state, thread_sensitive=True)() live_ui = self._create_live_ui() with live_ui if live_ui is not None else nullcontext(): - _attach_bus_trace(self.bus) - self.abx_services = setup_abx_services( + setup_abx_services( self.bus, plugins=self.plugins, config_overrides={ @@ -278,18 +136,14 @@ class CrawlRunner: auto_install=True, emit_jsonl=False, ) - snapshot_ids = await sync_to_async(self._initial_snapshot_ids, thread_sensitive=True)() if snapshot_ids: root_snapshot_id = snapshot_ids[0] - await self._run_crawl_setup(root_snapshot_id) + await self.run_crawl_setup(root_snapshot_id) for snapshot_id in snapshot_ids: await self.enqueue_snapshot(snapshot_id) - await self._wait_for_snapshot_tasks() - await self._run_crawl_cleanup(root_snapshot_id) - if self.abx_services is not None: - await self.abx_services.process.wait_for_background_monitors() + await self.wait_for_snapshot_tasks() + await self.run_crawl_cleanup(root_snapshot_id) finally: - await _stop_bus_trace(self.bus) await self.bus.stop() if self._live_stream is not None: try: @@ -297,33 +151,16 @@ class CrawlRunner: except Exception: pass self._live_stream = None - await sync_to_async(self._cleanup_persona, thread_sensitive=True)() - crawl = await sync_to_async(Crawl.objects.get, thread_sensitive=True)(id=self.crawl.id) - crawl_is_finished = await sync_to_async(crawl.is_finished, thread_sensitive=True)() - if crawl_is_finished: - if crawl.status != Crawl.StatusChoices.SEALED: - crawl.status = Crawl.StatusChoices.SEALED - crawl.retry_at = None - await sync_to_async(crawl.save, thread_sensitive=True)(update_fields=["status", "retry_at", "modified_at"]) - else: - if crawl.status == Crawl.StatusChoices.SEALED: - crawl.status = Crawl.StatusChoices.QUEUED - elif crawl.status != Crawl.StatusChoices.STARTED: - crawl.status = Crawl.StatusChoices.STARTED - crawl.retry_at = crawl.retry_at or timezone.now() - await sync_to_async(crawl.save, thread_sensitive=True)(update_fields=["status", "retry_at", "modified_at"]) + await sync_to_async(self.finalize_run_state, thread_sensitive=True)() async def enqueue_snapshot(self, snapshot_id: str) -> None: task = self.snapshot_tasks.get(snapshot_id) if task is not None and not task.done(): return - task = asyncio.create_task(self._run_snapshot(snapshot_id)) + task = asyncio.create_task(self.run_snapshot(snapshot_id)) self.snapshot_tasks[snapshot_id] = task - async def leave_snapshot_queued(self, snapshot_id: str) -> None: - return None - - async def _wait_for_snapshot_tasks(self) -> None: + async def wait_for_snapshot_tasks(self) -> None: while True: pending_tasks: list[asyncio.Task[None]] = [] for snapshot_id, task in list(self.snapshot_tasks.items()): @@ -339,9 +176,9 @@ class CrawlRunner: for task in done: task.result() - def _prepare(self) -> None: + def load_run_state(self) -> list[str]: from archivebox.config.configset import get_config - from archivebox.machine.models import NetworkInterface, Process + from archivebox.machine.models import Machine, NetworkInterface, Process self.primary_url = self.crawl.get_urls_list()[0] if self.crawl.get_urls_list() else "" current_iface = NetworkInterface.current(refresh=True) @@ -352,17 +189,42 @@ class CrawlRunner: current_process.save(update_fields=["iface", "machine", "modified_at"]) self.persona = self.crawl.resolve_persona() self.base_config = get_config(crawl=self.crawl) - self.derived_config = _installed_binary_config_overrides(self.plugins, self.base_config) + self.derived_config = dict(Machine.current().config) self.base_config["ABX_RUNTIME"] = "archivebox" if self.selected_plugins is None: - self.selected_plugins = _selected_plugins_from_config(self.base_config) + raw_plugins = self.base_config["PLUGINS"].strip() + self.selected_plugins = [name.strip() for name in raw_plugins.split(",") if name.strip()] if raw_plugins else None if self.persona: - chrome_binary = str(self.base_config.get("CHROME_BINARY") or "") - self.base_config.update(self.persona.prepare_runtime_for_crawl(self.crawl, chrome_binary=chrome_binary)) + self.base_config.update( + self.persona.prepare_runtime_for_crawl( + self.crawl, + chrome_binary=self.base_config["CHROME_BINARY"], + ), + ) + if self.initial_snapshot_ids: + return [str(snapshot_id) for snapshot_id in self.initial_snapshot_ids] + created = self.crawl.create_snapshots_from_urls() + snapshots = created or list(self.crawl.snapshot_set.filter(depth=0).order_by("created_at")) + return [str(snapshot.id) for snapshot in snapshots] + + def finalize_run_state(self) -> None: + from archivebox.crawls.models import Crawl - def _cleanup_persona(self) -> None: if self.persona: self.persona.cleanup_runtime_for_crawl(self.crawl) + crawl = Crawl.objects.get(id=self.crawl.id) + if crawl.is_finished(): + if crawl.status != Crawl.StatusChoices.SEALED: + crawl.status = Crawl.StatusChoices.SEALED + crawl.retry_at = None + crawl.save(update_fields=["status", "retry_at", "modified_at"]) + return + if crawl.status == Crawl.StatusChoices.SEALED: + crawl.status = Crawl.StatusChoices.QUEUED + elif crawl.status != Crawl.StatusChoices.STARTED: + crawl.status = Crawl.StatusChoices.STARTED + crawl.retry_at = crawl.retry_at or timezone.now() + crawl.save(update_fields=["status", "retry_at", "modified_at"]) def _create_live_ui(self) -> LiveBusUI | None: stdout_is_tty = sys.stdout.isatty() @@ -373,7 +235,7 @@ class CrawlRunner: stream = sys.stderr if stderr_is_tty else sys.stdout if os.path.exists("/dev/tty"): try: - self._live_stream = open("/dev/tty", "w", buffering=1, encoding=getattr(stream, "encoding", None) or "utf-8") + self._live_stream = open("/dev/tty", "w", buffering=1, encoding=stream.encoding or "utf-8") stream = self._live_stream except OSError: self._live_stream = None @@ -399,7 +261,7 @@ class CrawlRunner: live_ui = LiveBusUI( self.bus, total_hooks=_count_selected_hooks(self.plugins, self.selected_plugins), - timeout_seconds=int(self.base_config.get("TIMEOUT") or 60), + timeout_seconds=self.base_config["TIMEOUT"], ui_console=ui_console, interactive_tty=True, ) @@ -410,128 +272,24 @@ class CrawlRunner: ) return live_ui - def _create_root_snapshots(self) -> list[str]: - created = self.crawl.create_snapshots_from_urls() - snapshots = created or list(self.crawl.snapshot_set.filter(depth=0).order_by("created_at")) - return [str(snapshot.id) for snapshot in snapshots] - - def _initial_snapshot_ids(self) -> list[str]: - if self.initial_snapshot_ids: - return [str(snapshot_id) for snapshot_id in self.initial_snapshot_ids] - return self._create_root_snapshots() - - def _snapshot_config(self, snapshot) -> dict[str, Any]: + def load_snapshot_payload(self, snapshot_id: str) -> dict[str, Any]: + from archivebox.core.models import Snapshot from archivebox.config.configset import get_config + snapshot = Snapshot.objects.select_related("crawl").get(id=snapshot_id) config = get_config(crawl=self.crawl, snapshot=snapshot) config.update(self.base_config) config["CRAWL_DIR"] = str(self.crawl.output_dir) config["SNAP_DIR"] = str(snapshot.output_dir) - config["SNAPSHOT_ID"] = str(snapshot.id) - config["SNAPSHOT_DEPTH"] = snapshot.depth - config["CRAWL_ID"] = str(self.crawl.id) - config["SOURCE_URL"] = snapshot.url - if snapshot.parent_snapshot_id: - config["PARENT_SNAPSHOT_ID"] = str(snapshot.parent_snapshot_id) - return config - - async def _run_crawl_setup(self, snapshot_id: str) -> None: - from asgiref.sync import sync_to_async - - snapshot = await sync_to_async(self._load_snapshot_run_data, thread_sensitive=True)(snapshot_id) - setup_snapshot = AbxSnapshot( - url=snapshot["url"], - id=snapshot["id"], - title=snapshot["title"], - timestamp=snapshot["timestamp"], - bookmarked_at=snapshot["bookmarked_at"], - created_at=snapshot["created_at"], - tags=snapshot["tags"], - depth=snapshot["depth"], - parent_snapshot_id=snapshot["parent_snapshot_id"], - crawl_id=str(self.crawl.id), - ) - await download( - url=snapshot["url"], - plugins=self.plugins, - output_dir=Path(snapshot["output_dir"]), - selected_plugins=self.selected_plugins, - bus=self.bus, - emit_jsonl=False, - snapshot=setup_snapshot, - crawl_setup_only=True, - ) - - async def _run_crawl_cleanup(self, snapshot_id: str) -> None: - from asgiref.sync import sync_to_async - - snapshot = await sync_to_async(self._load_snapshot_run_data, thread_sensitive=True)(snapshot_id) - cleanup_snapshot = AbxSnapshot( - url=snapshot["url"], - id=snapshot["id"], - title=snapshot["title"], - timestamp=snapshot["timestamp"], - bookmarked_at=snapshot["bookmarked_at"], - created_at=snapshot["created_at"], - tags=snapshot["tags"], - depth=snapshot["depth"], - parent_snapshot_id=snapshot["parent_snapshot_id"], - crawl_id=str(self.crawl.id), - ) - await download( - url=snapshot["url"], - plugins=self.plugins, - output_dir=Path(snapshot["output_dir"]), - selected_plugins=self.selected_plugins, - bus=self.bus, - emit_jsonl=False, - snapshot=cleanup_snapshot, - crawl_cleanup_only=True, - ) - - async def _run_snapshot(self, snapshot_id: str) -> None: - from asgiref.sync import sync_to_async - - async with self.snapshot_semaphore: - snapshot = await sync_to_async(self._load_snapshot_run_data, thread_sensitive=True)(snapshot_id) - if snapshot["status"] == "sealed": - return - if snapshot["depth"] > 0 and _limit_stop_reason(snapshot["config"]) == "max_size": - await sync_to_async(self._cancel_snapshot_due_to_limit, thread_sensitive=True)(snapshot_id) - return - abx_snapshot = AbxSnapshot( - url=snapshot["url"], - id=snapshot["id"], - title=snapshot["title"], - timestamp=snapshot["timestamp"], - bookmarked_at=snapshot["bookmarked_at"], - created_at=snapshot["created_at"], - tags=snapshot["tags"], - depth=snapshot["depth"], - parent_snapshot_id=snapshot["parent_snapshot_id"], - crawl_id=str(self.crawl.id), - ) - try: - await download( - url=snapshot["url"], - plugins=self.plugins, - output_dir=Path(snapshot["output_dir"]), - selected_plugins=self.selected_plugins, - bus=self.bus, - emit_jsonl=False, - snapshot=abx_snapshot, - skip_crawl_setup=True, - skip_crawl_cleanup=True, - ) - finally: - current_task = asyncio.current_task() - if current_task is not None and self.snapshot_tasks.get(snapshot_id) is current_task: - self.snapshot_tasks.pop(snapshot_id, None) - - def _load_snapshot_run_data(self, snapshot_id: str): - from archivebox.core.models import Snapshot - - snapshot = Snapshot.objects.select_related("crawl").get(id=snapshot_id) + extra_context: dict[str, Any] = {} + if config.get("EXTRA_CONTEXT"): + parsed_extra_context = json.loads(str(config["EXTRA_CONTEXT"])) + if not isinstance(parsed_extra_context, dict): + raise TypeError("EXTRA_CONTEXT must decode to an object") + extra_context = parsed_extra_context + extra_context["snapshot_id"] = str(snapshot.id) + extra_context["snapshot_depth"] = snapshot.depth + config["EXTRA_CONTEXT"] = json.dumps(extra_context, separators=(",", ":"), sort_keys=True) return { "id": str(snapshot.id), "url": snapshot.url, @@ -542,12 +300,91 @@ class CrawlRunner: "tags": snapshot.tags_str(), "depth": snapshot.depth, "status": snapshot.status, - "parent_snapshot_id": str(snapshot.parent_snapshot_id) if snapshot.parent_snapshot_id else None, "output_dir": str(snapshot.output_dir), - "config": self._snapshot_config(snapshot), + "config": config, } - def _cancel_snapshot_due_to_limit(self, snapshot_id: str) -> None: + async def run_crawl_setup(self, snapshot_id: str) -> None: + snapshot = await sync_to_async(self.load_snapshot_payload, thread_sensitive=True)(snapshot_id) + await download( + url=snapshot["url"], + plugins=self.plugins, + output_dir=Path(snapshot["output_dir"]), + selected_plugins=self.selected_plugins, + config_overrides=snapshot["config"], + derived_config_overrides=self.derived_config, + bus=self.bus, + emit_jsonl=False, + install_enabled=True, + crawl_setup_enabled=True, + crawl_start_enabled=False, + snapshot_cleanup_enabled=False, + crawl_cleanup_enabled=False, + machine_service=None, + binary_service=None, + process_service=None, + archive_result_service=None, + tag_service=None, + ) + + async def run_crawl_cleanup(self, snapshot_id: str) -> None: + snapshot = await sync_to_async(self.load_snapshot_payload, thread_sensitive=True)(snapshot_id) + await download( + bus=self.bus, + url=snapshot["url"], + output_dir=Path(snapshot["output_dir"]), + plugins=self.plugins, + selected_plugins=self.selected_plugins, + config_overrides=snapshot["config"], + derived_config_overrides=self.derived_config, + emit_jsonl=False, + install_enabled=False, + crawl_setup_enabled=False, + crawl_start_enabled=False, + snapshot_cleanup_enabled=False, + crawl_cleanup_enabled=True, + machine_service=None, + binary_service=None, + process_service=None, + archive_result_service=None, + tag_service=None, + ) + + async def run_snapshot(self, snapshot_id: str) -> None: + async with self.snapshot_semaphore: + snapshot = await sync_to_async(self.load_snapshot_payload, thread_sensitive=True)(snapshot_id) + if snapshot["status"] == "sealed": + return + if snapshot["depth"] > 0 and CrawlLimitState.from_config(snapshot["config"]).get_stop_reason() == "max_size": + await sync_to_async(self.seal_snapshot_due_to_limit, thread_sensitive=True)(snapshot_id) + return + try: + await download( + url=snapshot["url"], + plugins=self.plugins, + output_dir=Path(snapshot["output_dir"]), + selected_plugins=self.selected_plugins, + config_overrides=snapshot["config"], + derived_config_overrides=self.derived_config, + bus=self.bus, + emit_jsonl=False, + install_enabled=False, + crawl_setup_enabled=False, + crawl_start_enabled=True, + snapshot_cleanup_enabled=True, + crawl_cleanup_enabled=False, + machine_service=None, + binary_service=None, + process_service=None, + archive_result_service=None, + tag_service=None, + ) + finally: + current_task = asyncio.current_task() + if current_task is not None and self.snapshot_tasks.get(snapshot_id) is current_task: + self.snapshot_tasks.pop(snapshot_id, None) + + def seal_snapshot_due_to_limit(self, snapshot_id: str) -> None: from archivebox.core.models import Snapshot snapshot = Snapshot.objects.filter(id=snapshot_id).first() @@ -579,21 +416,20 @@ def run_crawl( async def _run_binary(binary_id: str) -> None: - from asgiref.sync import sync_to_async - from archivebox.config.configset import get_config - from archivebox.machine.models import Binary + from archivebox.machine.models import Binary, Machine - binary = await sync_to_async(Binary.objects.get, thread_sensitive=True)(id=binary_id) + binary = await Binary.objects.aget(id=binary_id) plugins = discover_plugins() config = get_config() - derived_config = await sync_to_async(_installed_binary_config_overrides, thread_sensitive=True)(plugins, config) + machine = await sync_to_async(Machine.current, thread_sensitive=True)() + derived_config = dict(machine.config) config["ABX_RUNTIME"] = "archivebox" bus = create_bus(name=_bus_name("ArchiveBox_binary", str(binary.id)), total_timeout=1800.0) - process_service = ProcessService(bus) + ProcessService(bus) BinaryService(bus) TagService(bus) - ArchiveResultService(bus, process_service=process_service) + ArchiveResultService(bus) setup_abx_services( bus, plugins=plugins, @@ -605,7 +441,6 @@ async def _run_binary(binary_id: str) -> None: ) try: - _attach_bus_trace(bus) await bus.emit( BinaryRequestEvent( name=binary.name, @@ -619,7 +454,6 @@ async def _run_binary(binary_id: str) -> None: ), ) finally: - await _stop_bus_trace(bus) await bus.stop() @@ -628,20 +462,20 @@ def run_binary(binary_id: str) -> None: async def _run_install(plugin_names: list[str] | None = None) -> None: - from asgiref.sync import sync_to_async - from archivebox.config.configset import get_config + from archivebox.machine.models import Machine plugins = discover_plugins() config = get_config() - derived_config = await sync_to_async(_installed_binary_config_overrides, thread_sensitive=True)(plugins, config) + machine = await sync_to_async(Machine.current, thread_sensitive=True)() + derived_config = dict(machine.config) config["ABX_RUNTIME"] = "archivebox" bus = create_bus(name="ArchiveBox_install", total_timeout=3600.0) - process_service = ProcessService(bus) + ProcessService(bus) BinaryService(bus) TagService(bus) - ArchiveResultService(bus, process_service=process_service) - abx_services = setup_abx_services( + ArchiveResultService(bus) + setup_abx_services( bus, plugins=plugins, config_overrides=config, @@ -657,7 +491,7 @@ async def _run_install(plugin_names: list[str] | None = None) -> None: if not selected_plugins: return plugins_label = ", ".join(plugin_names) if plugin_names else f"all ({len(plugins)} available)" - timeout_seconds = int(config.get("TIMEOUT") or 60) + timeout_seconds = config["TIMEOUT"] stdout_is_tty = sys.stdout.isatty() stderr_is_tty = sys.stderr.isatty() interactive_tty = stdout_is_tty or stderr_is_tty @@ -668,7 +502,7 @@ async def _run_install(plugin_names: list[str] | None = None) -> None: stream = sys.stderr if stderr_is_tty else sys.stdout if os.path.exists("/dev/tty"): try: - live_stream = open("/dev/tty", "w", buffering=1, encoding=getattr(stream, "encoding", None) or "utf-8") + live_stream = open("/dev/tty", "w", buffering=1, encoding=stream.encoding or "utf-8") stream = live_stream except OSError: live_stream = None @@ -707,20 +541,21 @@ async def _run_install(plugin_names: list[str] | None = None) -> None: plugins_label=plugins_label, ) with live_ui if live_ui is not None else nullcontext(): - _attach_bus_trace(bus) results = await abx_install_plugins( plugin_names=plugin_names, plugins=plugins, output_dir=output_dir, config_overrides=config, + derived_config_overrides=derived_config, emit_jsonl=False, bus=bus, + machine_service=None, + binary_service=None, + process_service=None, ) - await abx_services.process.wait_for_background_monitors() if live_ui is not None: live_ui.print_summary(results, output_dir=output_dir) finally: - await _stop_bus_trace(bus) await bus.stop() try: if live_stream is not None: @@ -739,6 +574,12 @@ def recover_orphaned_crawls() -> int: from archivebox.machine.models import Process active_crawl_ids: set[str] = set() + orphaned_crawls = list( + Crawl.objects.filter( + status=Crawl.StatusChoices.STARTED, + retry_at__isnull=True, + ).prefetch_related("snapshot_set"), + ) running_processes = Process.objects.filter( status=Process.StatusChoices.RUNNING, process_type__in=[ @@ -746,23 +587,27 @@ def recover_orphaned_crawls() -> int: Process.TypeChoices.HOOK, Process.TypeChoices.BINARY, ], - ).only("env") + ).only("pwd") for proc in running_processes: - env = proc.env or {} - if not isinstance(env, dict): + if not proc.pwd: continue - crawl_id = env.get("CRAWL_ID") - if crawl_id: - active_crawl_ids.add(str(crawl_id)) + proc_pwd = Path(proc.pwd) + for crawl in orphaned_crawls: + matched_snapshot = None + for snapshot in crawl.snapshot_set.all(): + try: + proc_pwd.relative_to(snapshot.output_dir) + matched_snapshot = snapshot + break + except ValueError: + continue + if matched_snapshot is not None: + active_crawl_ids.add(str(crawl.id)) + break recovered = 0 now = timezone.now() - orphaned_crawls = Crawl.objects.filter( - status=Crawl.StatusChoices.STARTED, - retry_at__isnull=True, - ).prefetch_related("snapshot_set") - for crawl in orphaned_crawls: if str(crawl.id) in active_crawl_ids: continue @@ -788,6 +633,11 @@ def recover_orphaned_snapshots() -> int: from archivebox.machine.models import Process active_snapshot_ids: set[str] = set() + orphaned_snapshots = list( + Snapshot.objects.filter(status=Snapshot.StatusChoices.STARTED, retry_at__isnull=True) + .select_related("crawl") + .prefetch_related("archiveresult_set"), + ) running_processes = Process.objects.filter( status=Process.StatusChoices.RUNNING, process_type__in=[ @@ -795,24 +645,22 @@ def recover_orphaned_snapshots() -> int: Process.TypeChoices.HOOK, Process.TypeChoices.BINARY, ], - ).only("env") + ).only("pwd") for proc in running_processes: - env = proc.env or {} - if not isinstance(env, dict): + if not proc.pwd: continue - snapshot_id = env.get("SNAPSHOT_ID") - if snapshot_id: - active_snapshot_ids.add(str(snapshot_id)) + proc_pwd = Path(proc.pwd) + for snapshot in orphaned_snapshots: + try: + proc_pwd.relative_to(snapshot.output_dir) + active_snapshot_ids.add(str(snapshot.id)) + break + except ValueError: + continue recovered = 0 now = timezone.now() - orphaned_snapshots = ( - Snapshot.objects.filter(status=Snapshot.StatusChoices.STARTED, retry_at__isnull=True) - .select_related("crawl") - .prefetch_related("archiveresult_set") - ) - for snapshot in orphaned_snapshots: if str(snapshot.id) in active_snapshot_ids: continue diff --git a/archivebox/services/snapshot_service.py b/archivebox/services/snapshot_service.py index 4cd103e8..f84632ba 100644 --- a/archivebox/services/snapshot_service.py +++ b/archivebox/services/snapshot_service.py @@ -7,8 +7,6 @@ from abx_dl.events import SnapshotCompletedEvent, SnapshotEvent from abx_dl.limits import CrawlLimitState from abx_dl.services.base import BaseService -from .db import run_db_op - class SnapshotService(BaseService): LISTENS_TO = [SnapshotEvent, SnapshotCompletedEvent] @@ -18,120 +16,96 @@ class SnapshotService(BaseService): self.crawl_id = crawl_id self.schedule_snapshot = schedule_snapshot super().__init__(bus) + self.bus.on(SnapshotEvent, self.on_SnapshotEvent) + self.bus.on(SnapshotCompletedEvent, self.on_SnapshotCompletedEvent) - async def on_SnapshotEvent__Outer(self, event: SnapshotEvent) -> None: - snapshot_id = await run_db_op(self._project_snapshot, event) - if snapshot_id: - await sync_to_async(self._ensure_crawl_symlink)(snapshot_id) - if snapshot_id and event.depth > 0: - await self.schedule_snapshot(snapshot_id) - - async def on_SnapshotCompletedEvent__Outer(self, event: SnapshotCompletedEvent) -> None: - snapshot_id = await run_db_op(self._seal_snapshot, event.snapshot_id) - if snapshot_id: - await sync_to_async(self._write_snapshot_details)(snapshot_id) - - def _project_snapshot(self, event: SnapshotEvent) -> str | None: + async def on_SnapshotEvent(self, event: SnapshotEvent) -> None: from archivebox.core.models import Snapshot from archivebox.crawls.models import Crawl - crawl = Crawl.objects.get(id=self.crawl_id) + crawl = await Crawl.objects.aget(id=self.crawl_id) + snapshot_id: str | None = None + snapshot = await Snapshot.objects.filter(id=event.snapshot_id, crawl=crawl).afirst() - if event.depth == 0: - snapshot = Snapshot.objects.filter(id=event.snapshot_id, crawl=crawl).first() - if snapshot is None: - return None + if snapshot is not None: snapshot.status = Snapshot.StatusChoices.STARTED snapshot.retry_at = None - snapshot.save(update_fields=["status", "retry_at", "modified_at"]) - return str(snapshot.id) + await snapshot.asave(update_fields=["status", "retry_at", "modified_at"]) + snapshot_id = str(snapshot.id) + elif event.depth > 0: + if event.depth <= crawl.max_depth and self._crawl_limit_stop_reason(crawl) != "max_size": + parent_event = await self.bus.find( + SnapshotEvent, + past=True, + future=False, + where=lambda candidate: candidate.depth == event.depth - 1 and self.bus.event_is_child_of(event, candidate), + ) + parent_snapshot = None + if parent_event is not None: + parent_snapshot = await Snapshot.objects.filter(id=parent_event.snapshot_id, crawl=crawl).afirst() + if parent_snapshot is not None and self._url_passes_filters(crawl, parent_snapshot, event.url): + snapshot = await sync_to_async(Snapshot.from_json, thread_sensitive=True)( + { + "url": event.url, + "depth": event.depth, + "parent_snapshot_id": str(parent_snapshot.id), + "crawl_id": str(crawl.id), + }, + overrides={ + "crawl": crawl, + "snapshot": parent_snapshot, + "created_by_id": crawl.created_by_id, + }, + queue_for_extraction=False, + ) + if snapshot is not None and snapshot.status != Snapshot.StatusChoices.SEALED: + snapshot.retry_at = None + snapshot.status = Snapshot.StatusChoices.QUEUED + await snapshot.asave(update_fields=["status", "retry_at", "modified_at"]) + snapshot_id = str(snapshot.id) - if event.depth > crawl.max_depth: - return None - if self._crawl_limit_stop_reason(crawl) == "max_size": - return None + if snapshot_id: + snapshot = await Snapshot.objects.filter(id=snapshot_id).select_related("crawl", "crawl__created_by").afirst() + if snapshot is not None: + await sync_to_async(snapshot.ensure_crawl_symlink, thread_sensitive=True)() + if snapshot_id and event.depth > 0: + await self.schedule_snapshot(snapshot_id) - parent_snapshot = Snapshot.objects.filter(id=event.parent_snapshot_id, crawl=crawl).first() - if parent_snapshot is None: - return None - if not self._url_passes_filters(crawl, parent_snapshot, event.url): - return None + async def on_SnapshotCompletedEvent(self, event: SnapshotCompletedEvent) -> None: + from archivebox.core.models import Snapshot - snapshot = Snapshot.from_json( - { - "url": event.url, - "depth": event.depth, - "parent_snapshot_id": str(parent_snapshot.id), - "crawl_id": str(crawl.id), - }, - overrides={ - "crawl": crawl, - "snapshot": parent_snapshot, - "created_by_id": crawl.created_by_id, - }, - queue_for_extraction=False, - ) - if snapshot is None: - return None - if snapshot.status == Snapshot.StatusChoices.SEALED: - return None - snapshot.retry_at = None - if snapshot.status != Snapshot.StatusChoices.SEALED: - snapshot.status = Snapshot.StatusChoices.QUEUED - snapshot.save(update_fields=["status", "retry_at", "modified_at"]) - return str(snapshot.id) + snapshot = await Snapshot.objects.select_related("crawl").filter(id=event.snapshot_id).afirst() + snapshot_id: str | None = None + if snapshot is not None: + snapshot.status = Snapshot.StatusChoices.SEALED + snapshot.retry_at = None + snapshot.downloaded_at = snapshot.downloaded_at or timezone.now() + await snapshot.asave(update_fields=["status", "retry_at", "downloaded_at", "modified_at"]) + if snapshot.crawl_id and self._crawl_limit_stop_reason(snapshot.crawl) == "max_size": + await ( + Snapshot.objects.filter( + crawl_id=snapshot.crawl_id, + status=Snapshot.StatusChoices.QUEUED, + ) + .exclude(id=snapshot.id) + .aupdate( + status=Snapshot.StatusChoices.SEALED, + retry_at=None, + modified_at=timezone.now(), + ) + ) + snapshot_id = str(snapshot.id) + if snapshot_id: + snapshot = await Snapshot.objects.filter(id=snapshot_id).select_related("crawl", "crawl__created_by").afirst() + if snapshot is not None: + await sync_to_async(snapshot.write_index_jsonl, thread_sensitive=True)() + await sync_to_async(snapshot.write_json_details, thread_sensitive=True)() + await sync_to_async(snapshot.write_html_details, thread_sensitive=True)() def _url_passes_filters(self, crawl, parent_snapshot, url: str) -> bool: return crawl.url_passes_filters(url, snapshot=parent_snapshot) - def _seal_snapshot(self, snapshot_id: str) -> str | None: - from archivebox.core.models import Snapshot - - snapshot = Snapshot.objects.select_related("crawl").filter(id=snapshot_id).first() - if snapshot is None: - return None - snapshot.status = Snapshot.StatusChoices.SEALED - snapshot.retry_at = None - snapshot.downloaded_at = snapshot.downloaded_at or timezone.now() - snapshot.save(update_fields=["status", "retry_at", "downloaded_at", "modified_at"]) - if snapshot.crawl_id and self._crawl_limit_stop_reason(snapshot.crawl) == "max_size": - self._cancel_pending_snapshots(snapshot.crawl_id, exclude_snapshot_id=snapshot.id) - return str(snapshot.id) - def _crawl_limit_stop_reason(self, crawl) -> str: config = dict(crawl.config or {}) config["CRAWL_DIR"] = str(crawl.output_dir) return CrawlLimitState.from_config(config).get_stop_reason() - - def _cancel_pending_snapshots(self, crawl_id: str, *, exclude_snapshot_id) -> int: - from archivebox.core.models import Snapshot - - return ( - Snapshot.objects.filter( - crawl_id=crawl_id, - status=Snapshot.StatusChoices.QUEUED, - ) - .exclude(id=exclude_snapshot_id) - .update( - status=Snapshot.StatusChoices.SEALED, - retry_at=None, - modified_at=timezone.now(), - ) - ) - - def _ensure_crawl_symlink(self, snapshot_id: str) -> None: - from archivebox.core.models import Snapshot - - snapshot = Snapshot.objects.filter(id=snapshot_id).select_related("crawl", "crawl__created_by").first() - if snapshot is not None: - snapshot.ensure_crawl_symlink() - - def _write_snapshot_details(self, snapshot_id: str) -> None: - from archivebox.core.models import Snapshot - - snapshot = Snapshot.objects.filter(id=snapshot_id).select_related("crawl", "crawl__created_by").first() - if snapshot is None: - return - snapshot.write_index_jsonl() - snapshot.write_json_details() - snapshot.write_html_details() diff --git a/archivebox/services/tag_service.py b/archivebox/services/tag_service.py index 78622609..22d6685d 100644 --- a/archivebox/services/tag_service.py +++ b/archivebox/services/tag_service.py @@ -3,20 +3,20 @@ from __future__ import annotations from abx_dl.events import TagEvent from abx_dl.services.base import BaseService -from .db import run_db_op - class TagService(BaseService): LISTENS_TO = [TagEvent] EMITS = [] - async def on_TagEvent__Outer(self, event: TagEvent) -> None: - await run_db_op(self._project, event) + def __init__(self, bus): + super().__init__(bus) + self.bus.on(TagEvent, self.on_TagEvent__save_to_db) - def _project(self, event: TagEvent) -> None: - from archivebox.core.models import Snapshot, Tag + async def on_TagEvent__save_to_db(self, event: TagEvent) -> None: + from archivebox.core.models import Snapshot, SnapshotTag, Tag - snapshot = Snapshot.objects.filter(id=event.snapshot_id).first() + snapshot = await Snapshot.objects.filter(id=event.snapshot_id).afirst() if snapshot is None: return - Tag.from_json({"name": event.name}, overrides={"snapshot": snapshot}) + tag, _ = await Tag.objects.aget_or_create(name=event.name) + await SnapshotTag.objects.aget_or_create(snapshot=snapshot, tag=tag) diff --git a/archivebox/tests/migrations_helpers.py b/archivebox/tests/migrations_helpers.py index 50ca0b89..42d2a6c0 100644 --- a/archivebox/tests/migrations_helpers.py +++ b/archivebox/tests/migrations_helpers.py @@ -312,7 +312,7 @@ CREATE TABLE IF NOT EXISTS machine_dependency ( modified_at DATETIME, bin_name VARCHAR(63) NOT NULL UNIQUE, bin_providers VARCHAR(127) NOT NULL DEFAULT '*', - custom_cmds TEXT DEFAULT '{}', + overrides TEXT DEFAULT '{}', config TEXT DEFAULT '{}' ); @@ -973,7 +973,6 @@ def seed_0_8_data(db_path: Path) -> dict[str, list[dict]]: ("machine", "0003_alter_installedbinary_options_and_more"), ("machine", "0004_alter_installedbinary_abspath_and_more"), # Then the new migrations after squashing - ("machine", "0002_rename_custom_cmds_to_overrides"), ("machine", "0003_alter_dependency_id_alter_installedbinary_dependency_and_more"), ("machine", "0004_drop_dependency_table"), # Crawls must come before core.0024 because 0024_b depends on it diff --git a/archivebox/tests/test_admin_links.py b/archivebox/tests/test_admin_links.py index 3373271f..22b09db0 100644 --- a/archivebox/tests/test_admin_links.py +++ b/archivebox/tests/test_admin_links.py @@ -144,13 +144,13 @@ def test_archiveresult_admin_copy_command_redacts_sensitive_env_keys(): pwd=str(snapshot.output_dir / "wget"), cmd=["/tmp/on_Snapshot__06_wget.finite.bg.py", "--url=https://example.com"], env={ - "SOURCE_URL": "https://example.com", "SAFE_FLAG": "1", "API_KEY": "super-secret-key", "ACCESS_TOKEN": "super-secret-token", "SHARED_SECRET": "super-secret-secret", }, status=Process.StatusChoices.EXITED, + url="https://example.com", ) result = ArchiveResult.objects.create( snapshot=snapshot, @@ -164,7 +164,7 @@ def test_archiveresult_admin_copy_command_redacts_sensitive_env_keys(): cmd_html = str(admin.cmd_str(result)) assert "SAFE_FLAG=1" in cmd_html - assert "SOURCE_URL=https://example.com" in cmd_html + assert "https://example.com" in cmd_html assert "API_KEY" not in cmd_html assert "ACCESS_TOKEN" not in cmd_html assert "SHARED_SECRET" not in cmd_html diff --git a/archivebox/tests/test_admin_views.py b/archivebox/tests/test_admin_views.py index 2e231361..0bfc0158 100644 --- a/archivebox/tests/test_admin_views.py +++ b/archivebox/tests/test_admin_views.py @@ -8,6 +8,7 @@ Tests cover: - Snapshot progress statistics """ +import json import pytest import uuid from pathlib import Path @@ -822,7 +823,6 @@ class TestAdminSnapshotListView: pwd="/tmp/archivebox", cmd=["python", "/tmp/job.py", "--url=https://example.com"], env={ - "SNAPSHOT_ID": "abc123", "ENABLED": True, "API_KEY": "super-secret-key", "ACCESS_TOKEN": "super-secret-token", @@ -843,7 +843,6 @@ class TestAdminSnapshotListView: assert response.status_code == 200 assert b"Kill" in response.content assert b"python /tmp/job.py --url=https://example.com" in response.content - assert b"SNAPSHOT_ID=abc123" in response.content assert b"ENABLED=True" in response.content assert b"52s" in response.content assert b"API_KEY=" not in response.content @@ -1065,7 +1064,7 @@ class TestAdminSnapshotListView: pid=54321, exit_code=0, cmd=["/plugins/title/on_Snapshot__54_title.js", "--url=https://example.com"], - env={"SNAPSHOT_ID": str(snapshot.id)}, + env={"EXTRA_CONTEXT": json.dumps({"snapshot_id": str(snapshot.id)})}, started_at=timezone.now(), ended_at=timezone.now(), ) @@ -1252,11 +1251,8 @@ class TestLiveProgressView: process_type=Process.TypeChoices.HOOK, status=Process.StatusChoices.RUNNING, pid=pid, + pwd=str(snapshot.output_dir / "chrome"), cmd=["/plugins/chrome/on_CrawlSetup__91_chrome_wait.js", "--url=https://example.com"], - env={ - "CRAWL_ID": str(snapshot.crawl_id), - "SNAPSHOT_ID": str(snapshot.id), - }, started_at=timezone.now(), ) @@ -1290,11 +1286,8 @@ class TestLiveProgressView: process_type=Process.TypeChoices.HOOK, status=Process.StatusChoices.RUNNING, pid=pid, + pwd=str(snapshot.output_dir / "title"), cmd=["/plugins/title/on_Snapshot__10_title.py", "--url=https://example.com"], - env={ - "CRAWL_ID": str(snapshot.crawl_id), - "SNAPSHOT_ID": str(snapshot.id), - }, started_at=timezone.now(), ) @@ -1327,11 +1320,8 @@ class TestLiveProgressView: process_type=Process.TypeChoices.HOOK, status=Process.StatusChoices.RUNNING, pid=os.getpid(), + pwd=str(snapshot.output_dir / "chrome"), cmd=["/plugins/chrome/on_Snapshot__11_chrome_wait.js", "--url=https://example.com"], - env={ - "CRAWL_ID": str(snapshot.crawl_id), - "SNAPSHOT_ID": str(snapshot.id), - }, started_at=timezone.now(), ) ArchiveResult.objects.create( @@ -1369,11 +1359,8 @@ class TestLiveProgressView: status=Process.StatusChoices.EXITED, exit_code=0, pid=99999, + pwd=str(snapshot.output_dir / "title"), cmd=["/plugins/title/on_Snapshot__10_title.py", "--url=https://example.com"], - env={ - "CRAWL_ID": str(snapshot.crawl_id), - "SNAPSHOT_ID": str(snapshot.id), - }, started_at=timezone.now(), ended_at=timezone.now(), ) diff --git a/archivebox/tests/test_archive_result_service.py b/archivebox/tests/test_archive_result_service.py index 3c3aaad5..bae88398 100644 --- a/archivebox/tests/test_archive_result_service.py +++ b/archivebox/tests/test_archive_result_service.py @@ -5,12 +5,12 @@ import pytest from django.db import connection -from abx_dl.events import BinaryRequestEvent, ProcessCompletedEvent, ProcessStartedEvent +from abx_dl.events import ArchiveResultEvent, BinaryRequestEvent, ProcessEvent, ProcessStartedEvent from abx_dl.orchestrator import create_bus from abx_dl.output_files import OutputFile -pytestmark = pytest.mark.django_db +pytestmark = pytest.mark.django_db(transaction=True) def _cleanup_machine_process_rows() -> None: @@ -75,8 +75,8 @@ def _create_iface(machine): def test_process_completed_projects_inline_archiveresult(): from archivebox.core.models import ArchiveResult - from archivebox.services.archive_result_service import ArchiveResultService, _collect_output_metadata - from archivebox.services.process_service import ProcessService + from archivebox.services.archive_result_service import ArchiveResultService + import asyncio snapshot = _create_snapshot() plugin_dir = Path(snapshot.output_dir) / "wget" @@ -84,37 +84,23 @@ def test_process_completed_projects_inline_archiveresult(): (plugin_dir / "index.html").write_text("ok") bus = create_bus(name="test_inline_archiveresult") - process_service = ProcessService(bus) - service = ArchiveResultService(bus, process_service=process_service) + service = ArchiveResultService(bus) - event = ProcessCompletedEvent( - plugin_name="wget", - hook_name="on_Snapshot__06_wget.finite.bg", - stdout='{"snapshot_id":"%s","type":"ArchiveResult","status":"succeeded","output_str":"wget/index.html"}\n' % snapshot.id, - stderr="", - exit_code=0, - output_dir=str(plugin_dir), - output_files=[OutputFile(path="index.html", extension="html", mimetype="text/html", size=15)], - process_id="proc-inline", + event = ArchiveResultEvent( snapshot_id=str(snapshot.id), + plugin="wget", + hook_name="on_Snapshot__06_wget.finite.bg", + status="succeeded", + output_str="wget/index.html", + output_files=[OutputFile(path="index.html", extension="html", mimetype="text/html", size=15)], start_ts="2026-03-22T12:00:00+00:00", end_ts="2026-03-22T12:00:01+00:00", ) - output_files, output_size, output_mimetypes = _collect_output_metadata(plugin_dir) - service._project_from_process_completed( - event, - { - "snapshot_id": str(snapshot.id), - "plugin": "wget", - "hook_name": "on_Snapshot__06_wget.finite.bg", - "status": "succeeded", - "output_str": "wget/index.html", - }, - output_files, - output_size, - output_mimetypes, - ) + async def emit_event() -> None: + await service.on_ArchiveResultEvent__save_to_db(event) + + asyncio.run(emit_event()) result = ArchiveResult.objects.get(snapshot=snapshot, plugin="wget", hook_name="on_Snapshot__06_wget.finite.bg") assert result.status == ArchiveResult.StatusChoices.SUCCEEDED @@ -127,45 +113,31 @@ def test_process_completed_projects_inline_archiveresult(): def test_process_completed_projects_synthetic_failed_archiveresult(): from archivebox.core.models import ArchiveResult - from archivebox.services.archive_result_service import ArchiveResultService, _collect_output_metadata - from archivebox.services.process_service import ProcessService + from archivebox.services.archive_result_service import ArchiveResultService + import asyncio snapshot = _create_snapshot() plugin_dir = Path(snapshot.output_dir) / "chrome" plugin_dir.mkdir(parents=True, exist_ok=True) bus = create_bus(name="test_synthetic_archiveresult") - process_service = ProcessService(bus) - service = ArchiveResultService(bus, process_service=process_service) + service = ArchiveResultService(bus) - event = ProcessCompletedEvent( - plugin_name="chrome", - hook_name="on_Snapshot__11_chrome_wait", - stdout="", - stderr="Hook timed out after 60 seconds", - exit_code=-1, - output_dir=str(plugin_dir), - output_files=[], - process_id="proc-failed", + event = ArchiveResultEvent( snapshot_id=str(snapshot.id), + plugin="chrome", + hook_name="on_Snapshot__11_chrome_wait", + status="failed", + output_str="Hook timed out after 60 seconds", + error="Hook timed out after 60 seconds", start_ts="2026-03-22T12:00:00+00:00", end_ts="2026-03-22T12:01:00+00:00", ) - output_files, output_size, output_mimetypes = _collect_output_metadata(plugin_dir) - service._project_from_process_completed( - event, - { - "plugin": "chrome", - "hook_name": "on_Snapshot__11_chrome_wait", - "status": "failed", - "output_str": "Hook timed out after 60 seconds", - "error": "Hook timed out after 60 seconds", - }, - output_files, - output_size, - output_mimetypes, - ) + async def emit_event() -> None: + await service.on_ArchiveResultEvent__save_to_db(event) + + asyncio.run(emit_event()) result = ArchiveResult.objects.get(snapshot=snapshot, plugin="chrome", hook_name="on_Snapshot__11_chrome_wait") assert result.status == ArchiveResult.StatusChoices.FAILED @@ -176,45 +148,30 @@ def test_process_completed_projects_synthetic_failed_archiveresult(): def test_process_completed_projects_noresults_archiveresult(): from archivebox.core.models import ArchiveResult - from archivebox.services.archive_result_service import ArchiveResultService, _collect_output_metadata - from archivebox.services.process_service import ProcessService + from archivebox.services.archive_result_service import ArchiveResultService + import asyncio snapshot = _create_snapshot() plugin_dir = Path(snapshot.output_dir) / "title" plugin_dir.mkdir(parents=True, exist_ok=True) bus = create_bus(name="test_noresults_archiveresult") - process_service = ProcessService(bus) - service = ArchiveResultService(bus, process_service=process_service) + service = ArchiveResultService(bus) - event = ProcessCompletedEvent( - plugin_name="title", - hook_name="on_Snapshot__54_title.js", - stdout='{"snapshot_id":"%s","type":"ArchiveResult","status":"noresults","output_str":"No title found"}\n' % snapshot.id, - stderr="", - exit_code=0, - output_dir=str(plugin_dir), - output_files=[], - process_id="proc-noresults", + event = ArchiveResultEvent( snapshot_id=str(snapshot.id), + plugin="title", + hook_name="on_Snapshot__54_title.js", + status="noresults", + output_str="No title found", start_ts="2026-03-22T12:00:00+00:00", end_ts="2026-03-22T12:00:01+00:00", ) - output_files, output_size, output_mimetypes = _collect_output_metadata(plugin_dir) - service._project_from_process_completed( - event, - { - "snapshot_id": str(snapshot.id), - "plugin": "title", - "hook_name": "on_Snapshot__54_title.js", - "status": "noresults", - "output_str": "No title found", - }, - output_files, - output_size, - output_mimetypes, - ) + async def emit_event() -> None: + await service.on_ArchiveResultEvent__save_to_db(event) + + asyncio.run(emit_event()) result = ArchiveResult.objects.get(snapshot=snapshot, plugin="title", hook_name="on_Snapshot__54_title.js") assert result.status == ArchiveResult.StatusChoices.NORESULTS @@ -258,45 +215,30 @@ def test_retry_failed_archiveresults_requeues_snapshot_in_queued_state(): def test_process_completed_projects_snapshot_title_from_output_str(): - from archivebox.services.archive_result_service import ArchiveResultService, _collect_output_metadata - from archivebox.services.process_service import ProcessService + from archivebox.services.archive_result_service import ArchiveResultService + import asyncio snapshot = _create_snapshot() plugin_dir = Path(snapshot.output_dir) / "title" plugin_dir.mkdir(parents=True, exist_ok=True) bus = create_bus(name="test_snapshot_title_output_str") - process_service = ProcessService(bus) - service = ArchiveResultService(bus, process_service=process_service) + service = ArchiveResultService(bus) - event = ProcessCompletedEvent( - plugin_name="title", - hook_name="on_Snapshot__54_title.js", - stdout='{"snapshot_id":"%s","type":"ArchiveResult","status":"succeeded","output_str":"Example Domain"}\n' % snapshot.id, - stderr="", - exit_code=0, - output_dir=str(plugin_dir), - output_files=[], - process_id="proc-title-output-str", + event = ArchiveResultEvent( snapshot_id=str(snapshot.id), + plugin="title", + hook_name="on_Snapshot__54_title.js", + status="succeeded", + output_str="Example Domain", start_ts="2026-03-22T12:00:00+00:00", end_ts="2026-03-22T12:00:01+00:00", ) - output_files, output_size, output_mimetypes = _collect_output_metadata(plugin_dir) - service._project_from_process_completed( - event, - { - "snapshot_id": str(snapshot.id), - "plugin": "title", - "hook_name": "on_Snapshot__54_title.js", - "status": "succeeded", - "output_str": "Example Domain", - }, - output_files, - output_size, - output_mimetypes, - ) + async def emit_event() -> None: + await service.on_ArchiveResultEvent__save_to_db(event) + + asyncio.run(emit_event()) snapshot.refresh_from_db() assert snapshot.title == "Example Domain" @@ -304,8 +246,8 @@ def test_process_completed_projects_snapshot_title_from_output_str(): def test_process_completed_projects_snapshot_title_from_title_file(): - from archivebox.services.archive_result_service import ArchiveResultService, _collect_output_metadata - from archivebox.services.process_service import ProcessService + from archivebox.services.archive_result_service import ArchiveResultService + import asyncio snapshot = _create_snapshot() plugin_dir = Path(snapshot.output_dir) / "title" @@ -313,37 +255,23 @@ def test_process_completed_projects_snapshot_title_from_title_file(): (plugin_dir / "title.txt").write_text("Example Domain") bus = create_bus(name="test_snapshot_title_file") - process_service = ProcessService(bus) - service = ArchiveResultService(bus, process_service=process_service) + service = ArchiveResultService(bus) - event = ProcessCompletedEvent( - plugin_name="title", - hook_name="on_Snapshot__54_title.js", - stdout='{"snapshot_id":"%s","type":"ArchiveResult","status":"noresults","output_str":"No title found"}\n' % snapshot.id, - stderr="", - exit_code=0, - output_dir=str(plugin_dir), - output_files=[OutputFile(path="title.txt", extension="txt", mimetype="text/plain", size=14)], - process_id="proc-title-file", + event = ArchiveResultEvent( snapshot_id=str(snapshot.id), + plugin="title", + hook_name="on_Snapshot__54_title.js", + status="noresults", + output_str="No title found", + output_files=[OutputFile(path="title.txt", extension="txt", mimetype="text/plain", size=14)], start_ts="2026-03-22T12:00:00+00:00", end_ts="2026-03-22T12:00:01+00:00", ) - output_files, output_size, output_mimetypes = _collect_output_metadata(plugin_dir) - service._project_from_process_completed( - event, - { - "snapshot_id": str(snapshot.id), - "plugin": "title", - "hook_name": "on_Snapshot__54_title.js", - "status": "noresults", - "output_str": "No title found", - }, - output_files, - output_size, - output_mimetypes, - ) + async def emit_event() -> None: + await service.on_ArchiveResultEvent__save_to_db(event) + + asyncio.run(emit_event()) snapshot.refresh_from_db() assert snapshot.title == "Example Domain" @@ -410,9 +338,12 @@ def test_collect_output_metadata_detects_warc_gz_mimetype(tmp_path): assert output_mimetypes == "application/warc" -def test_process_started_hydrates_binary_and_iface_from_existing_binary_records(monkeypatch): +@pytest.mark.django_db(transaction=True) +def test_process_started_hydrates_binary_and_iface_from_existing_binary_records(monkeypatch, tmp_path): from archivebox.machine.models import Binary, NetworkInterface - from archivebox.services.process_service import ProcessService + from archivebox.machine.models import Process as MachineProcess + from archivebox.services.process_service import ProcessService as ArchiveBoxProcessService + from abx_dl.services.process_service import ProcessService as DlProcessService machine = _create_machine() iface = _create_iface(machine) @@ -428,35 +359,60 @@ def test_process_started_hydrates_binary_and_iface_from_existing_binary_records( status=Binary.StatusChoices.INSTALLED, ) + hook_path = tmp_path / "on_Snapshot__57_mercury.py" + hook_path.write_text("#!/bin/bash\nexit 0\n", encoding="utf-8") + hook_path.chmod(0o755) + output_dir = tmp_path / "mercury" + output_dir.mkdir() + bus = create_bus(name="test_process_started_binary_hydration") - service = ProcessService(bus) - event = ProcessStartedEvent( - plugin_name="mercury", - hook_name="on_Snapshot__57_mercury.py", - hook_path="/plugins/mercury/on_Snapshot__57_mercury.py", - hook_args=["--url=https://example.com"], - output_dir="/tmp/mercury", - env={ - "MERCURY_BINARY": binary.abspath, - "NODE_BINARY": "/tmp/node", - }, - timeout=60, - pid=4321, - process_id="proc-mercury", - snapshot_id="", - start_ts="2026-03-22T12:00:00+00:00", + DlProcessService(bus, emit_jsonl=False, stderr_is_tty=False) + ArchiveBoxProcessService(bus) + + async def run_test() -> None: + await bus.emit( + ProcessEvent( + plugin_name="mercury", + hook_name="on_Snapshot__57_mercury.py", + hook_path=str(hook_path), + hook_args=["--url=https://example.com"], + is_background=False, + output_dir=str(output_dir), + env={ + "MERCURY_BINARY": binary.abspath, + "NODE_BINARY": "/tmp/node", + }, + timeout=60, + url="https://example.com", + ), + ) + started = await bus.find( + ProcessStartedEvent, + past=True, + future=False, + hook_name="on_Snapshot__57_mercury.py", + output_dir=str(output_dir), + ) + assert started is not None + + import asyncio + + asyncio.run(run_test()) + + process = MachineProcess.objects.get( + pwd=str(output_dir), + cmd=[str(hook_path), "--url=https://example.com"], ) - - service._project_started(event) - - process = service._get_or_create_process(event) assert process.binary_id == binary.id assert process.iface_id == iface.id -def test_process_started_uses_node_binary_for_js_hooks_without_plugin_binary(monkeypatch): +@pytest.mark.django_db(transaction=True) +def test_process_started_uses_node_binary_for_js_hooks_without_plugin_binary(monkeypatch, tmp_path): from archivebox.machine.models import Binary, NetworkInterface - from archivebox.services.process_service import ProcessService + from archivebox.machine.models import Process as MachineProcess + from archivebox.services.process_service import ProcessService as ArchiveBoxProcessService + from abx_dl.services.process_service import ProcessService as DlProcessService machine = _create_machine() iface = _create_iface(machine) @@ -472,27 +428,47 @@ def test_process_started_uses_node_binary_for_js_hooks_without_plugin_binary(mon status=Binary.StatusChoices.INSTALLED, ) + hook_path = tmp_path / "on_Snapshot__75_parse_dom_outlinks.js" + hook_path.write_text("#!/bin/bash\nexit 0\n", encoding="utf-8") + hook_path.chmod(0o755) + output_dir = tmp_path / "parse-dom-outlinks" + output_dir.mkdir() + bus = create_bus(name="test_process_started_node_fallback") - service = ProcessService(bus) - event = ProcessStartedEvent( - plugin_name="parse_dom_outlinks", - hook_name="on_Snapshot__75_parse_dom_outlinks.js", - hook_path="/plugins/parse_dom_outlinks/on_Snapshot__75_parse_dom_outlinks.js", - hook_args=["--url=https://example.com"], - output_dir="/tmp/parse-dom-outlinks", - env={ - "NODE_BINARY": node.abspath, - }, - timeout=60, - pid=9876, - process_id="proc-parse-dom-outlinks", - snapshot_id="", - start_ts="2026-03-22T12:00:00+00:00", + DlProcessService(bus, emit_jsonl=False, stderr_is_tty=False) + ArchiveBoxProcessService(bus) + + async def run_test() -> None: + await bus.emit( + ProcessEvent( + plugin_name="parse_dom_outlinks", + hook_name="on_Snapshot__75_parse_dom_outlinks.js", + hook_path=str(hook_path), + hook_args=["--url=https://example.com"], + is_background=False, + output_dir=str(output_dir), + env={"NODE_BINARY": node.abspath}, + timeout=60, + url="https://example.com", + ), + ) + started = await bus.find( + ProcessStartedEvent, + past=True, + future=False, + hook_name="on_Snapshot__75_parse_dom_outlinks.js", + output_dir=str(output_dir), + ) + assert started is not None + + import asyncio + + asyncio.run(run_test()) + + process = MachineProcess.objects.get( + pwd=str(output_dir), + cmd=[str(hook_path), "--url=https://example.com"], ) - - service._project_started(event) - - process = service._get_or_create_process(event) assert process.binary_id == node.id assert process.iface_id == iface.id @@ -500,6 +476,7 @@ def test_process_started_uses_node_binary_for_js_hooks_without_plugin_binary(mon def test_binary_event_reuses_existing_installed_binary_row(monkeypatch): from archivebox.machine.models import Binary, Machine from archivebox.services.binary_service import BinaryService as ArchiveBoxBinaryService + import asyncio machine = _create_machine() monkeypatch.setattr(Machine, "current", classmethod(lambda cls: machine)) @@ -522,7 +499,7 @@ def test_binary_event_reuses_existing_installed_binary_row(monkeypatch): binproviders="provider", ) - service._project_binary(event) + asyncio.run(service.on_BinaryRequestEvent(event)) binary.refresh_from_db() assert Binary.objects.filter(machine=machine, name="wget").count() == 1 diff --git a/archivebox/tests/test_cli_run.py b/archivebox/tests/test_cli_run.py index 664d116a..8fa0c887 100644 --- a/archivebox/tests/test_cli_run.py +++ b/archivebox/tests/test_cli_run.py @@ -378,11 +378,8 @@ class TestRecoverOrphanedCrawls: machine=machine, process_type=Process.TypeChoices.HOOK, status=Process.StatusChoices.RUNNING, + pwd=str(snapshot.output_dir / "chrome"), cmd=["/plugins/chrome/on_CrawlSetup__91_chrome_wait.js"], - env={ - "CRAWL_ID": str(crawl.id), - "SNAPSHOT_ID": str(snapshot.id), - }, started_at=timezone.now(), ) diff --git a/archivebox/tests/test_hooks.py b/archivebox/tests/test_hooks.py index 363b580f..d0bd8f83 100755 --- a/archivebox/tests/test_hooks.py +++ b/archivebox/tests/test_hooks.py @@ -464,23 +464,24 @@ class TestDependencyRecordOutput(unittest.TestCase): self.assertEqual(data["name"], "wget") self.assertTrue(data["abspath"].startswith("/")) - def test_dependency_record_outputs_machine_config(self): - """Dependency resolution should output Machine config update JSONL.""" + def test_dependency_record_outputs_binary_jsonl(self): + """Dependency resolution should output Binary JSONL.""" hook_output = json.dumps( { - "type": "Machine", - "config": { - "WGET_BINARY": "/usr/bin/wget", - }, + "type": "Binary", + "name": "wget", + "abspath": "/usr/bin/wget", + "version": "1.21.3", + "binprovider": "env", }, ) from archivebox.machine.models import Process data = Process.parse_records_from_text(hook_output)[0] - self.assertEqual(data["type"], "Machine") - self.assertIn("config", data) - self.assertEqual(data["config"]["WGET_BINARY"], "/usr/bin/wget") + self.assertEqual(data["type"], "Binary") + self.assertEqual(data["name"], "wget") + self.assertEqual(data["abspath"], "/usr/bin/wget") class TestSnapshotHookOutput(unittest.TestCase): diff --git a/archivebox/tests/test_machine_models.py b/archivebox/tests/test_machine_models.py index 325d483c..b50edcf1 100644 --- a/archivebox/tests/test_machine_models.py +++ b/archivebox/tests/test_machine_models.py @@ -269,12 +269,12 @@ class TestBinaryModel(TestCase): self.assertEqual(binary.status, Binary.StatusChoices.QUEUED) self.assertGreater(binary.modified_at, old_modified) - def test_binary_from_json_preserves_install_args_overrides(self): - """Binary.from_json() should persist canonical install_args overrides unchanged.""" + def test_binary_from_json_preserves_provider_overrides(self): + """Binary.from_json() should persist provider overrides unchanged.""" overrides = { "apt": {"install_args": ["chromium"]}, "npm": {"install_args": "puppeteer"}, - "custom": {"install_args": ["bash", "-lc", "echo ok"]}, + "custom": {"install": "bash -lc 'echo ok'"}, } binary = Binary.from_json( diff --git a/archivebox/tests/test_process_service.py b/archivebox/tests/test_process_service.py index 89ddd88f..577f8eb1 100644 --- a/archivebox/tests/test_process_service.py +++ b/archivebox/tests/test_process_service.py @@ -1,69 +1,4 @@ -import asyncio -import json - import pytest -from abx_dl.events import ProcessStartedEvent, ProcessStdoutEvent -from abx_dl.orchestrator import create_bus - pytestmark = pytest.mark.django_db - - -def test_process_service_emits_process_started_from_inline_process_event(monkeypatch): - from archivebox.services import process_service as process_service_module - from archivebox.services.process_service import ProcessService - - bus = create_bus(name="test_process_service_inline_process_event") - ProcessService(bus) - - monkeypatch.setattr( - process_service_module, - "_ensure_worker", - lambda event: { - "pid": 4321, - "start": 1711111111.0, - "statename": "RUNNING", - "exitstatus": 0, - }, - ) - - async def run_test(): - await bus.emit( - ProcessStdoutEvent( - line=json.dumps( - { - "type": "ProcessEvent", - "plugin_name": "search_backend_sonic", - "hook_name": "worker_sonic", - "hook_path": "/usr/bin/sonic", - "hook_args": ["-c", "/tmp/sonic/config.cfg"], - "is_background": True, - "daemon": True, - "url": "tcp://127.0.0.1:1491", - "output_dir": "/tmp/sonic", - "env": {}, - "process_type": "worker", - "worker_type": "sonic", - "process_id": "worker:sonic", - "output_str": "127.0.0.1:1491", - }, - ), - plugin_name="search_backend_sonic", - hook_name="on_CrawlSetup__55_sonic_start.py", - output_dir="/tmp/search_backend_sonic", - snapshot_id="snap-1", - process_id="proc-hook", - ), - ) - started = await bus.find(ProcessStartedEvent, process_id="worker:sonic") - await bus.stop() - return started - - started = asyncio.run(run_test()) - assert started is not None - assert started.hook_name == "worker_sonic" - assert started.process_type == "worker" - assert started.worker_type == "sonic" - assert getattr(started, "url", "") == "tcp://127.0.0.1:1491" - assert getattr(started, "output_str", "") == "127.0.0.1:1491" diff --git a/archivebox/tests/test_runner.py b/archivebox/tests/test_runner.py index 21835ba4..2af22c73 100644 --- a/archivebox/tests/test_runner.py +++ b/archivebox/tests/test_runner.py @@ -34,18 +34,6 @@ class _DummyService: pass -class _DummyAbxServices: - def __init__(self): - self.process = SimpleNamespace(wait_for_background_monitors=self._wait) - - async def _wait(self): - return None - - -async def _call_sync(func, *args, **kwargs): - return func(*args, **kwargs) - - def test_run_snapshot_reuses_crawl_bus_for_all_snapshots(monkeypatch): from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.crawls.models import Crawl @@ -82,18 +70,18 @@ def test_run_snapshot_reuses_crawl_bus_for_all_snapshots(monkeypatch): monkeypatch.setattr(runner_module, "CrawlService", _DummyService) monkeypatch.setattr(runner_module, "SnapshotService", _DummyService) monkeypatch.setattr(runner_module, "ArchiveResultService", _DummyService) - monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: _DummyAbxServices()) + monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: None) download_calls = [] - async def fake_download(*, url, bus, snapshot, **kwargs): + async def fake_download(*, url, bus, config_overrides, **kwargs): + extra_context = json.loads(config_overrides["EXTRA_CONTEXT"]) download_calls.append( { "url": url, "bus": bus, - "snapshot_id": snapshot.id, - "source_url": snapshot.url, - "abx_snapshot_id": snapshot.id, + "snapshot_id": extra_context["snapshot_id"], + "source_url": url, }, ) await asyncio.sleep(0) @@ -113,9 +101,8 @@ def test_run_snapshot_reuses_crawl_bus_for_all_snapshots(monkeypatch): "created_at": snapshot_a.created_at.isoformat() if snapshot_a.created_at else "", "tags": snapshot_a.tags_str(), "depth": snapshot_a.depth, - "parent_snapshot_id": str(snapshot_a.parent_snapshot_id) if snapshot_a.parent_snapshot_id else None, "output_dir": str(snapshot_a.output_dir), - "config": crawl_runner._snapshot_config(snapshot_a), + "config": crawl_runner.load_snapshot_payload(str(snapshot_a.id))["config"], }, str(snapshot_b.id): { "id": str(snapshot_b.id), @@ -127,17 +114,16 @@ def test_run_snapshot_reuses_crawl_bus_for_all_snapshots(monkeypatch): "created_at": snapshot_b.created_at.isoformat() if snapshot_b.created_at else "", "tags": snapshot_b.tags_str(), "depth": snapshot_b.depth, - "parent_snapshot_id": str(snapshot_b.parent_snapshot_id) if snapshot_b.parent_snapshot_id else None, "output_dir": str(snapshot_b.output_dir), - "config": crawl_runner._snapshot_config(snapshot_b), + "config": crawl_runner.load_snapshot_payload(str(snapshot_b.id))["config"], }, } - monkeypatch.setattr(crawl_runner, "_load_snapshot_run_data", lambda snapshot_id: snapshot_data[snapshot_id]) + monkeypatch.setattr(crawl_runner, "load_snapshot_payload", lambda snapshot_id: snapshot_data[snapshot_id]) async def run_both(): await asyncio.gather( - crawl_runner._run_snapshot(str(snapshot_a.id)), - crawl_runner._run_snapshot(str(snapshot_b.id)), + crawl_runner.run_snapshot(str(snapshot_a.id)), + crawl_runner.run_snapshot(str(snapshot_b.id)), ) asyncio.run(run_both()) @@ -243,10 +229,10 @@ def test_runner_prepare_refreshes_network_interface_and_attaches_current_process refresh_calls = [] monkeypatch.setattr(NetworkInterface, "current", classmethod(lambda cls, refresh=False: refresh_calls.append(refresh) or _Iface())) monkeypatch.setattr(Process, "current", classmethod(lambda cls: proc)) - monkeypatch.setattr(configset_module, "get_config", lambda **kwargs: {}) + monkeypatch.setattr(configset_module, "get_config", lambda **kwargs: {"PLUGINS": "", "CHROME_BINARY": "", "TIMEOUT": 60}) crawl_runner = runner_module.CrawlRunner(crawl) - crawl_runner._prepare() + crawl_runner.load_run_state() assert refresh_calls == [True] assert proc.iface is not None @@ -254,10 +240,12 @@ def test_runner_prepare_refreshes_network_interface_and_attaches_current_process assert saved_updates == [("iface", "machine", "modified_at")] -def test_installed_binary_config_overrides_include_valid_installed_binaries(monkeypatch): - from archivebox.machine.models import Binary, Machine +def test_load_run_state_uses_machine_config_as_derived_config(monkeypatch): + from archivebox.machine.models import Machine, NetworkInterface, Process from archivebox.services import runner as runner_module - from abx_dl.models import Plugin + from archivebox.config import configset as configset_module + from archivebox.base_models.models import get_or_create_system_user_pk + from archivebox.crawls.models import Crawl machine = Machine.objects.create( guid="test-guid-runner-overrides", @@ -273,143 +261,30 @@ def test_installed_binary_config_overrides_include_valid_installed_binaries(monk os_release="14.0", os_kernel="Darwin", stats={}, - config={}, + config={"WGET_BINARY": "/tmp/wget", "ABX_INSTALL_CACHE": {"wget": "2026-03-24T00:00:00+00:00"}}, ) - mercury_binary = Binary.objects.create( - machine=machine, - name="postlight-parser", - abspath=sys.executable, - version="2.0.0", - binprovider="pip", - binproviders="env,pip", - status=Binary.StatusChoices.INSTALLED, - ) - wget_binary = Binary.objects.create( - machine=machine, - name="wget", - abspath="/tmp/not-an-executable", - version="1.0.0", - binprovider="env", - binproviders="env", - status=Binary.StatusChoices.INSTALLED, - ) - puppeteer_binary = Binary.objects.create( - machine=machine, - name="puppeteer", - abspath="/tmp/shared-lib/npm/node_modules/.bin/puppeteer", - version="24.40.0", - binprovider="npm", - binproviders="npm", - status=Binary.StatusChoices.INSTALLED, - ) - ytdlp_binary = Binary.objects.create( - machine=machine, - name="yt-dlp", - abspath="/tmp/shared-lib/pip/venv/bin/yt-dlp", - version="2026.3.17", - binprovider="pip", - binproviders="pip", - status=Binary.StatusChoices.INSTALLED, + crawl = Crawl.objects.create( + urls="https://example.com", + created_by_id=get_or_create_system_user_pk(), ) + proc = SimpleNamespace(iface_id=str(machine.id), machine_id=str(machine.id), iface=None, machine=machine, save=lambda **kwargs: None) - monkeypatch.setattr(Machine, "current", classmethod(lambda cls: machine)) monkeypatch.setattr( - Path, - "is_file", - lambda self: ( - str(self) in {sys.executable, mercury_binary.abspath, wget_binary.abspath, puppeteer_binary.abspath, ytdlp_binary.abspath} - ), + NetworkInterface, + "current", + classmethod(lambda cls, refresh=False: SimpleNamespace(id=machine.id, machine=machine)), ) - monkeypatch.setattr( - runner_module.os, - "access", - lambda path, mode: str(path) in {sys.executable, puppeteer_binary.abspath, ytdlp_binary.abspath}, - ) - - overrides = runner_module._installed_binary_config_overrides( - { - "mercury": Plugin( - name="mercury", - path=Path("."), - hooks=[], - config_schema={"MERCURY_BINARY": {"type": "string", "default": "postlight-parser"}}, - ), - }, - ) - - assert overrides["MERCURY_BINARY"] == sys.executable - assert "POSTLIGHT_PARSER_BINARY" not in overrides - assert "WGET_BINARY" not in overrides - assert overrides["LIB_DIR"] == "/tmp/shared-lib" - assert overrides["LIB_BIN_DIR"] == "/tmp/shared-lib/bin" - assert overrides["PIP_HOME"] == "/tmp/shared-lib/pip" - assert overrides["PIP_BIN_DIR"] == "/tmp/shared-lib/pip/venv/bin" - assert overrides["NPM_HOME"] == "/tmp/shared-lib/npm" - assert overrides["NPM_BIN_DIR"] == "/tmp/shared-lib/npm/node_modules/.bin" - assert overrides["NODE_MODULES_DIR"] == "/tmp/shared-lib/npm/node_modules" - assert overrides["NODE_MODULE_DIR"] == "/tmp/shared-lib/npm/node_modules" - assert overrides["NODE_PATH"] == "/tmp/shared-lib/npm/node_modules" - - -def test_installed_binary_config_overrides_do_not_map_hardcoded_artifacts_to_configurable_binary_keys(monkeypatch): - from archivebox.machine.models import Binary, Machine - from archivebox.services import runner as runner_module - from abx_dl.models import Plugin - - machine = Machine.objects.create( - guid="test-guid-runner-singlefile-cache", - hostname="runner-host-singlefile", - hw_in_docker=False, - hw_in_vm=False, - hw_manufacturer="Test", - hw_product="Test Product", - hw_uuid="test-hw-runner-singlefile-cache", - os_arch="arm64", - os_family="darwin", - os_platform="macOS", - os_release="14.0", - os_kernel="Darwin", - stats={}, - config={}, - ) - singlefile_extension = Binary.objects.create( - machine=machine, - name="singlefile", - abspath="/tmp/shared-lib/bin/singlefile", - version="1.0.0", - binprovider="chromewebstore", - binproviders="chromewebstore", - status=Binary.StatusChoices.INSTALLED, - ) - + monkeypatch.setattr(Process, "current", classmethod(lambda cls: proc)) monkeypatch.setattr(Machine, "current", classmethod(lambda cls: machine)) - monkeypatch.setattr(Path, "is_file", lambda self: str(self) == singlefile_extension.abspath) - monkeypatch.setattr(runner_module.os, "access", lambda path, mode: str(path) == singlefile_extension.abspath) + monkeypatch.setattr(configset_module, "get_config", lambda **kwargs: {"PLUGINS": "", "CHROME_BINARY": "", "TIMEOUT": 60}) - overrides = runner_module._installed_binary_config_overrides( - { - "singlefile": Plugin( - name="singlefile", - path=Path("."), - hooks=[], - config_schema={"SINGLEFILE_BINARY": {"type": "string", "default": "single-file"}}, - binaries=[ - {"name": "{SINGLEFILE_BINARY}", "binproviders": "env,npm"}, - {"name": "singlefile", "binproviders": "chromewebstore"}, - ], - ), - }, - config={"SINGLEFILE_BINARY": "single-file"}, - ) + crawl_runner = runner_module.CrawlRunner(crawl) + crawl_runner.load_run_state() - assert "SINGLEFILE_BINARY" not in overrides - assert "LIB_DIR" not in overrides - assert "LIB_BIN_DIR" not in overrides + assert crawl_runner.derived_config == machine.config -def test_run_snapshot_skips_descendant_when_max_size_already_reached(monkeypatch): - import asgiref.sync - +def test_run_snapshot_skips_descendant_when_max_size_already_reached(monkeypatch, tmp_path): from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.crawls.models import Crawl from archivebox.services import runner as runner_module @@ -428,12 +303,6 @@ def test_run_snapshot_skips_descendant_when_max_size_already_reached(monkeypatch monkeypatch.setattr(runner_module, "CrawlService", _DummyService) monkeypatch.setattr(runner_module, "SnapshotService", _DummyService) monkeypatch.setattr(runner_module, "ArchiveResultService", _DummyService) - monkeypatch.setattr(runner_module, "_limit_stop_reason", lambda config: "max_size") - monkeypatch.setattr( - asgiref.sync, - "sync_to_async", - lambda func, thread_sensitive=True: lambda *args, **kwargs: _call_sync(func, *args, **kwargs), - ) monkeypatch.setattr( runner_module, "download", @@ -441,8 +310,21 @@ def test_run_snapshot_skips_descendant_when_max_size_already_reached(monkeypatch ) crawl_runner = runner_module.CrawlRunner(crawl) + state_dir = tmp_path / ".abx-dl" + state_dir.mkdir(parents=True, exist_ok=True) + (state_dir / "limits.json").write_text( + json.dumps( + { + "admitted_snapshot_ids": ["child-1"], + "counted_process_ids": ["proc-1"], + "total_size": 32, + "stop_reason": "max_size", + }, + ), + encoding="utf-8", + ) cancelled: list[str] = [] - crawl_runner._load_snapshot_run_data = lambda snapshot_id: { + crawl_runner.load_snapshot_payload = lambda snapshot_id: { "id": snapshot_id, "url": "https://example.com/child", "title": "", @@ -452,22 +334,23 @@ def test_run_snapshot_skips_descendant_when_max_size_already_reached(monkeypatch "tags": "", "depth": 1, "status": "queued", - "parent_snapshot_id": None, "output_dir": "/tmp/child", - "config": {"CRAWL_DIR": "/tmp/crawl", "MAX_SIZE": 16}, + "config": {"CRAWL_DIR": str(tmp_path), "MAX_SIZE": 16}, } - crawl_runner._cancel_snapshot_due_to_limit = lambda snapshot_id: cancelled.append(snapshot_id) + crawl_runner.seal_snapshot_due_to_limit = lambda snapshot_id: cancelled.append(snapshot_id) - asyncio.run(crawl_runner._run_snapshot("child-1")) + asyncio.run(crawl_runner.run_snapshot("child-1")) assert cancelled == ["child-1"] +@pytest.mark.django_db(transaction=True) def test_seal_snapshot_cancels_queued_descendants_after_max_size(): from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.crawls.models import Crawl from archivebox.core.models import Snapshot from archivebox.services.snapshot_service import SnapshotService + from abx_dl.events import SnapshotCompletedEvent from abx_dl.orchestrator import create_bus crawl = Crawl.objects.create( @@ -505,13 +388,22 @@ def test_seal_snapshot_cancels_queued_descendants_after_max_size(): bus = create_bus(name="test_snapshot_limit_cancel") service = SnapshotService(bus, crawl_id=str(crawl.id), schedule_snapshot=lambda snapshot_id: None) try: - sealed_id = service._seal_snapshot(str(root.id)) + + async def emit_event() -> None: + await service.on_SnapshotCompletedEvent( + SnapshotCompletedEvent( + url=root.url, + snapshot_id=str(root.id), + output_dir=str(root.output_dir), + ), + ) + + asyncio.run(emit_event()) finally: asyncio.run(bus.stop()) root.refresh_from_db() child.refresh_from_db() - assert sealed_id == str(root.id) assert root.status == Snapshot.StatusChoices.SEALED assert child.status == Snapshot.StatusChoices.SEALED assert child.retry_at is None @@ -548,7 +440,6 @@ def test_create_crawl_api_queues_crawl_without_spawning_runner(monkeypatch): def test_crawl_runner_does_not_seal_unfinished_crawl(monkeypatch): - import asgiref.sync from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.crawls.models import Crawl from archivebox.core.models import Snapshot @@ -565,35 +456,23 @@ def test_crawl_runner_does_not_seal_unfinished_crawl(monkeypatch): status=Snapshot.StatusChoices.STARTED, ) - monkeypatch.setattr(runner_module, "_attach_bus_trace", lambda bus: None) - monkeypatch.setattr(runner_module, "_stop_bus_trace", lambda bus: asyncio.sleep(0)) - monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: _DummyAbxServices()) - monkeypatch.setenv("DJANGO_ALLOW_ASYNC_UNSAFE", "true") - monkeypatch.setattr( - asgiref.sync, - "sync_to_async", - lambda func, thread_sensitive=True: lambda *args, **kwargs: _call_sync(func, *args, **kwargs), - ) - monkeypatch.setattr(Crawl.objects, "get", lambda id: crawl) - monkeypatch.setattr(crawl, "is_finished", lambda: False) - monkeypatch.setattr(crawl, "save", lambda *args, **kwargs: None) - monkeypatch.setattr(runner_module.CrawlRunner, "_prepare", lambda self: None) + monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: None) + monkeypatch.setattr(runner_module.CrawlRunner, "load_run_state", lambda self: [str(snapshot.id)]) monkeypatch.setattr(runner_module.CrawlRunner, "_create_live_ui", lambda self: None) - monkeypatch.setattr(runner_module.CrawlRunner, "_initial_snapshot_ids", lambda self: [str(snapshot.id)]) - monkeypatch.setattr(runner_module.CrawlRunner, "_run_crawl_setup", lambda self, snapshot_id: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "run_crawl_setup", lambda self, snapshot_id: asyncio.sleep(0)) monkeypatch.setattr(runner_module.CrawlRunner, "enqueue_snapshot", lambda self, snapshot_id: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_wait_for_snapshot_tasks", lambda self: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_run_crawl_cleanup", lambda self, snapshot_id: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_cleanup_persona", lambda self: None) + monkeypatch.setattr(runner_module.CrawlRunner, "wait_for_snapshot_tasks", lambda self: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "run_crawl_cleanup", lambda self, snapshot_id: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "finalize_run_state", lambda self: None) asyncio.run(runner_module.CrawlRunner(crawl, snapshot_ids=[str(snapshot.id)]).run()) + crawl.refresh_from_db() assert crawl.status != Crawl.StatusChoices.SEALED assert crawl.retry_at is not None -def test_crawl_runner_finalizes_with_sync_to_async_for_is_finished(monkeypatch): - import asgiref.sync +def test_crawl_runner_calls_load_and_finalize_run_state(monkeypatch): from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.crawls.models import Crawl from archivebox.core.models import Snapshot @@ -618,50 +497,34 @@ def test_crawl_runner_finalizes_with_sync_to_async_for_is_finished(monkeypatch): monkeypatch.setattr(runner_module, "CrawlService", _DummyService) monkeypatch.setattr(runner_module, "SnapshotService", _DummyService) monkeypatch.setattr(runner_module, "ArchiveResultService", _DummyService) - monkeypatch.setattr(runner_module, "_attach_bus_trace", lambda bus: None) - monkeypatch.setattr(runner_module, "_stop_bus_trace", lambda bus: asyncio.sleep(0)) - monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: _DummyAbxServices()) - monkeypatch.setattr(Crawl.objects, "get", lambda id: crawl) - monkeypatch.setattr(crawl, "save", lambda *args, **kwargs: None) - monkeypatch.setattr(crawl, "cleanup", lambda: None) - monkeypatch.setattr(runner_module.CrawlRunner, "_prepare", lambda self: None) + monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: None) + monkeypatch.setattr(runner_module.CrawlRunner, "load_run_state", lambda self: [str(snapshot.id)]) monkeypatch.setattr(runner_module.CrawlRunner, "_create_live_ui", lambda self: None) - monkeypatch.setattr(runner_module.CrawlRunner, "_initial_snapshot_ids", lambda self: [str(snapshot.id)]) - monkeypatch.setattr(runner_module.CrawlRunner, "_run_crawl_setup", lambda self, snapshot_id: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "run_crawl_setup", lambda self, snapshot_id: asyncio.sleep(0)) monkeypatch.setattr(runner_module.CrawlRunner, "enqueue_snapshot", lambda self, snapshot_id: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_wait_for_snapshot_tasks", lambda self: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_run_crawl_cleanup", lambda self, snapshot_id: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_cleanup_persona", lambda self: None) + monkeypatch.setattr(runner_module.CrawlRunner, "wait_for_snapshot_tasks", lambda self: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "run_crawl_cleanup", lambda self, snapshot_id: asyncio.sleep(0)) + monkeypatch.setenv("DJANGO_ALLOW_ASYNC_UNSAFE", "true") - sync_to_async_wrapped: list[str] = [] - sync_to_async_active = False + method_calls: list[str] = [] - def fake_sync_to_async(func, thread_sensitive=True): - async def wrapper(*args, **kwargs): - nonlocal sync_to_async_active - sync_to_async_wrapped.append(getattr(func, "__name__", repr(func))) - previous = sync_to_async_active - sync_to_async_active = True - try: - return func(*args, **kwargs) - finally: - sync_to_async_active = previous + def wrapped_finalize(self): + method_calls.append("finalize_run_state") + return None - return wrapper + def wrapped_load(self): + method_calls.append("load_run_state") + return [str(snapshot.id)] - def guarded_is_finished(): - assert sync_to_async_active is True - return False - - monkeypatch.setattr(asgiref.sync, "sync_to_async", fake_sync_to_async) - monkeypatch.setattr(crawl, "is_finished", guarded_is_finished) + monkeypatch.setattr(runner_module.CrawlRunner, "finalize_run_state", wrapped_finalize) + monkeypatch.setattr(runner_module.CrawlRunner, "load_run_state", wrapped_load) asyncio.run(runner_module.CrawlRunner(crawl, snapshot_ids=[str(snapshot.id)]).run()) crawl.refresh_from_db() assert crawl.status == Crawl.StatusChoices.STARTED assert crawl.retry_at is not None - assert "guarded_is_finished" in sync_to_async_wrapped + assert method_calls == ["load_run_state", "finalize_run_state"] def test_wait_for_snapshot_tasks_surfaces_already_failed_task(): @@ -680,7 +543,7 @@ def test_wait_for_snapshot_tasks_surfaces_already_failed_task(): task.set_exception(RuntimeError("snapshot failed")) crawl_runner.snapshot_tasks["snap-1"] = task with pytest.raises(RuntimeError, match="snapshot failed"): - await crawl_runner._wait_for_snapshot_tasks() + await crawl_runner.wait_for_snapshot_tasks() asyncio.run(run_test()) @@ -702,14 +565,13 @@ def test_wait_for_snapshot_tasks_returns_after_completed_tasks_are_pruned(): async def run_test(): task = asyncio.create_task(finish_snapshot()) crawl_runner.snapshot_tasks["snap-1"] = task - await asyncio.wait_for(crawl_runner._wait_for_snapshot_tasks(), timeout=0.5) + await asyncio.wait_for(crawl_runner.wait_for_snapshot_tasks(), timeout=0.5) assert crawl_runner.snapshot_tasks == {} asyncio.run(run_test()) def test_crawl_runner_calls_crawl_cleanup_after_snapshot_phase(monkeypatch): - import asgiref.sync from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.crawls.models import Crawl from archivebox.core.models import Snapshot @@ -726,30 +588,18 @@ def test_crawl_runner_calls_crawl_cleanup_after_snapshot_phase(monkeypatch): status=Snapshot.StatusChoices.STARTED, ) - monkeypatch.setattr(runner_module, "_attach_bus_trace", lambda bus: None) - monkeypatch.setattr(runner_module, "_stop_bus_trace", lambda bus: asyncio.sleep(0)) - monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: _DummyAbxServices()) - monkeypatch.setenv("DJANGO_ALLOW_ASYNC_UNSAFE", "true") - monkeypatch.setattr( - asgiref.sync, - "sync_to_async", - lambda func, thread_sensitive=True: lambda *args, **kwargs: _call_sync(func, *args, **kwargs), - ) - monkeypatch.setattr(Crawl.objects, "get", lambda id: crawl) - monkeypatch.setattr(crawl, "is_finished", lambda: False) - monkeypatch.setattr(crawl, "save", lambda *args, **kwargs: None) - monkeypatch.setattr(runner_module.CrawlRunner, "_prepare", lambda self: None) + monkeypatch.setattr(runner_module, "setup_abx_services", lambda *args, **kwargs: None) + monkeypatch.setattr(runner_module.CrawlRunner, "load_run_state", lambda self: [str(snapshot.id)]) monkeypatch.setattr(runner_module.CrawlRunner, "_create_live_ui", lambda self: None) - monkeypatch.setattr(runner_module.CrawlRunner, "_initial_snapshot_ids", lambda self: [str(snapshot.id)]) - monkeypatch.setattr(runner_module.CrawlRunner, "_run_crawl_setup", lambda self, snapshot_id: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "run_crawl_setup", lambda self, snapshot_id: asyncio.sleep(0)) monkeypatch.setattr(runner_module.CrawlRunner, "enqueue_snapshot", lambda self, snapshot_id: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_wait_for_snapshot_tasks", lambda self: asyncio.sleep(0)) - monkeypatch.setattr(runner_module.CrawlRunner, "_cleanup_persona", lambda self: None) + monkeypatch.setattr(runner_module.CrawlRunner, "wait_for_snapshot_tasks", lambda self: asyncio.sleep(0)) + monkeypatch.setattr(runner_module.CrawlRunner, "finalize_run_state", lambda self: None) cleanup_calls = [] monkeypatch.setattr( runner_module.CrawlRunner, - "_run_crawl_cleanup", + "run_crawl_cleanup", lambda self, snapshot_id: cleanup_calls.append("abx_cleanup") or asyncio.sleep(0), ) asyncio.run(runner_module.CrawlRunner(crawl, snapshot_ids=[str(snapshot.id)]).run()) @@ -757,17 +607,20 @@ def test_crawl_runner_calls_crawl_cleanup_after_snapshot_phase(monkeypatch): assert cleanup_calls == ["abx_cleanup"] -def test_abx_process_service_background_monitor_finishes_after_process_exit(monkeypatch, tmp_path): +def test_abx_process_service_background_process_finishes_after_process_exit(monkeypatch, tmp_path): from abx_dl.models import Process as AbxProcess, now_iso from abx_dl.services.process_service import ProcessService - from abx_dl.events import ProcessCompletedEvent + from abx_dl.events import ProcessCompletedEvent, ProcessStartedEvent service = object.__new__(ProcessService) service.emit_jsonl = False emitted_events = [] - async def fake_emit_event(event, *, detach_from_parent): - emitted_events.append((event, detach_from_parent)) + class FakeBus: + async def emit(self, event): + emitted_events.append(event) + + service.bus = FakeBus() async def fake_stream_stdout(**kwargs): try: @@ -775,19 +628,8 @@ def test_abx_process_service_background_monitor_finishes_after_process_exit(monk except asyncio.CancelledError: return ["daemon output\n"] - service._emit_event = fake_emit_event monkeypatch.setattr(service, "_stream_stdout", fake_stream_stdout) - class FakeAsyncProcess: - def __init__(self): - self.pid = 42424 - self.returncode = None - - async def wait(self): - await asyncio.sleep(0) - self.returncode = 0 - return 0 - plugin_output_dir = tmp_path / "chrome" plugin_output_dir.mkdir() stdout_file = plugin_output_dir / "on_CrawlSetup__90_chrome_launch.daemon.bg.stdout.log" @@ -804,41 +646,45 @@ def test_abx_process_service_background_monitor_finishes_after_process_exit(monk plugin="chrome", hook_name="on_CrawlSetup__90_chrome_launch.daemon.bg", ) - process = FakeAsyncProcess() - event = SimpleNamespace( - plugin_name="chrome", - hook_name="on_CrawlSetup__90_chrome_launch.daemon.bg", - hook_path="hook", - hook_args=["--url=https://example.org/"], - env={}, - output_dir=str(plugin_output_dir), - timeout=60, - snapshot_id="snap-1", - is_background=True, - url="https://example.org/", - process_type="hook", - worker_type="hook", - ) async def run_test(): + process = await asyncio.create_subprocess_exec( + sys.executable, + "-c", + "pass", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + event = ProcessStartedEvent( + plugin_name="chrome", + hook_name="on_CrawlSetup__90_chrome_launch.daemon.bg", + hook_path="hook", + hook_args=["--url=https://example.org/"], + env={}, + output_dir=str(plugin_output_dir), + timeout=60, + pid=process.pid, + is_background=True, + url="https://example.org/", + process_type="hook", + worker_type="hook", + start_ts=proc.started_at or "", + subprocess=process, + stdout_file=stdout_file, + stderr_file=stderr_file, + pid_file=pid_file, + cmd_file=plugin_output_dir / "on_CrawlSetup__90_chrome_launch.daemon.bg.sh", + files_before=set(), + ) await asyncio.wait_for( - service._monitor_background_process( - event=event, - proc=proc, - process=process, - plugin_output_dir=plugin_output_dir, - stdout_file=stdout_file, - stderr_file=stderr_file, - pid_file=pid_file, - files_before=set(), - ), + service.on_ProcessStartedEvent(event), timeout=0.5, ) asyncio.run(run_test()) assert pid_file.exists() is False - assert any(isinstance(event, ProcessCompletedEvent) for event, _ in emitted_events) + assert any(isinstance(event, ProcessCompletedEvent) for event in emitted_events) def test_run_pending_crawls_runs_due_snapshot_in_place(monkeypatch): diff --git a/archivebox/tests/test_tag_service.py b/archivebox/tests/test_tag_service.py new file mode 100644 index 00000000..defa6f4d --- /dev/null +++ b/archivebox/tests/test_tag_service.py @@ -0,0 +1,48 @@ +import asyncio + +import pytest + +from abx_dl.events import TagEvent +from abx_dl.orchestrator import create_bus + + +pytestmark = pytest.mark.django_db(transaction=True) + + +def _create_snapshot(): + from archivebox.base_models.models import get_or_create_system_user_pk + from archivebox.crawls.models import Crawl + from archivebox.core.models import Snapshot + + crawl = Crawl.objects.create( + urls="https://example.com", + created_by_id=get_or_create_system_user_pk(), + ) + return Snapshot.objects.create( + url="https://example.com", + crawl=crawl, + status=Snapshot.StatusChoices.STARTED, + ) + + +def test_tag_event_projects_tag_to_snapshot(): + from archivebox.core.models import Tag + from archivebox.services.tag_service import TagService + + snapshot = _create_snapshot() + bus = create_bus(name="test_tag_service") + TagService(bus) + + async def emit_tag_event() -> None: + await bus.emit( + TagEvent( + name="example", + snapshot_id=str(snapshot.id), + ), + ) + + asyncio.run(emit_tag_event()) + + snapshot.refresh_from_db() + assert snapshot.tags.filter(name="example").exists() + assert Tag.objects.filter(name="example").exists() diff --git a/docs b/docs index be25d9bf..7244076e 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit be25d9bfa2d0f98b6b5b788c43d9629d1b31d217 +Subproject commit 7244076ecec0264dddfba14930f5f8bfe4fb4ef0 diff --git a/old/TODO_hook_architecture.md b/old/TODO_hook_architecture.md index 00f3b86a..bb6b87cc 100755 --- a/old/TODO_hook_architecture.md +++ b/old/TODO_hook_architecture.md @@ -42,7 +42,7 @@ Crawl.run() {'type': 'Dependency', 'bin_name': 'wget', 'bin_providers': 'apt,brew', 'overrides': {...}} # ❌ WRONG - uses different field names - {'type': 'Dependency', 'name': 'wget', 'providers': 'apt,brew', 'custom_cmds': {...}} + {'type': 'Dependency', 'name': 'wget', 'providers': 'apt,brew', 'overrides': {...}} ``` 4. **No hardcoding** - Never hardcode binary names, provider names, or anything else. Use discovery. @@ -84,7 +84,7 @@ Crawl.run() # ❌ WRONG - complex transformation logic if obj.get('type') == 'Dependency': dep = Dependency.objects.create(name=obj['bin_name']) # renaming fields - dep.custom_commands = transform_overrides(obj['overrides']) # transforming data + dep.overrides = transform_overrides(obj['overrides']) # transforming data ``` ### Pattern Consistency diff --git a/pyproject.toml b/pyproject.toml index e805ebe3..1ccff1f3 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -159,6 +159,11 @@ environments = ["sys_platform == 'darwin'", "sys_platform == 'linux'"] package = true # compile-bytecode = true +[tool.uv.sources] +abx-pkg = { path = "../abx-pkg", editable = true } +abx-plugins = { path = "../abx-plugins", editable = true } +abx-dl = { path = "../abx-dl", editable = true } + [build-system] requires = ["pdm-backend"] build-backend = "pdm.backend"