diff --git a/archivebox/cli/archivebox_run.py b/archivebox/cli/archivebox_run.py index f27c2cb3..50deb0f6 100644 --- a/archivebox/cli/archivebox_run.py +++ b/archivebox/cli/archivebox_run.py @@ -244,7 +244,8 @@ def run_snapshot_worker(snapshot_id: str) -> int: @click.option('--crawl-id', help="Run orchestrator for specific crawl only") @click.option('--snapshot-id', help="Run worker for specific snapshot only") @click.option('--binary-id', help="Run worker for specific binary only") -def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str): +@click.option('--worker-type', help="Run worker of specific type (binary)") +def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str, worker_type: str): """ Process queued work. @@ -259,7 +260,7 @@ def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str): if snapshot_id: sys.exit(run_snapshot_worker(snapshot_id)) - # Binary worker mode + # Binary worker mode (specific binary) if binary_id: from archivebox.workers.worker import BinaryWorker try: @@ -274,6 +275,25 @@ def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str): traceback.print_exc() sys.exit(1) + # Worker type mode (daemon - processes all pending items) + if worker_type: + if worker_type == 'binary': + from archivebox.workers.worker import BinaryWorker + try: + worker = BinaryWorker(worker_id=0) # No binary_id = daemon mode + worker.runloop() + sys.exit(0) + except KeyboardInterrupt: + sys.exit(0) + except Exception as e: + rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr) + import traceback + traceback.print_exc() + sys.exit(1) + else: + rprint(f'[red]Unknown worker type: {worker_type}[/red]', file=sys.stderr) + sys.exit(1) + # Crawl worker mode if crawl_id: from archivebox.workers.worker import CrawlWorker diff --git a/archivebox/hooks.py b/archivebox/hooks.py index 900f8c3a..04bfa0ef 100644 --- a/archivebox/hooks.py +++ b/archivebox/hooks.py @@ -356,29 +356,38 @@ def run_hook( # Derive LIB_BIN_DIR from LIB_DIR if not set lib_bin_dir = Path(lib_dir) / 'bin' - # Prepend LIB_BIN_DIR to PATH so symlinked binaries take priority + # Build PATH with proper precedence: + # 1. LIB_BIN_DIR (highest priority - local symlinked binaries) + # 2. Machine.config.PATH (pip/npm bin dirs from providers) + # 3. os.environ['PATH'] (system PATH) + if lib_bin_dir: lib_bin_dir = str(lib_bin_dir) env['LIB_BIN_DIR'] = lib_bin_dir - current_path = env.get('PATH', '') - # Only prepend if not already at the beginning - if not current_path.startswith(f'{lib_bin_dir}:'): - env['PATH'] = f'{lib_bin_dir}:{current_path}' if current_path else lib_bin_dir - # Use Machine.config.PATH if set (includes pip/npm bin dirs from providers) + # Start with base PATH + current_path = env.get('PATH', '') + + # Prepend Machine.config.PATH if it exists (treat as extra entries, not replacement) try: from archivebox.machine.models import Machine machine = Machine.current() if machine and machine.config: machine_path = machine.config.get('PATH') if machine_path: - # Prepend LIB_BIN_DIR to machine PATH as well - if lib_bin_dir and not machine_path.startswith(f'{lib_bin_dir}:'): - env['PATH'] = f'{lib_bin_dir}:{machine_path}' - else: - env['PATH'] = machine_path + # Prepend machine_path to current PATH + current_path = f'{machine_path}:{current_path}' if current_path else machine_path except Exception: - pass # Fall back to system PATH if Machine not available + pass + + # Finally prepend LIB_BIN_DIR to the front (highest priority) + if lib_bin_dir: + if not current_path.startswith(f'{lib_bin_dir}:'): + env['PATH'] = f'{lib_bin_dir}:{current_path}' if current_path else lib_bin_dir + else: + env['PATH'] = current_path + else: + env['PATH'] = current_path # Set NODE_PATH for Node.js module resolution # Priority: config dict > Machine.config > derive from LIB_DIR @@ -399,7 +408,11 @@ def run_hook( env['NODE_MODULES_DIR'] = node_path # For backwards compatibility # Export all config values to environment (already merged by get_config()) + # Skip keys we've already handled specially above (PATH, LIB_DIR, LIB_BIN_DIR, NODE_PATH, etc.) + SKIP_KEYS = {'PATH', 'LIB_DIR', 'LIB_BIN_DIR', 'NODE_PATH', 'NODE_MODULES_DIR', 'DATA_DIR', 'ARCHIVE_DIR', 'MACHINE_ID'} for key, value in config.items(): + if key in SKIP_KEYS: + continue # Already handled specially above, don't overwrite if value is None: continue elif isinstance(value, bool): diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 73740a12..cb99cb57 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -14,7 +14,7 @@ from django.utils import timezone from django.utils.functional import cached_property from archivebox.base_models.models import ModelWithHealthStats -from archivebox.workers.models import BaseStateMachine +from archivebox.workers.models import BaseStateMachine, ModelWithStateMachine from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats try: @@ -201,7 +201,7 @@ class BinaryManager(models.Manager): ).exclude(abspath='').exclude(abspath__isnull=True).order_by('-modified_at').first() -class Binary(ModelWithHealthStats): +class Binary(ModelWithHealthStats, ModelWithStateMachine): """ Tracks a binary on a specific machine. @@ -243,8 +243,6 @@ class Binary(ModelWithHealthStats): status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED, db_index=True) retry_at = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True, help_text="When to retry this binary installation") - output_dir = models.CharField(max_length=255, default='', null=False, blank=True, - help_text="Directory where installation hook logs are stored") # Health stats num_uses_failed = models.PositiveIntegerField(default=0) @@ -279,6 +277,15 @@ class Binary(ModelWithHealthStats): 'is_valid': self.is_valid, } + @property + def output_dir(self) -> Path: + """ + Get output directory for this binary's hook logs. + Path: data/machines/{machine_uuid}/binaries/{binary_name}/{binary_uuid} + """ + from django.conf import settings + return Path(settings.DATA_DIR) / 'machines' / str(self.machine_id) / 'binaries' / self.name / str(self.id) + def to_json(self) -> dict: """ Convert Binary model instance to a JSON-serializable dict. diff --git a/archivebox/plugins/screenshot/on_Snapshot__51_screenshot.js b/archivebox/plugins/screenshot/on_Snapshot__51_screenshot.js index d99460c9..2ec2fdd4 100644 --- a/archivebox/plugins/screenshot/on_Snapshot__51_screenshot.js +++ b/archivebox/plugins/screenshot/on_Snapshot__51_screenshot.js @@ -18,6 +18,27 @@ const path = require('path'); // Add NODE_MODULES_DIR to module resolution paths if set if (process.env.NODE_MODULES_DIR) module.paths.unshift(process.env.NODE_MODULES_DIR); +// Debug: Check NODE_V8_COVERAGE +console.error(`[DEBUG JS START] NODE_V8_COVERAGE=${process.env.NODE_V8_COVERAGE || 'NOT SET'}`); + +// Hook into process.exit to flush V8 coverage (for NODE_V8_COVERAGE support) +if (process.env.NODE_V8_COVERAGE) { + const originalExit = process.exit.bind(process); + process.exit = function(code) { + console.error(`[DEBUG] process.exit() override called with code=${code}`); + try { + const v8 = require('v8'); + const result = v8.takeCoverage(); + console.error(`[DEBUG] v8.takeCoverage() returned: ${typeof result}`); + } catch (e) { + // Log but don't block exit - we're exiting anyway + console.error(`[!] Coverage flush failed: ${e.message}`); + } + originalExit(code); + }; + console.error('[DEBUG] process.exit() override installed'); +} + const { getEnv, getEnvBool, @@ -26,20 +47,11 @@ const { readCdpUrl, } = require('../chrome/chrome_utils.js'); -// Flush V8 coverage before exit (needed for NODE_V8_COVERAGE to capture early exits) -function flushCoverageAndExit(exitCode) { - if (process.env.NODE_V8_COVERAGE) { - const v8 = require('v8'); - v8.takeCoverage(); - } - process.exit(exitCode); -} - // Check if screenshot is enabled BEFORE requiring puppeteer if (!getEnvBool('SCREENSHOT_ENABLED', true)) { console.error('Skipping screenshot (SCREENSHOT_ENABLED=False)'); // Temporary failure (config disabled) - NO JSONL emission - flushCoverageAndExit(0); + process.exit(0); } // Now safe to require puppeteer diff --git a/archivebox/tests/test_cli_run_binary_worker.py b/archivebox/tests/test_cli_run_binary_worker.py index 25fefacd..cb8e5bce 100644 --- a/archivebox/tests/test_cli_run_binary_worker.py +++ b/archivebox/tests/test_cli_run_binary_worker.py @@ -35,9 +35,12 @@ class TestBinaryWorkerSpawning: ['run'], stdin=json.dumps(binary_record), data_dir=initialized_archive, - timeout=30, + timeout=60, # Increased timeout to allow for binary installation ) + print(f"stdout: {stdout}") + print(f"stderr: {stderr}") + assert code == 0, f"Failed to create Binary: {stderr}" # Verify Binary was created in DB diff --git a/archivebox/tests/test_worker_config_propagation.py b/archivebox/tests/test_worker_config_propagation.py index 30c5e4d9..07204565 100644 --- a/archivebox/tests/test_worker_config_propagation.py +++ b/archivebox/tests/test_worker_config_propagation.py @@ -980,6 +980,85 @@ print(f"\\n✓ snapshot.config ({{expected}}) correctly overrides env var (999) print("="*80 + "\n") +def test_new_environment_variables_added(): + """ + Test that NEW environment variables (not in defaults) are added to config. + + This is important for worker subprocesses that receive config via Process.env. + When Worker.start() creates a subprocess, it serializes config to Process.env. + The subprocess must be able to read those values back via get_config(). + """ + + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = Path(tmpdir) / 'test_archive' + data_dir.mkdir() + + print(f"\n{'='*80}") + print(f"Test: New Environment Variables Added to Config") + print(f"DATA_DIR: {data_dir}") + print(f"{'='*80}\n") + + # Initialize + result = subprocess.run( + ['python', '-m', 'archivebox', 'init'], + cwd=str(data_dir), + env={**os.environ, 'DATA_DIR': str(data_dir), 'USE_COLOR': 'False'}, + capture_output=True, + timeout=60, + ) + assert result.returncode == 0 + print("✓ Archive initialized\n") + + print("Step 1: Test that new uppercase env vars are added to config") + test_script = f""" +import os +os.environ['DATA_DIR'] = '{data_dir}' +os.environ['NEW_CUSTOM_VAR'] = 'custom_value' # Not in defaults +os.environ['ANOTHER_VAR'] = 'another_value' +os.environ['lowercase_var'] = 'should_be_ignored' # Lowercase should be ignored + +from archivebox.config.django import setup_django +setup_django() +from archivebox.config.configset import get_config + +config = get_config() + +# Check uppercase vars are added +new_var = config.get('NEW_CUSTOM_VAR') +another_var = config.get('ANOTHER_VAR') +lowercase_var = config.get('lowercase_var') + +print(f"NEW_CUSTOM_VAR: {{new_var}}") +print(f"ANOTHER_VAR: {{another_var}}") +print(f"lowercase_var: {{lowercase_var}}") + +assert new_var == 'custom_value', f"Expected 'custom_value', got {{new_var}}" +assert another_var == 'another_value', f"Expected 'another_value', got {{another_var}}" +assert lowercase_var is None, f"Lowercase vars should be ignored, got {{lowercase_var}}" + +print("\\n✓ New uppercase environment variables added to config") +print("✓ Lowercase environment variables ignored") +""" + + result = subprocess.run( + ['python', '-c', test_script], + cwd=str(data_dir.parent), + capture_output=True, + timeout=30, + ) + + print(result.stdout.decode()) + if result.returncode != 0: + print("\nTest error:") + print(result.stderr.decode()) + + assert result.returncode == 0, f"Test failed: {result.stderr.decode()}" + + print("\n" + "="*80) + print("✓ TEST PASSED: New environment variables correctly added to config") + print("="*80 + "\n") + + if __name__ == '__main__': # Run as standalone script test_config_propagation_through_worker_hierarchy() @@ -987,3 +1066,4 @@ if __name__ == '__main__': test_parent_environment_preserved_in_hooks() test_config_auto_fetch_relationships() test_config_precedence_with_environment_vars() + test_new_environment_variables_added() diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index f6d79180..01d5f424 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -312,12 +312,14 @@ class Orchestrator: binary_count = binary_queue.count() queue_sizes['binary'] = binary_count - # Spawn BinaryWorker if needed (one worker per binary, up to MAX_BINARY_WORKERS) - if self.should_spawn_worker(BinaryWorker, binary_count): - # Get next binary to process - binary = binary_queue.first() - if binary: - BinaryWorker.start(binary_id=str(binary.id)) + # Spawn BinaryWorker if needed (singleton - max 1 BinaryWorker, processes ALL binaries) + if binary_count > 0: + running_binary_workers_list = BinaryWorker.get_running_workers() + print(f"[DEBUG] binary_count={binary_count}, running_binary_workers={len(running_binary_workers_list)}") + if len(running_binary_workers_list) == 0: + print(f"[DEBUG] Spawning BinaryWorker...") + BinaryWorker.start() + print(f"[DEBUG] BinaryWorker spawned") # Check if any BinaryWorkers are still running running_binary_workers = len(BinaryWorker.get_running_workers()) diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index 85a31224..7546a02a 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -324,17 +324,25 @@ class Worker: env = get_config(snapshot=snapshot) elif cls.name == 'binary': - # BinaryWorker processes a specific binary installation + # BinaryWorker supports two modes: + # 1. Singleton daemon (no binary_id) - processes ALL pending binaries + # 2. Specific binary (with binary_id) - processes just that one binary binary_id = kwargs.get('binary_id') - if not binary_id: - raise ValueError("BinaryWorker requires binary_id") - from archivebox.machine.models import Binary - binary = Binary.objects.get(id=binary_id) + if binary_id: + # Specific binary mode + from archivebox.machine.models import Binary + binary = Binary.objects.get(id=binary_id) + + cmd = [sys.executable, '-m', 'archivebox', 'run', '--binary-id', str(binary_id)] + pwd = Path(settings.DATA_DIR) / 'machines' / str(Machine.current().id) / 'binaries' / binary.name / str(binary.id) + pwd.mkdir(parents=True, exist_ok=True) + else: + # Singleton daemon mode - processes all pending binaries + cmd = [sys.executable, '-m', 'archivebox', 'run', '--worker-type', 'binary'] + pwd = Path(settings.DATA_DIR) / 'machines' / str(Machine.current().id) / 'binaries' + pwd.mkdir(parents=True, exist_ok=True) - cmd = [sys.executable, '-m', 'archivebox', 'run', '--binary-id', str(binary_id)] - pwd = Path(settings.DATA_DIR) / 'machines' / str(Machine.current().id) / 'binaries' / binary.name / str(binary.id) - pwd.mkdir(parents=True, exist_ok=True) env = get_config() else: @@ -837,23 +845,26 @@ class SnapshotWorker(Worker): class BinaryWorker(Worker): """ - Worker that processes a specific Binary installation. + Worker that processes Binary installations. - Like CrawlWorker and SnapshotWorker, BinaryWorker: - - Processes one specific binary (specified by binary_id) - - Installs it via Binary.run() which runs on_Binary__* hooks - - Exits when done + Two modes: + 1. Specific binary mode (binary_id provided): + - Processes one specific binary + - Exits when done - Orchestrator spawns BinaryWorkers sequentially (MAX_BINARY_WORKERS=1) to avoid - conflicts during binary installations. + 2. Daemon mode (no binary_id): + - Polls queue every 0.5s and processes ALL pending binaries + - Exits after 5 seconds idle + - Used by Orchestrator to ensure binaries installed before snapshots start """ name: ClassVar[str] = 'binary' MAX_TICK_TIME: ClassVar[int] = 600 # 10 minutes for binary installations MAX_CONCURRENT_TASKS: ClassVar[int] = 1 # One binary per worker + POLL_INTERVAL: ClassVar[float] = 0.5 # Check every 500ms (daemon mode only) - def __init__(self, binary_id: str, worker_id: int = 0): - self.binary_id = binary_id + def __init__(self, binary_id: str = None, worker_id: int = 0): + self.binary_id = binary_id # Optional - None means daemon mode super().__init__(worker_id=worker_id) def get_model(self): @@ -861,20 +872,43 @@ class BinaryWorker(Worker): return Binary def get_next_item(self): - """Get the specific binary to install.""" - from archivebox.machine.models import Binary + """Get binary to install (specific or next queued).""" + from archivebox.machine.models import Binary, Machine - try: - return Binary.objects.get(id=self.binary_id) - except Binary.DoesNotExist: - return None + if self.binary_id: + # Specific binary mode + try: + return Binary.objects.get(id=self.binary_id) + except Binary.DoesNotExist: + return None + else: + # Daemon mode - get all queued binaries for current machine + machine = Machine.current() + return Binary.objects.filter( + machine=machine, + status=Binary.StatusChoices.QUEUED, + retry_at__lte=timezone.now() + ).order_by('retry_at') def runloop(self) -> None: - """Install the specified binary.""" + """Install binary(ies).""" import sys self.on_startup() + if self.binary_id: + # Specific binary mode - process once and exit + self._process_single_binary() + else: + # Daemon mode - poll and process all pending binaries + self._daemon_loop() + + self.on_shutdown() + + def _process_single_binary(self): + """Process a single specific binary.""" + import sys + try: binary = self.get_next_item() @@ -888,12 +922,8 @@ class BinaryWorker(Worker): return print(f'[cyan]🔧 BinaryWorker installing: {binary.name}[/cyan]', file=sys.stderr) - - # Tick the state machine to trigger installation - # This calls BinaryMachine.on_install() -> Binary.run() -> on_Binary__* hooks binary.sm.tick() - # Check result binary.refresh_from_db() if binary.status == Binary.StatusChoices.INSTALLED: log_worker_event( @@ -918,8 +948,78 @@ class BinaryWorker(Worker): pid=self.pid, error=e, ) - finally: - self.on_shutdown() + + def _daemon_loop(self): + """Poll and process all pending binaries until idle.""" + import sys + + idle_count = 0 + max_idle_ticks = 10 # Exit after 5 seconds idle (10 ticks * 0.5s) + + try: + while True: + # Get all pending binaries + pending_binaries = list(self.get_next_item()) + + if not pending_binaries: + idle_count += 1 + if idle_count >= max_idle_ticks: + log_worker_event( + worker_type='BinaryWorker', + event='No work for 5 seconds, exiting', + indent_level=1, + pid=self.pid, + ) + break + time.sleep(self.POLL_INTERVAL) + continue + + # Reset idle counter - we have work + idle_count = 0 + + # Process ALL pending binaries + for binary in pending_binaries: + try: + print(f'[cyan]🔧 BinaryWorker processing: {binary.name}[/cyan]', file=sys.stderr) + binary.sm.tick() + + binary.refresh_from_db() + if binary.status == Binary.StatusChoices.INSTALLED: + log_worker_event( + worker_type='BinaryWorker', + event=f'Installed: {binary.name} -> {binary.abspath}', + indent_level=1, + pid=self.pid, + ) + else: + log_worker_event( + worker_type='BinaryWorker', + event=f'Installation pending: {binary.name} (status={binary.status})', + indent_level=1, + pid=self.pid, + ) + + except Exception as e: + log_worker_event( + worker_type='BinaryWorker', + event=f'Failed to install {binary.name}', + indent_level=1, + pid=self.pid, + error=e, + ) + continue + + # Brief sleep before next poll + time.sleep(self.POLL_INTERVAL) + + except Exception as e: + log_worker_event( + worker_type='BinaryWorker', + event='Daemon loop error', + indent_level=1, + pid=self.pid, + error=e, + ) # Populate the registry