From c2afb4035087fbc5c017d5a13216f3d82af2e1d9 Mon Sep 17 00:00:00 2001
From: Nick Sweeting
Date: Thu, 1 Jan 2026 16:58:47 -0800
Subject: [PATCH] fix lib bin dir and archivebox add hanging

Two related fixes:

- `archivebox add` could hang in foreground mode: the inline orchestrator
  worked the global queue, so pending or future-scheduled work from other
  crawls could keep exit_on_idle from ever firing. The orchestrator and
  workers now take a crawl_id and scope their queues and has_future_work()
  checks to just that crawl.

- Binaries symlinked into LIB_DIR/bin were not reliably on PATH for hook
  subprocesses. LIB_BIN_DIR is now a first-class config value, reported by
  get_code_locations(), and prepended (idempotently) to PATH for all hook
  executions, including when Machine.config provides its own PATH.

---
 archivebox/cli/archivebox_add.py              |  2 +-
 archivebox/config/common.py                   |  4 ++
 archivebox/config/constants.py                |  3 +-
 archivebox/config/paths.py                    |  5 +++
 archivebox/hooks.py                           | 24 ++++++++++-
 .../screenshot/tests/test_screenshot.py       |  4 +-
 archivebox/workers/orchestrator.py            | 41 +++++++++----------
 archivebox/workers/worker.py                  | 12 +++++-
 8 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/archivebox/cli/archivebox_add.py b/archivebox/cli/archivebox_add.py
index ce255b04..5043f3ed 100644
--- a/archivebox/cli/archivebox_add.py
+++ b/archivebox/cli/archivebox_add.py
@@ -131,7 +131,7 @@ def add(urls: str | list[str],
     else:
         # Foreground mode: run orchestrator inline until all work is done
         print(f'[green]\\[*] Starting orchestrator to process crawl...[/green]')
-        orchestrator = Orchestrator(exit_on_idle=True)
+        orchestrator = Orchestrator(exit_on_idle=True, crawl_id=str(crawl.id))
         orchestrator.runloop()  # Block until complete
 
     # 6. Return the list of Snapshots in this crawl
diff --git a/archivebox/config/common.py b/archivebox/config/common.py
index 0c457b7d..edf7b602 100644
--- a/archivebox/config/common.py
+++ b/archivebox/config/common.py
@@ -66,6 +66,10 @@ class StorageConfig(BaseConfigSet):
     # should not be a remote/network/FUSE mount for speed reasons, otherwise extractors will be slow
     LIB_DIR: Path = Field(default=CONSTANTS.DEFAULT_LIB_DIR)
 
+    # LIB_BIN_DIR is where all installed binaries are symlinked for easy PATH management
+    # Derived from LIB_DIR / 'bin', should be prepended to PATH for all hook executions
+    LIB_BIN_DIR: Path = Field(default=CONSTANTS.DEFAULT_LIB_BIN_DIR)
+
     # CUSTOM_TEMPLATES_DIR allows users to override default templates
     # defaults to DATA_DIR / 'user_templates' but can be configured
     CUSTOM_TEMPLATES_DIR: Path = Field(default=CONSTANTS.CUSTOM_TEMPLATES_DIR)
diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py
index a863837c..607ff2e7 100644
--- a/archivebox/config/constants.py
+++ b/archivebox/config/constants.py
@@ -108,9 +108,10 @@ class ConstantsDict(Mapping):
     # Runtime dirs
     TMP_DIR_NAME: str = 'tmp'
     DEFAULT_TMP_DIR: Path = DATA_DIR / TMP_DIR_NAME / MACHINE_ID      # ./data/tmp/abc3244323
-    
+
     LIB_DIR_NAME: str = 'lib'
     DEFAULT_LIB_DIR: Path = DATA_DIR / LIB_DIR_NAME / MACHINE_TYPE    # ./data/lib/arm64-linux-docker
+    DEFAULT_LIB_BIN_DIR: Path = DEFAULT_LIB_DIR / 'bin'               # ./data/lib/arm64-linux-docker/bin
 
     # Config constants
     TIMEZONE: str = 'UTC'
diff --git a/archivebox/config/paths.py b/archivebox/config/paths.py
index 284c03da..74d50c86 100644
--- a/archivebox/config/paths.py
+++ b/archivebox/config/paths.py
@@ -288,6 +288,11 @@ def get_code_locations():
             'enabled': True,
             'is_valid': os.path.isdir(STORAGE_CONFIG.LIB_DIR) and os.access(STORAGE_CONFIG.LIB_DIR, os.R_OK) and os.access(STORAGE_CONFIG.LIB_DIR, os.W_OK),  # read + write
         },
+        'LIB_BIN_DIR': {
+            'path': STORAGE_CONFIG.LIB_BIN_DIR.resolve(),
+            'enabled': True,
+            'is_valid': os.path.isdir(STORAGE_CONFIG.LIB_BIN_DIR) and os.access(STORAGE_CONFIG.LIB_BIN_DIR, os.R_OK) and os.access(STORAGE_CONFIG.LIB_BIN_DIR, os.W_OK),  # read + write
+        },
     })
diff --git a/archivebox/hooks.py b/archivebox/hooks.py
index 116671ac..e6778670 100644
--- a/archivebox/hooks.py
+++ b/archivebox/hooks.py
@@ -328,6 +328,24 @@ def run_hook(
     env['ARCHIVE_DIR'] = str(getattr(settings, 'ARCHIVE_DIR', Path.cwd() / 'archive'))
     env.setdefault('MACHINE_ID', getattr(settings, 'MACHINE_ID', '') or os.environ.get('MACHINE_ID', ''))
+
+    # Get LIB_DIR and LIB_BIN_DIR from config
+    lib_dir = config.get('LIB_DIR', getattr(settings, 'LIB_DIR', None))
+    lib_bin_dir = config.get('LIB_BIN_DIR', getattr(settings, 'LIB_BIN_DIR', None))
+    if lib_dir:
+        env['LIB_DIR'] = str(lib_dir)
+    if not lib_bin_dir and lib_dir:
+        # Derive LIB_BIN_DIR from LIB_DIR if not set
+        lib_bin_dir = Path(lib_dir) / 'bin'
+
+    # Prepend LIB_BIN_DIR to PATH so symlinked binaries take priority
+    if lib_bin_dir:
+        lib_bin_dir = str(lib_bin_dir)
+        env['LIB_BIN_DIR'] = lib_bin_dir
+        current_path = env.get('PATH', '')
+        # Only prepend if not already at the beginning
+        if not current_path.startswith(f'{lib_bin_dir}:'):
+            env['PATH'] = f'{lib_bin_dir}:{current_path}' if current_path else lib_bin_dir
+
     # Use Machine.config.PATH if set (includes pip/npm bin dirs from providers)
     try:
         from archivebox.machine.models import Machine
@@ -335,7 +353,11 @@
         if machine and machine.config:
             machine_path = machine.config.get('config/PATH')
             if machine_path:
-                env['PATH'] = machine_path
+                # Prepend LIB_BIN_DIR to machine PATH as well
+                if lib_bin_dir and not machine_path.startswith(f'{lib_bin_dir}:'):
+                    env['PATH'] = f'{lib_bin_dir}:{machine_path}'
+                else:
+                    env['PATH'] = machine_path
             # Also set NODE_MODULES_DIR if configured
             node_modules_dir = machine.config.get('config/NODE_MODULES_DIR')
             if node_modules_dir:
diff --git a/archivebox/plugins/screenshot/tests/test_screenshot.py b/archivebox/plugins/screenshot/tests/test_screenshot.py
index 1aed35e6..be431803 100644
--- a/archivebox/plugins/screenshot/tests/test_screenshot.py
+++ b/archivebox/plugins/screenshot/tests/test_screenshot.py
@@ -60,13 +60,15 @@
         tmpdir = Path(tmpdir)
 
         # Run screenshot extraction hook
+        env = get_test_env()
+        print(f"\n[DEBUG] NODE_V8_COVERAGE={env.get('NODE_V8_COVERAGE', 'NOT SET')}", file=sys.stderr)
         result = subprocess.run(
             ['node', str(SCREENSHOT_HOOK), f'--url={TEST_URL}', '--snapshot-id=test789'],
             cwd=tmpdir,
             capture_output=True,
             text=True,
             timeout=120,
-            env=get_test_env()
+            env=env
         )
 
         assert result.returncode == 0, f"Extraction failed: {result.stderr}"
diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py
index 99d4e27e..7dbe9a0d 100644
--- a/archivebox/workers/orchestrator.py
+++ b/archivebox/workers/orchestrator.py
@@ -182,7 +182,7 @@ class Orchestrator:
     def spawn_worker(self, WorkerClass: Type[Worker]) -> int | None:
         """Spawn a new worker process.
 
         Returns PID or None if spawn failed."""
         try:
-            pid = WorkerClass.start(daemon=False)
+            pid = WorkerClass.start(daemon=False, crawl_id=self.crawl_id)
 
             # CRITICAL: Block until worker registers itself in Process table
             # This prevents race condition where orchestrator spawns multiple workers
@@ -248,11 +248,11 @@
         for WorkerClass in self.WORKER_TYPES:
             # Get queue for this worker type
             # Need to instantiate worker to get queue (for model access)
-            worker = WorkerClass(worker_id=-1)  # temp instance just for queue access
+            worker = WorkerClass(worker_id=-1, crawl_id=self.crawl_id)  # temp instance just for queue access
             queue = worker.get_queue()
             queue_count = queue.count()
             queue_sizes[WorkerClass.name] = queue_count
-            
+
             # Spawn worker if needed
             if self.should_spawn_worker(WorkerClass, queue_count):
                 self.spawn_worker(WorkerClass)
@@ -270,15 +270,26 @@
     def has_future_work(self) -> bool:
         """Check if there's work scheduled for the future (retry_at > now)."""
         for WorkerClass in self.WORKER_TYPES:
-            worker = WorkerClass(worker_id=-1)
+            worker = WorkerClass(worker_id=-1, crawl_id=self.crawl_id)
             Model = worker.get_model()
-            # Check for items not in final state with future retry_at
-            future_count = Model.objects.filter(
+
+            # Build filter for future work, respecting crawl_id if set
+            qs = Model.objects.filter(
                 retry_at__gt=timezone.now()
             ).exclude(
                 status__in=Model.FINAL_STATES
-            ).count()
-            if future_count > 0:
+            )
+
+            # Apply crawl_id filter if set
+            if self.crawl_id:
+                if WorkerClass.name == 'crawl':
+                    qs = qs.filter(id=self.crawl_id)
+                elif WorkerClass.name == 'snapshot':
+                    qs = qs.filter(crawl_id=self.crawl_id)
+                elif WorkerClass.name == 'archiveresult':
+                    qs = qs.filter(snapshot__crawl_id=self.crawl_id)
+
+            if qs.count() > 0:
                 return True
         return False
@@ -404,14 +415,6 @@
         # Track which snapshots are still active
         active_ids = set()
 
-        # Debug: check for duplicates
-        snapshot_urls = [s.url for s in active_snapshots]
-        if len(active_snapshots) != len(set(snapshot_urls)):
-            # We have duplicate URLs - let's deduplicate by showing snapshot ID
-            show_id = True
-        else:
-            show_id = False
-
         for snapshot in active_snapshots:
             active_ids.add(snapshot.id)
@@ -442,11 +445,7 @@
             # Build description with URL + current plugin
             url = snapshot.url[:50] + '...' if len(snapshot.url) > 50 else snapshot.url
-            if show_id:
-                # Show snapshot ID if there are duplicate URLs
-                description = f"[{str(snapshot.id)[:8]}] {url}{current_plugin}"
-            else:
-                description = f"{url}{current_plugin}"
+            description = f"{url}{current_plugin}"
 
             # Create or update task
             if snapshot.id not in task_ids:
diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py
index 898c4210..7b1127cc 100644
--- a/archivebox/workers/worker.py
+++ b/archivebox/workers/worker.py
@@ -238,7 +238,7 @@ class Worker:
 
         log_worker_event(
             worker_type=worker_type_name,
-            event='Starting...',
+            event='Processing',
             indent_level=indent_level,
             pid=self.pid,
             worker_id=str(self.worker_id),
@@ -365,6 +365,13 @@ class SnapshotWorker(Worker):
         from archivebox.core.models import Snapshot
         return Snapshot
 
+    def get_queue(self) -> QuerySet:
+        """Get queue of Snapshots ready for processing, optionally filtered by crawl_id."""
+        qs = super().get_queue()
+        if self.crawl_id:
+            qs = qs.filter(crawl_id=self.crawl_id)
+        return qs
+
 
 class ArchiveResultWorker(Worker):
     """Worker for processing ArchiveResult objects."""
@@ -392,6 +399,9 @@
 
         qs = super().get_queue()
 
+        if self.crawl_id:
+            qs = qs.filter(snapshot__crawl_id=self.crawl_id)
+
         if self.plugin:
             qs = qs.filter(plugin=self.plugin)
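
A minimal standalone sketch (not part of the diff; the helper name and paths here
are made up for illustration) of the PATH handling that run_hook() gains above:

    from pathlib import Path

    def prepend_lib_bin(env: dict, lib_dir: str) -> dict:
        # derive LIB_BIN_DIR from LIB_DIR, as run_hook() does when it isn't configured
        lib_bin_dir = str(Path(lib_dir) / 'bin')
        env['LIB_BIN_DIR'] = lib_bin_dir
        current_path = env.get('PATH', '')
        # only prepend if LIB_BIN_DIR isn't already at the front of PATH
        if not current_path.startswith(f'{lib_bin_dir}:'):
            env['PATH'] = f'{lib_bin_dir}:{current_path}' if current_path else lib_bin_dir
        return env

    env = prepend_lib_bin({'PATH': '/usr/bin:/bin'}, '/data/lib/arm64-linux-docker')
    assert env['PATH'] == '/data/lib/arm64-linux-docker/bin:/usr/bin:/bin'
    # repeating the call leaves PATH unchanged, which is what makes it safe for
    # run_hook() to prepend again on top of Machine.config's PATH
    assert prepend_lib_bin(env, '/data/lib/arm64-linux-docker')['PATH'] == env['PATH']

The idempotence matters because run_hook() applies the prepend twice: once to the
inherited environment PATH, and again when Machine.config supplies its own PATH.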