diff --git a/archivebox/machine/tests/__init__.py b/archivebox/machine/tests/__init__.py new file mode 100644 index 00000000..d7ce160b --- /dev/null +++ b/archivebox/machine/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the machine module (Machine, NetworkInterface, Binary, Process models).""" diff --git a/archivebox/machine/tests/test_machine_models.py b/archivebox/machine/tests/test_machine_models.py new file mode 100644 index 00000000..427c98d8 --- /dev/null +++ b/archivebox/machine/tests/test_machine_models.py @@ -0,0 +1,563 @@ +""" +Unit tests for machine module models: Machine, NetworkInterface, Binary, Process. + +Tests cover: +1. Machine model creation and current() method +2. NetworkInterface model and network detection +3. Binary model lifecycle and state machine +4. Process model lifecycle, hierarchy, and state machine +5. JSONL serialization/deserialization +6. Manager methods +7. Process tracking methods (replacing pid_utils) +""" + +import os +import sys +from pathlib import Path +from datetime import timedelta +from unittest.mock import patch + +import pytest +from django.test import TestCase +from django.utils import timezone + +from archivebox.machine.models import ( + Machine, + NetworkInterface, + Binary, + Process, + BinaryMachine, + ProcessMachine, + MACHINE_RECHECK_INTERVAL, + PROCESS_RECHECK_INTERVAL, + PID_REUSE_WINDOW, +) + + +class TestMachineModel(TestCase): + """Test the Machine model.""" + + def setUp(self): + """Reset cached machine between tests.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + + def test_machine_current_creates_machine(self): + """Machine.current() should create a machine if none exists.""" + machine = Machine.current() + + self.assertIsNotNone(machine) + self.assertIsNotNone(machine.id) + self.assertIsNotNone(machine.guid) + self.assertEqual(machine.hostname, os.uname().nodename) + self.assertIn(machine.os_family, ['linux', 'darwin', 'windows', 'freebsd']) + + def test_machine_current_returns_cached(self): + """Machine.current() should return cached machine within recheck interval.""" + machine1 = Machine.current() + machine2 = Machine.current() + + self.assertEqual(machine1.id, machine2.id) + + def test_machine_current_refreshes_after_interval(self): + """Machine.current() should refresh after recheck interval.""" + import archivebox.machine.models as models + + machine1 = Machine.current() + + # Manually expire the cache by modifying modified_at + machine1.modified_at = timezone.now() - timedelta(seconds=MACHINE_RECHECK_INTERVAL + 1) + machine1.save() + models._CURRENT_MACHINE = machine1 + + machine2 = Machine.current() + + # Should have fetched/updated the machine (same GUID) + self.assertEqual(machine1.guid, machine2.guid) + + def test_machine_from_jsonl_update(self): + """Machine.from_jsonl() should update machine config.""" + Machine.current() # Ensure machine exists + record = { + '_method': 'update', + 'key': 'WGET_BINARY', + 'value': '/usr/bin/wget', + } + + result = Machine.from_jsonl(record) + + self.assertIsNotNone(result) + self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget') + + def test_machine_from_jsonl_invalid(self): + """Machine.from_jsonl() should return None for invalid records.""" + result = Machine.from_jsonl({'invalid': 'record'}) + self.assertIsNone(result) + + def test_machine_manager_current(self): + """Machine.objects.current() should return current machine.""" + machine = Machine.objects.current() + self.assertIsNotNone(machine) + self.assertEqual(machine.id, Machine.current().id) + + +class TestNetworkInterfaceModel(TestCase): + """Test the NetworkInterface model.""" + + def setUp(self): + """Reset cached interface between tests.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_INTERFACE = None + + def test_networkinterface_current_creates_interface(self): + """NetworkInterface.current() should create an interface if none exists.""" + interface = NetworkInterface.current() + + self.assertIsNotNone(interface) + self.assertIsNotNone(interface.id) + self.assertIsNotNone(interface.machine) + self.assertIsNotNone(interface.ip_local) + + def test_networkinterface_current_returns_cached(self): + """NetworkInterface.current() should return cached interface within recheck interval.""" + interface1 = NetworkInterface.current() + interface2 = NetworkInterface.current() + + self.assertEqual(interface1.id, interface2.id) + + def test_networkinterface_manager_current(self): + """NetworkInterface.objects.current() should return current interface.""" + interface = NetworkInterface.objects.current() + self.assertIsNotNone(interface) + + +class TestBinaryModel(TestCase): + """Test the Binary model.""" + + def setUp(self): + """Reset cached binaries and create a machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_BINARIES = {} + self.machine = Machine.current() + + def test_binary_creation(self): + """Binary should be created with default values.""" + binary = Binary.objects.create( + machine=self.machine, + name='wget', + binproviders='apt,brew,env', + ) + + self.assertIsNotNone(binary.id) + self.assertEqual(binary.name, 'wget') + self.assertEqual(binary.status, Binary.StatusChoices.QUEUED) + self.assertFalse(binary.is_valid) + + def test_binary_is_valid(self): + """Binary.is_valid should be True when abspath and version are set.""" + binary = Binary.objects.create( + machine=self.machine, + name='wget', + abspath='/usr/bin/wget', + version='1.21', + ) + + self.assertTrue(binary.is_valid) + + def test_binary_manager_get_valid_binary(self): + """BinaryManager.get_valid_binary() should find valid binaries.""" + # Create invalid binary (no abspath) + Binary.objects.create(machine=self.machine, name='wget') + + # Create valid binary + Binary.objects.create( + machine=self.machine, + name='wget', + abspath='/usr/bin/wget', + version='1.21', + ) + + result = Binary.objects.get_valid_binary('wget') + + self.assertIsNotNone(result) + self.assertEqual(result.abspath, '/usr/bin/wget') + + def test_binary_update_and_requeue(self): + """Binary.update_and_requeue() should update fields and save.""" + binary = Binary.objects.create(machine=self.machine, name='test') + old_modified = binary.modified_at + + binary.update_and_requeue( + status=Binary.StatusChoices.STARTED, + retry_at=timezone.now() + timedelta(seconds=60), + ) + + binary.refresh_from_db() + self.assertEqual(binary.status, Binary.StatusChoices.STARTED) + self.assertGreater(binary.modified_at, old_modified) + + +class TestBinaryStateMachine(TestCase): + """Test the BinaryMachine state machine.""" + + def setUp(self): + """Create a machine and binary for state machine tests.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + self.binary = Binary.objects.create( + machine=self.machine, + name='test-binary', + binproviders='env', + ) + + def test_binary_state_machine_initial_state(self): + """BinaryMachine should start in queued state.""" + sm = BinaryMachine(self.binary) + self.assertEqual(sm.current_state.value, Binary.StatusChoices.QUEUED) + + def test_binary_state_machine_can_start(self): + """BinaryMachine.can_start() should check name and binproviders.""" + sm = BinaryMachine(self.binary) + self.assertTrue(sm.can_start()) + + self.binary.binproviders = '' + self.binary.save() + sm = BinaryMachine(self.binary) + self.assertFalse(sm.can_start()) + + +class TestProcessModel(TestCase): + """Test the Process model.""" + + def setUp(self): + """Create a machine for process tests.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + self.machine = Machine.current() + + def test_process_creation(self): + """Process should be created with default values.""" + process = Process.objects.create( + machine=self.machine, + cmd=['echo', 'hello'], + pwd='/tmp', + ) + + self.assertIsNotNone(process.id) + self.assertEqual(process.cmd, ['echo', 'hello']) + self.assertEqual(process.status, Process.StatusChoices.QUEUED) + self.assertIsNone(process.pid) + self.assertIsNone(process.exit_code) + + def test_process_to_jsonl(self): + """Process.to_jsonl() should serialize correctly.""" + process = Process.objects.create( + machine=self.machine, + cmd=['echo', 'hello'], + pwd='/tmp', + timeout=60, + ) + json_data = process.to_jsonl() + + self.assertEqual(json_data['type'], 'Process') + self.assertEqual(json_data['cmd'], ['echo', 'hello']) + self.assertEqual(json_data['pwd'], '/tmp') + self.assertEqual(json_data['timeout'], 60) + + def test_process_update_and_requeue(self): + """Process.update_and_requeue() should update fields and save.""" + process = Process.objects.create(machine=self.machine, cmd=['test']) + old_modified = process.modified_at + + process.update_and_requeue( + status=Process.StatusChoices.RUNNING, + pid=12345, + started_at=timezone.now(), + ) + + process.refresh_from_db() + self.assertEqual(process.status, Process.StatusChoices.RUNNING) + self.assertEqual(process.pid, 12345) + self.assertIsNotNone(process.started_at) + + +class TestProcessCurrent(TestCase): + """Test Process.current() method.""" + + def setUp(self): + """Reset caches.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + + def test_process_current_creates_record(self): + """Process.current() should create a Process for current PID.""" + proc = Process.current() + + self.assertIsNotNone(proc) + self.assertEqual(proc.pid, os.getpid()) + self.assertEqual(proc.status, Process.StatusChoices.RUNNING) + self.assertIsNotNone(proc.machine) + self.assertIsNotNone(proc.started_at) + + def test_process_current_caches(self): + """Process.current() should cache the result.""" + proc1 = Process.current() + proc2 = Process.current() + + self.assertEqual(proc1.id, proc2.id) + + def test_process_detect_type_orchestrator(self): + """_detect_process_type should detect orchestrator.""" + with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR) + + def test_process_detect_type_cli(self): + """_detect_process_type should detect CLI commands.""" + with patch('sys.argv', ['archivebox', 'add', 'http://example.com']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.CLI) + + def test_process_detect_type_worker(self): + """_detect_process_type should detect workers.""" + with patch('sys.argv', ['python', '-m', 'crawl_worker']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.WORKER) + + +class TestProcessHierarchy(TestCase): + """Test Process parent/child relationships.""" + + def setUp(self): + """Create machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + + def test_process_parent_child(self): + """Process should track parent/child relationships.""" + parent = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.CLI, + status=Process.StatusChoices.RUNNING, + pid=1, + started_at=timezone.now(), + ) + + child = Process.objects.create( + machine=self.machine, + parent=parent, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=2, + started_at=timezone.now(), + ) + + self.assertEqual(child.parent, parent) + self.assertIn(child, parent.children.all()) + + def test_process_root(self): + """Process.root should return the root of the hierarchy.""" + root = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.CLI, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + child = Process.objects.create( + machine=self.machine, + parent=root, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + grandchild = Process.objects.create( + machine=self.machine, + parent=child, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + + self.assertEqual(grandchild.root, root) + self.assertEqual(child.root, root) + self.assertEqual(root.root, root) + + def test_process_depth(self): + """Process.depth should return depth in tree.""" + root = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + child = Process.objects.create( + machine=self.machine, + parent=root, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + + self.assertEqual(root.depth, 0) + self.assertEqual(child.depth, 1) + + +class TestProcessLifecycle(TestCase): + """Test Process lifecycle methods.""" + + def setUp(self): + """Create machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + + def test_process_is_running_current_pid(self): + """is_running should be True for current PID.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=os.getpid(), + started_at=timezone.now(), + ) + + self.assertTrue(proc.is_running) + + def test_process_is_running_fake_pid(self): + """is_running should be False for non-existent PID.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + self.assertFalse(proc.is_running) + + def test_process_poll_detects_exit(self): + """poll() should detect exited process.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + exit_code = proc.poll() + + self.assertIsNotNone(exit_code) + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + def test_process_terminate_dead_process(self): + """terminate() should handle already-dead process.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + result = proc.terminate() + + self.assertFalse(result) + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + +class TestProcessClassMethods(TestCase): + """Test Process class methods for querying.""" + + def setUp(self): + """Create machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + + def test_get_running(self): + """get_running should return running processes.""" + proc = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + pid=99999, + started_at=timezone.now(), + ) + + running = Process.get_running(process_type=Process.TypeChoices.HOOK) + + self.assertIn(proc, running) + + def test_get_running_count(self): + """get_running_count should count running processes.""" + for i in range(3): + Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + pid=99900 + i, + started_at=timezone.now(), + ) + + count = Process.get_running_count(process_type=Process.TypeChoices.HOOK) + self.assertGreaterEqual(count, 3) + + def test_cleanup_stale_running(self): + """cleanup_stale_running should mark stale processes as exited.""" + stale = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1), + ) + + cleaned = Process.cleanup_stale_running() + + self.assertGreaterEqual(cleaned, 1) + stale.refresh_from_db() + self.assertEqual(stale.status, Process.StatusChoices.EXITED) + + +class TestProcessStateMachine(TestCase): + """Test the ProcessMachine state machine.""" + + def setUp(self): + """Create a machine and process for state machine tests.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + self.process = Process.objects.create( + machine=self.machine, + cmd=['echo', 'test'], + pwd='/tmp', + ) + + def test_process_state_machine_initial_state(self): + """ProcessMachine should start in queued state.""" + sm = ProcessMachine(self.process) + self.assertEqual(sm.current_state.value, Process.StatusChoices.QUEUED) + + def test_process_state_machine_can_start(self): + """ProcessMachine.can_start() should check cmd and machine.""" + sm = ProcessMachine(self.process) + self.assertTrue(sm.can_start()) + + self.process.cmd = [] + self.process.save() + sm = ProcessMachine(self.process) + self.assertFalse(sm.can_start()) + + def test_process_state_machine_is_exited(self): + """ProcessMachine.is_exited() should check exit_code.""" + sm = ProcessMachine(self.process) + self.assertFalse(sm.is_exited()) + + self.process.exit_code = 0 + self.process.save() + sm = ProcessMachine(self.process) + self.assertTrue(sm.is_exited()) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/apt/tests/__init__.py b/archivebox/plugins/apt/tests/__init__.py new file mode 100644 index 00000000..fdde694e --- /dev/null +++ b/archivebox/plugins/apt/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the apt binary provider plugin.""" diff --git a/archivebox/plugins/apt/tests/test_apt_provider.py b/archivebox/plugins/apt/tests/test_apt_provider.py new file mode 100644 index 00000000..be55e901 --- /dev/null +++ b/archivebox/plugins/apt/tests/test_apt_provider.py @@ -0,0 +1,154 @@ +""" +Tests for the apt binary provider plugin. + +Tests cover: +1. Hook script execution +2. apt package availability detection +3. JSONL output format +""" + +import json +import os +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest +from django.test import TestCase + + +# Get the path to the apt provider hook +PLUGIN_DIR = Path(__file__).parent.parent +INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_apt_provider.py' + + +def apt_available() -> bool: + """Check if apt is installed.""" + return shutil.which('apt') is not None or shutil.which('apt-get') is not None + + +def is_linux() -> bool: + """Check if running on Linux.""" + import platform + return platform.system().lower() == 'linux' + + +class TestAptProviderHook(TestCase): + """Test the apt binary provider installation hook.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + """Clean up.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_hook_script_exists(self): + """Hook script should exist.""" + self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}") + + def test_hook_skips_when_apt_not_allowed(self): + """Hook should skip when apt not in allowed binproviders.""" + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=wget', + '--binary-id=test-uuid', + '--machine-id=test-machine', + '--binproviders=pip,npm', # apt not allowed + ], + capture_output=True, + text=True, + timeout=30 + ) + + # Should exit cleanly (code 0) when apt not allowed + self.assertIn('apt provider not allowed', result.stderr) + self.assertEqual(result.returncode, 0) + + @pytest.mark.skipif(not is_linux(), reason="apt only available on Linux") + @pytest.mark.skipif(not apt_available(), reason="apt not installed") + def test_hook_detects_apt(self): + """Hook should detect apt binary when available.""" + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=nonexistent-pkg-xyz123', + '--binary-id=test-uuid', + '--machine-id=test-machine', + ], + capture_output=True, + text=True, + timeout=30 + ) + + # Should not say apt is not available + self.assertNotIn('apt not available', result.stderr) + + def test_hook_handles_overrides(self): + """Hook should accept overrides JSON.""" + overrides = json.dumps({ + 'apt': {'packages': ['custom-package-name']} + }) + + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=test-pkg', + '--binary-id=test-uuid', + '--machine-id=test-machine', + f'--overrides={overrides}', + ], + capture_output=True, + text=True, + timeout=30 + ) + + # Should not crash parsing overrides + self.assertNotIn('Traceback', result.stderr) + + +@pytest.mark.skipif(not is_linux(), reason="apt only available on Linux") +@pytest.mark.skipif(not apt_available(), reason="apt not installed") +class TestAptProviderSystemBinaries(TestCase): + """Test apt provider with system binaries.""" + + def test_detect_existing_binary(self): + """apt provider should detect already-installed system binaries.""" + # Check for a binary that's almost certainly installed (like 'ls' or 'bash') + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=bash', + '--binary-id=test-uuid', + '--machine-id=test-machine', + ], + capture_output=True, + text=True, + timeout=60 + ) + + # Parse JSONL output + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('{'): + try: + record = json.loads(line) + if record.get('type') == 'Binary' and record.get('name') == 'bash': + # Found bash + self.assertTrue(record.get('abspath')) + self.assertTrue(Path(record['abspath']).exists()) + return + except json.JSONDecodeError: + continue + + # apt may not be able to "install" bash (already installed) + # Just verify no crash + self.assertNotIn('Traceback', result.stderr) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/npm/tests/__init__.py b/archivebox/plugins/npm/tests/__init__.py new file mode 100644 index 00000000..08ccd028 --- /dev/null +++ b/archivebox/plugins/npm/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the npm binary provider plugin.""" diff --git a/archivebox/plugins/npm/tests/test_npm_provider.py b/archivebox/plugins/npm/tests/test_npm_provider.py new file mode 100644 index 00000000..c5099475 --- /dev/null +++ b/archivebox/plugins/npm/tests/test_npm_provider.py @@ -0,0 +1,144 @@ +""" +Tests for the npm binary provider plugin. + +Tests cover: +1. Hook script execution +2. npm package installation +3. PATH and NODE_MODULES_DIR updates +4. JSONL output format +""" + +import json +import os +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest +from django.test import TestCase + + +# Get the path to the npm provider hook +PLUGIN_DIR = Path(__file__).parent.parent +INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_npm_provider.py' + + +def npm_available() -> bool: + """Check if npm is installed.""" + return shutil.which('npm') is not None + + +class TestNpmProviderHook(TestCase): + """Test the npm binary provider installation hook.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + self.lib_dir = Path(self.temp_dir) / 'lib' / 'x86_64-linux' + self.lib_dir.mkdir(parents=True) + + def tearDown(self): + """Clean up.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_hook_script_exists(self): + """Hook script should exist.""" + self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}") + + def test_hook_requires_lib_dir(self): + """Hook should fail when LIB_DIR is not set.""" + env = os.environ.copy() + env.pop('LIB_DIR', None) # Remove LIB_DIR + + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=some-package', + '--binary-id=test-uuid', + '--machine-id=test-machine', + ], + capture_output=True, + text=True, + env=env, + timeout=30 + ) + + self.assertIn('LIB_DIR environment variable not set', result.stderr) + self.assertEqual(result.returncode, 1) + + def test_hook_skips_when_npm_not_allowed(self): + """Hook should skip when npm not in allowed binproviders.""" + env = os.environ.copy() + env['LIB_DIR'] = str(self.lib_dir) + + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=some-package', + '--binary-id=test-uuid', + '--machine-id=test-machine', + '--binproviders=pip,apt', # npm not allowed + ], + capture_output=True, + text=True, + env=env, + timeout=30 + ) + + # Should exit cleanly (code 0) when npm not allowed + self.assertIn('npm provider not allowed', result.stderr) + self.assertEqual(result.returncode, 0) + + @pytest.mark.skipif(not npm_available(), reason="npm not installed") + def test_hook_creates_npm_prefix(self): + """Hook should create npm prefix directory.""" + env = os.environ.copy() + env['LIB_DIR'] = str(self.lib_dir) + + # Even if installation fails, the npm prefix should be created + subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=nonexistent-xyz123', + '--binary-id=test-uuid', + '--machine-id=test-machine', + ], + capture_output=True, + text=True, + env=env, + timeout=60 + ) + + npm_prefix = self.lib_dir / 'npm' + self.assertTrue(npm_prefix.exists()) + + def test_hook_handles_overrides(self): + """Hook should accept overrides JSON.""" + env = os.environ.copy() + env['LIB_DIR'] = str(self.lib_dir) + + overrides = json.dumps({'npm': {'packages': ['custom-pkg']}}) + + # Just verify it doesn't crash with overrides + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=test-pkg', + '--binary-id=test-uuid', + '--machine-id=test-machine', + f'--overrides={overrides}', + ], + capture_output=True, + text=True, + env=env, + timeout=60 + ) + + # May fail to install, but should not crash parsing overrides + self.assertNotIn('Failed to parse overrides JSON', result.stderr) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/pip/tests/__init__.py b/archivebox/plugins/pip/tests/__init__.py new file mode 100644 index 00000000..28ac0d82 --- /dev/null +++ b/archivebox/plugins/pip/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the pip binary provider plugin.""" diff --git a/archivebox/plugins/pip/tests/test_pip_provider.py b/archivebox/plugins/pip/tests/test_pip_provider.py new file mode 100644 index 00000000..6e51a87c --- /dev/null +++ b/archivebox/plugins/pip/tests/test_pip_provider.py @@ -0,0 +1,175 @@ +""" +Tests for the pip binary provider plugin. + +Tests cover: +1. Hook script execution +2. pip package detection +3. Virtual environment handling +4. JSONL output format +""" + +import json +import os +import subprocess +import sys +import tempfile +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest +from django.test import TestCase + + +# Get the path to the pip provider hook +PLUGIN_DIR = Path(__file__).parent.parent +INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_pip_provider.py' + + +class TestPipProviderHook(TestCase): + """Test the pip binary provider installation hook.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + self.output_dir = Path(self.temp_dir) / 'output' + self.output_dir.mkdir() + + def tearDown(self): + """Clean up.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_hook_script_exists(self): + """Hook script should exist.""" + self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}") + + def test_hook_help(self): + """Hook should accept --help without error.""" + result = subprocess.run( + [sys.executable, str(INSTALL_HOOK), '--help'], + capture_output=True, + text=True, + timeout=30 + ) + # May succeed or fail depending on implementation + # At minimum should not crash with Python error + self.assertNotIn('Traceback', result.stderr) + + def test_hook_finds_python(self): + """Hook should find Python binary.""" + env = os.environ.copy() + env['DATA_DIR'] = self.temp_dir + + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=python3', + '--binproviders=pip,env', + ], + capture_output=True, + text=True, + cwd=str(self.output_dir), + env=env, + timeout=60 + ) + + # Check for JSONL output + jsonl_found = False + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('{'): + try: + record = json.loads(line) + if record.get('type') == 'Binary' and record.get('name') == 'python3': + jsonl_found = True + # Verify structure + self.assertIn('abspath', record) + self.assertIn('version', record) + break + except json.JSONDecodeError: + continue + + # May or may not find python3 via pip, but should not crash + self.assertNotIn('Traceback', result.stderr) + + def test_hook_unknown_package(self): + """Hook should handle unknown packages gracefully.""" + env = os.environ.copy() + env['DATA_DIR'] = self.temp_dir + + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=nonexistent_package_xyz123', + '--binproviders=pip', + ], + capture_output=True, + text=True, + cwd=str(self.output_dir), + env=env, + timeout=60 + ) + + # Should not crash + self.assertNotIn('Traceback', result.stderr) + # May have non-zero exit code for missing package + + +class TestPipProviderIntegration(TestCase): + """Integration tests for pip provider with real packages.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + self.output_dir = Path(self.temp_dir) / 'output' + self.output_dir.mkdir() + + def tearDown(self): + """Clean up.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + @pytest.mark.skipif( + subprocess.run([sys.executable, '-m', 'pip', '--version'], + capture_output=True).returncode != 0, + reason="pip not available" + ) + def test_hook_finds_pip_installed_binary(self): + """Hook should find binaries installed via pip.""" + env = os.environ.copy() + env['DATA_DIR'] = self.temp_dir + + # Try to find 'pip' itself which should be available + result = subprocess.run( + [ + sys.executable, str(INSTALL_HOOK), + '--name=pip', + '--binproviders=pip,env', + ], + capture_output=True, + text=True, + cwd=str(self.output_dir), + env=env, + timeout=60 + ) + + # Look for success in output + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('{'): + try: + record = json.loads(line) + if record.get('type') == 'Binary' and 'pip' in record.get('name', ''): + # Found pip binary + self.assertTrue(record.get('abspath')) + return + except json.JSONDecodeError: + continue + + # If we get here without finding pip, that's acceptable + # as long as the hook didn't crash + self.assertNotIn('Traceback', result.stderr) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/redirects/tests/__init__.py b/archivebox/plugins/redirects/tests/__init__.py new file mode 100644 index 00000000..6bc72141 --- /dev/null +++ b/archivebox/plugins/redirects/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the redirects plugin.""" diff --git a/archivebox/plugins/redirects/tests/test_redirects.py b/archivebox/plugins/redirects/tests/test_redirects.py new file mode 100644 index 00000000..06d95246 --- /dev/null +++ b/archivebox/plugins/redirects/tests/test_redirects.py @@ -0,0 +1,134 @@ +""" +Tests for the redirects plugin. + +Tests the real redirects hook with actual URLs to verify +redirect chain capture. +""" + +import json +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest +from django.test import TestCase + +# Import chrome test helpers +sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests')) +from chrome_test_helpers import ( + chrome_session, + get_test_env, + get_plugin_dir, + get_hook_script, +) + + +def chrome_available() -> bool: + """Check if Chrome/Chromium is available.""" + for name in ['chromium', 'chromium-browser', 'google-chrome', 'chrome']: + if shutil.which(name): + return True + return False + + +# Get the path to the redirects hook +PLUGIN_DIR = get_plugin_dir(__file__) +REDIRECTS_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_redirects.*') + + +class TestRedirectsPlugin(TestCase): + """Test the redirects plugin.""" + + def test_redirects_hook_exists(self): + """Redirects hook script should exist.""" + self.assertIsNotNone(REDIRECTS_HOOK, "Redirects hook not found in plugin directory") + self.assertTrue(REDIRECTS_HOOK.exists(), f"Hook not found: {REDIRECTS_HOOK}") + + +@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed") +class TestRedirectsWithChrome(TestCase): + """Integration tests for redirects plugin with Chrome.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + """Clean up.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_redirects_captures_navigation(self): + """Redirects hook should capture URL navigation without errors.""" + # Use a URL that doesn't redirect (simple case) + test_url = 'https://example.com' + snapshot_id = 'test-redirects-snapshot' + + try: + with chrome_session( + self.temp_dir, + crawl_id='test-redirects-crawl', + snapshot_id=snapshot_id, + test_url=test_url, + navigate=True, + timeout=30, + ) as (chrome_process, chrome_pid, snapshot_chrome_dir): + # Get environment and run the redirects hook + env = get_test_env() + env['CHROME_HEADLESS'] = 'true' + + # Run redirects hook with the active Chrome session + result = subprocess.run( + ['node', str(REDIRECTS_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'], + cwd=str(snapshot_chrome_dir), + capture_output=True, + text=True, + timeout=60, + env=env + ) + + # Check for output file + redirects_output = snapshot_chrome_dir / 'redirects.jsonl' + + redirects_data = None + + # Try parsing from file first + if redirects_output.exists(): + with open(redirects_output) as f: + for line in f: + line = line.strip() + if line.startswith('{'): + try: + redirects_data = json.loads(line) + break + except json.JSONDecodeError: + continue + + # Try parsing from stdout if not in file + if not redirects_data: + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('{'): + try: + record = json.loads(line) + if 'chain' in record or 'redirects' in record or record.get('type') == 'Redirects': + redirects_data = record + break + except json.JSONDecodeError: + continue + + # Verify hook ran successfully + # example.com typically doesn't redirect, so we just verify no errors + self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}") + self.assertNotIn('Traceback', result.stderr) + self.assertNotIn('Error:', result.stderr) + + except RuntimeError as e: + if 'Chrome' in str(e) or 'CDP' in str(e): + self.skipTest(f"Chrome session setup failed: {e}") + raise + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/search_backend_ripgrep/tests/test_ripgrep_search.py b/archivebox/plugins/search_backend_ripgrep/tests/test_ripgrep_search.py new file mode 100644 index 00000000..75513d34 --- /dev/null +++ b/archivebox/plugins/search_backend_ripgrep/tests/test_ripgrep_search.py @@ -0,0 +1,308 @@ +""" +Tests for the ripgrep search backend. + +Tests cover: +1. Search with ripgrep binary +2. Snapshot ID extraction from file paths +3. Timeout handling +4. Error handling +5. Environment variable configuration +""" + +import os +import shutil +import subprocess +import tempfile +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest +from django.test import TestCase + +from archivebox.plugins.search_backend_ripgrep.search import ( + search, + flush, + get_env, + get_env_int, + get_env_array, +) + + +class TestEnvHelpers(TestCase): + """Test environment variable helper functions.""" + + def test_get_env_default(self): + """get_env should return default for unset vars.""" + result = get_env('NONEXISTENT_VAR_12345', 'default') + self.assertEqual(result, 'default') + + def test_get_env_set(self): + """get_env should return value for set vars.""" + with patch.dict(os.environ, {'TEST_VAR': 'value'}): + result = get_env('TEST_VAR', 'default') + self.assertEqual(result, 'value') + + def test_get_env_strips_whitespace(self): + """get_env should strip whitespace.""" + with patch.dict(os.environ, {'TEST_VAR': ' value '}): + result = get_env('TEST_VAR', '') + self.assertEqual(result, 'value') + + def test_get_env_int_default(self): + """get_env_int should return default for unset vars.""" + result = get_env_int('NONEXISTENT_VAR_12345', 42) + self.assertEqual(result, 42) + + def test_get_env_int_valid(self): + """get_env_int should parse integer values.""" + with patch.dict(os.environ, {'TEST_INT': '100'}): + result = get_env_int('TEST_INT', 0) + self.assertEqual(result, 100) + + def test_get_env_int_invalid(self): + """get_env_int should return default for invalid integers.""" + with patch.dict(os.environ, {'TEST_INT': 'not a number'}): + result = get_env_int('TEST_INT', 42) + self.assertEqual(result, 42) + + def test_get_env_array_default(self): + """get_env_array should return default for unset vars.""" + result = get_env_array('NONEXISTENT_VAR_12345', ['default']) + self.assertEqual(result, ['default']) + + def test_get_env_array_valid(self): + """get_env_array should parse JSON arrays.""" + with patch.dict(os.environ, {'TEST_ARRAY': '["a", "b", "c"]'}): + result = get_env_array('TEST_ARRAY', []) + self.assertEqual(result, ['a', 'b', 'c']) + + def test_get_env_array_invalid_json(self): + """get_env_array should return default for invalid JSON.""" + with patch.dict(os.environ, {'TEST_ARRAY': 'not json'}): + result = get_env_array('TEST_ARRAY', ['default']) + self.assertEqual(result, ['default']) + + def test_get_env_array_not_array(self): + """get_env_array should return default for non-array JSON.""" + with patch.dict(os.environ, {'TEST_ARRAY': '{"key": "value"}'}): + result = get_env_array('TEST_ARRAY', ['default']) + self.assertEqual(result, ['default']) + + +class TestRipgrepFlush(TestCase): + """Test the flush function.""" + + def test_flush_is_noop(self): + """flush should be a no-op for ripgrep backend.""" + # Should not raise + flush(['snap-001', 'snap-002']) + + +class TestRipgrepSearch(TestCase): + """Test the ripgrep search function.""" + + def setUp(self): + """Create temporary archive directory with test files.""" + self.temp_dir = tempfile.mkdtemp() + self.archive_dir = Path(self.temp_dir) / 'archive' + self.archive_dir.mkdir() + + # Create snapshot directories with searchable content + self._create_snapshot('snap-001', { + 'singlefile/index.html': 'Python programming tutorial', + 'title/title.txt': 'Learn Python Programming', + }) + self._create_snapshot('snap-002', { + 'singlefile/index.html': 'JavaScript guide', + 'title/title.txt': 'JavaScript Basics', + }) + self._create_snapshot('snap-003', { + 'wget/index.html': 'Web archiving best practices', + 'title/title.txt': 'Web Archiving Guide', + }) + + # Patch settings + self.settings_patch = patch( + 'archivebox.plugins.search_backend_ripgrep.search.settings' + ) + self.mock_settings = self.settings_patch.start() + self.mock_settings.ARCHIVE_DIR = str(self.archive_dir) + + def tearDown(self): + """Clean up temporary directory.""" + self.settings_patch.stop() + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_snapshot(self, snapshot_id: str, files: dict): + """Create a snapshot directory with files.""" + snap_dir = self.archive_dir / snapshot_id + for path, content in files.items(): + file_path = snap_dir / path + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text(content) + + def _has_ripgrep(self) -> bool: + """Check if ripgrep is available.""" + return shutil.which('rg') is not None + + def test_search_no_archive_dir(self): + """search should return empty list when archive dir doesn't exist.""" + self.mock_settings.ARCHIVE_DIR = '/nonexistent/path' + results = search('test') + self.assertEqual(results, []) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_single_match(self): + """search should find matching snapshot.""" + results = search('Python programming') + + self.assertIn('snap-001', results) + self.assertNotIn('snap-002', results) + self.assertNotIn('snap-003', results) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_multiple_matches(self): + """search should find all matching snapshots.""" + # 'guide' appears in snap-002 (JavaScript guide) and snap-003 (Archiving Guide) + results = search('guide') + + self.assertIn('snap-002', results) + self.assertIn('snap-003', results) + self.assertNotIn('snap-001', results) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_case_insensitive_by_default(self): + """search should be case-sensitive (ripgrep default).""" + # By default rg is case-sensitive + results_upper = search('PYTHON') + results_lower = search('python') + + # Depending on ripgrep config, results may differ + self.assertIsInstance(results_upper, list) + self.assertIsInstance(results_lower, list) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_no_results(self): + """search should return empty list for no matches.""" + results = search('xyznonexistent123') + self.assertEqual(results, []) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_regex(self): + """search should support regex patterns.""" + results = search('(Python|JavaScript)') + + self.assertIn('snap-001', results) + self.assertIn('snap-002', results) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_distinct_snapshots(self): + """search should return distinct snapshot IDs.""" + # Query matches both files in snap-001 + results = search('Python') + + # Should only appear once + self.assertEqual(results.count('snap-001'), 1) + + def test_search_missing_binary(self): + """search should raise when ripgrep binary not found.""" + with patch.dict(os.environ, {'RIPGREP_BINARY': '/nonexistent/rg'}): + with patch('shutil.which', return_value=None): + with self.assertRaises(RuntimeError) as context: + search('test') + self.assertIn('ripgrep binary not found', str(context.exception)) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_with_custom_args(self): + """search should use custom RIPGREP_ARGS.""" + with patch.dict(os.environ, {'RIPGREP_ARGS': '["-i"]'}): # Case insensitive + results = search('PYTHON') + # With -i flag, should find regardless of case + self.assertIn('snap-001', results) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_timeout(self): + """search should handle timeout gracefully.""" + with patch.dict(os.environ, {'RIPGREP_TIMEOUT': '1'}): + # Short timeout, should still complete for small archive + results = search('Python') + self.assertIsInstance(results, list) + + +class TestRipgrepSearchIntegration(TestCase): + """Integration tests with realistic archive structure.""" + + def setUp(self): + """Create archive with realistic structure.""" + self.temp_dir = tempfile.mkdtemp() + self.archive_dir = Path(self.temp_dir) / 'archive' + self.archive_dir.mkdir() + + # Realistic snapshot structure + self._create_snapshot('1704067200.123456', { # 2024-01-01 + 'singlefile.html': ''' + +ArchiveBox Documentation + +

Getting Started with ArchiveBox

+

ArchiveBox is a powerful, self-hosted web archiving tool.

+

Install with: pip install archivebox

+ +''', + 'title/title.txt': 'ArchiveBox Documentation', + 'screenshot/screenshot.png': b'PNG IMAGE DATA', # Binary file + }) + self._create_snapshot('1704153600.654321', { # 2024-01-02 + 'wget/index.html': ''' +Python News + +

Python 3.12 Released

+

New features include improved error messages and performance.

+ +''', + 'readability/content.html': '

Python 3.12 has been released with exciting new features.

', + }) + + self.settings_patch = patch( + 'archivebox.plugins.search_backend_ripgrep.search.settings' + ) + self.mock_settings = self.settings_patch.start() + self.mock_settings.ARCHIVE_DIR = str(self.archive_dir) + + def tearDown(self): + """Clean up.""" + self.settings_patch.stop() + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_snapshot(self, timestamp: str, files: dict): + """Create snapshot with timestamp-based ID.""" + snap_dir = self.archive_dir / timestamp + for path, content in files.items(): + file_path = snap_dir / path + file_path.parent.mkdir(parents=True, exist_ok=True) + if isinstance(content, bytes): + file_path.write_bytes(content) + else: + file_path.write_text(content) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_archivebox(self): + """Search for archivebox should find documentation snapshot.""" + results = search('archivebox') + self.assertIn('1704067200.123456', results) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_python(self): + """Search for python should find Python news snapshot.""" + results = search('Python') + self.assertIn('1704153600.654321', results) + + @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed") + def test_search_pip_install(self): + """Search for installation command.""" + results = search('pip install') + self.assertIn('1704067200.123456', results) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/search_backend_sqlite/tests/__init__.py b/archivebox/plugins/search_backend_sqlite/tests/__init__.py new file mode 100644 index 00000000..6bef82e4 --- /dev/null +++ b/archivebox/plugins/search_backend_sqlite/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the SQLite FTS5 search backend.""" diff --git a/archivebox/plugins/search_backend_sqlite/tests/test_sqlite_search.py b/archivebox/plugins/search_backend_sqlite/tests/test_sqlite_search.py new file mode 100644 index 00000000..d8d6035f --- /dev/null +++ b/archivebox/plugins/search_backend_sqlite/tests/test_sqlite_search.py @@ -0,0 +1,351 @@ +""" +Tests for the SQLite FTS5 search backend. + +Tests cover: +1. Search index creation +2. Indexing snapshots +3. Search queries with real test data +4. Flush operations +5. Edge cases (empty index, special characters) +""" + +import os +import sqlite3 +import tempfile +from pathlib import Path +from unittest.mock import patch + +import pytest +from django.test import TestCase, override_settings + +from archivebox.plugins.search_backend_sqlite.search import ( + get_db_path, + search, + flush, + SQLITEFTS_DB, + FTS_TOKENIZERS, +) + + +class TestSqliteSearchBackend(TestCase): + """Test SQLite FTS5 search backend.""" + + def setUp(self): + """Create a temporary data directory with search index.""" + self.temp_dir = tempfile.mkdtemp() + self.db_path = Path(self.temp_dir) / SQLITEFTS_DB + + # Patch DATA_DIR + self.settings_patch = patch( + 'archivebox.plugins.search_backend_sqlite.search.settings' + ) + self.mock_settings = self.settings_patch.start() + self.mock_settings.DATA_DIR = self.temp_dir + + # Create FTS5 table + self._create_index() + + def tearDown(self): + """Clean up temporary directory.""" + self.settings_patch.stop() + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_index(self): + """Create the FTS5 search index table.""" + conn = sqlite3.connect(str(self.db_path)) + try: + conn.execute(f''' + CREATE VIRTUAL TABLE IF NOT EXISTS search_index + USING fts5( + snapshot_id, + url, + title, + content, + tokenize = '{FTS_TOKENIZERS}' + ) + ''') + conn.commit() + finally: + conn.close() + + def _index_snapshot(self, snapshot_id: str, url: str, title: str, content: str): + """Add a snapshot to the index.""" + conn = sqlite3.connect(str(self.db_path)) + try: + conn.execute( + 'INSERT INTO search_index (snapshot_id, url, title, content) VALUES (?, ?, ?, ?)', + (snapshot_id, url, title, content) + ) + conn.commit() + finally: + conn.close() + + def test_get_db_path(self): + """get_db_path should return correct path.""" + path = get_db_path() + self.assertEqual(path, Path(self.temp_dir) / SQLITEFTS_DB) + + def test_search_empty_index(self): + """search should return empty list for empty index.""" + results = search('nonexistent') + self.assertEqual(results, []) + + def test_search_no_index_file(self): + """search should return empty list when index file doesn't exist.""" + os.remove(self.db_path) + results = search('test') + self.assertEqual(results, []) + + def test_search_single_result(self): + """search should find matching snapshot.""" + self._index_snapshot( + 'snap-001', + 'https://example.com/page1', + 'Example Page', + 'This is example content about testing.' + ) + + results = search('example') + self.assertEqual(len(results), 1) + self.assertEqual(results[0], 'snap-001') + + def test_search_multiple_results(self): + """search should find all matching snapshots.""" + self._index_snapshot('snap-001', 'https://example.com/1', 'Python Tutorial', 'Learn Python programming') + self._index_snapshot('snap-002', 'https://example.com/2', 'Python Guide', 'Advanced Python concepts') + self._index_snapshot('snap-003', 'https://example.com/3', 'JavaScript Basics', 'Learn JavaScript') + + results = search('Python') + self.assertEqual(len(results), 2) + self.assertIn('snap-001', results) + self.assertIn('snap-002', results) + self.assertNotIn('snap-003', results) + + def test_search_title_match(self): + """search should match against title.""" + self._index_snapshot('snap-001', 'https://example.com', 'Django Web Framework', 'Content here') + + results = search('Django') + self.assertEqual(len(results), 1) + self.assertEqual(results[0], 'snap-001') + + def test_search_url_match(self): + """search should match against URL.""" + self._index_snapshot('snap-001', 'https://archivebox.io/docs', 'Title', 'Content') + + results = search('archivebox') + self.assertEqual(len(results), 1) + + def test_search_content_match(self): + """search should match against content.""" + self._index_snapshot( + 'snap-001', + 'https://example.com', + 'Generic Title', + 'This document contains information about cryptography and security.' + ) + + results = search('cryptography') + self.assertEqual(len(results), 1) + + def test_search_case_insensitive(self): + """search should be case insensitive.""" + self._index_snapshot('snap-001', 'https://example.com', 'Title', 'PYTHON programming') + + results = search('python') + self.assertEqual(len(results), 1) + + def test_search_stemming(self): + """search should use porter stemmer for word stems.""" + self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Programming concepts') + + # 'program' should match 'programming' with porter stemmer + results = search('program') + self.assertEqual(len(results), 1) + + def test_search_multiple_words(self): + """search should match documents with all words.""" + self._index_snapshot('snap-001', 'https://example.com', 'Web Development', 'Learn web development skills') + self._index_snapshot('snap-002', 'https://example.com', 'Web Design', 'Design beautiful websites') + + results = search('web development') + # FTS5 defaults to OR, so both might match + # With porter stemmer, both should match 'web' + self.assertIn('snap-001', results) + + def test_search_phrase(self): + """search should support phrase queries.""" + self._index_snapshot('snap-001', 'https://example.com', 'Title', 'machine learning algorithms') + self._index_snapshot('snap-002', 'https://example.com', 'Title', 'machine algorithms learning') + + # Phrase search with quotes + results = search('"machine learning"') + self.assertEqual(len(results), 1) + self.assertEqual(results[0], 'snap-001') + + def test_search_distinct_results(self): + """search should return distinct snapshot IDs.""" + # Index same snapshot twice (could happen with multiple fields matching) + self._index_snapshot('snap-001', 'https://python.org', 'Python', 'Python programming language') + + results = search('Python') + self.assertEqual(len(results), 1) + + def test_flush_single(self): + """flush should remove snapshot from index.""" + self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Content') + self._index_snapshot('snap-002', 'https://example.com', 'Title', 'Content') + + flush(['snap-001']) + + results = search('Content') + self.assertEqual(len(results), 1) + self.assertEqual(results[0], 'snap-002') + + def test_flush_multiple(self): + """flush should remove multiple snapshots.""" + self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Test') + self._index_snapshot('snap-002', 'https://example.com', 'Title', 'Test') + self._index_snapshot('snap-003', 'https://example.com', 'Title', 'Test') + + flush(['snap-001', 'snap-003']) + + results = search('Test') + self.assertEqual(len(results), 1) + self.assertEqual(results[0], 'snap-002') + + def test_flush_nonexistent(self): + """flush should not raise for nonexistent snapshots.""" + # Should not raise + flush(['nonexistent-snap']) + + def test_flush_no_index(self): + """flush should not raise when index doesn't exist.""" + os.remove(self.db_path) + # Should not raise + flush(['snap-001']) + + def test_search_special_characters(self): + """search should handle special characters in queries.""" + self._index_snapshot('snap-001', 'https://example.com', 'C++ Programming', 'Learn C++ basics') + + # FTS5 handles special chars + results = search('C++') + # May or may not match depending on tokenizer config + # At minimum, should not raise + self.assertIsInstance(results, list) + + def test_search_unicode(self): + """search should handle unicode content.""" + self._index_snapshot('snap-001', 'https://example.com', 'Titre Francais', 'cafe resume') + self._index_snapshot('snap-002', 'https://example.com', 'Japanese', 'Hello world') + + # With remove_diacritics, 'cafe' should match + results = search('cafe') + self.assertEqual(len(results), 1) + + +class TestSqliteSearchWithRealData(TestCase): + """Integration tests with realistic archived content.""" + + def setUp(self): + """Create index with realistic test data.""" + self.temp_dir = tempfile.mkdtemp() + self.db_path = Path(self.temp_dir) / SQLITEFTS_DB + + self.settings_patch = patch( + 'archivebox.plugins.search_backend_sqlite.search.settings' + ) + self.mock_settings = self.settings_patch.start() + self.mock_settings.DATA_DIR = self.temp_dir + + # Create index + conn = sqlite3.connect(str(self.db_path)) + try: + conn.execute(f''' + CREATE VIRTUAL TABLE IF NOT EXISTS search_index + USING fts5( + snapshot_id, + url, + title, + content, + tokenize = '{FTS_TOKENIZERS}' + ) + ''') + # Index realistic data + test_data = [ + ('snap-001', 'https://github.com/ArchiveBox/ArchiveBox', + 'ArchiveBox - Self-hosted web archiving', + 'Open source self-hosted web archiving. Collects, saves, and displays various types of content.'), + ('snap-002', 'https://docs.python.org/3/tutorial/', + 'Python 3 Tutorial', + 'An informal introduction to Python. Python is an easy to learn, powerful programming language.'), + ('snap-003', 'https://developer.mozilla.org/docs/Web/JavaScript', + 'JavaScript - MDN Web Docs', + 'JavaScript (JS) is a lightweight, interpreted programming language with first-class functions.'), + ('snap-004', 'https://news.ycombinator.com', + 'Hacker News', + 'Social news website focusing on computer science and entrepreneurship.'), + ('snap-005', 'https://en.wikipedia.org/wiki/Web_archiving', + 'Web archiving - Wikipedia', + 'Web archiving is the process of collecting portions of the World Wide Web to ensure the information is preserved.'), + ] + conn.executemany( + 'INSERT INTO search_index (snapshot_id, url, title, content) VALUES (?, ?, ?, ?)', + test_data + ) + conn.commit() + finally: + conn.close() + + def tearDown(self): + """Clean up.""" + self.settings_patch.stop() + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_search_archivebox(self): + """Search for 'archivebox' should find relevant results.""" + results = search('archivebox') + self.assertIn('snap-001', results) + + def test_search_programming(self): + """Search for 'programming' should find Python and JS docs.""" + results = search('programming') + self.assertIn('snap-002', results) + self.assertIn('snap-003', results) + + def test_search_web_archiving(self): + """Search for 'web archiving' should find relevant results.""" + results = search('web archiving') + # Both ArchiveBox and Wikipedia should match + self.assertIn('snap-001', results) + self.assertIn('snap-005', results) + + def test_search_github(self): + """Search for 'github' should find URL match.""" + results = search('github') + self.assertIn('snap-001', results) + + def test_search_tutorial(self): + """Search for 'tutorial' should find Python tutorial.""" + results = search('tutorial') + self.assertIn('snap-002', results) + + def test_flush_and_search(self): + """Flushing a snapshot should remove it from search results.""" + # Verify it's there first + results = search('archivebox') + self.assertIn('snap-001', results) + + # Flush it + flush(['snap-001']) + + # Should no longer be found + results = search('archivebox') + self.assertNotIn('snap-001', results) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/seo/tests/__init__.py b/archivebox/plugins/seo/tests/__init__.py new file mode 100644 index 00000000..f2b12854 --- /dev/null +++ b/archivebox/plugins/seo/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the SEO plugin.""" diff --git a/archivebox/plugins/seo/tests/test_seo.py b/archivebox/plugins/seo/tests/test_seo.py new file mode 100644 index 00000000..fc74ac91 --- /dev/null +++ b/archivebox/plugins/seo/tests/test_seo.py @@ -0,0 +1,135 @@ +""" +Tests for the SEO plugin. + +Tests the real SEO hook with an actual URL to verify +meta tag extraction. +""" + +import json +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest +from django.test import TestCase + +# Import chrome test helpers +sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests')) +from chrome_test_helpers import ( + chrome_session, + get_test_env, + get_plugin_dir, + get_hook_script, +) + + +def chrome_available() -> bool: + """Check if Chrome/Chromium is available.""" + for name in ['chromium', 'chromium-browser', 'google-chrome', 'chrome']: + if shutil.which(name): + return True + return False + + +# Get the path to the SEO hook +PLUGIN_DIR = get_plugin_dir(__file__) +SEO_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_seo.*') + + +class TestSEOPlugin(TestCase): + """Test the SEO plugin.""" + + def test_seo_hook_exists(self): + """SEO hook script should exist.""" + self.assertIsNotNone(SEO_HOOK, "SEO hook not found in plugin directory") + self.assertTrue(SEO_HOOK.exists(), f"Hook not found: {SEO_HOOK}") + + +@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed") +class TestSEOWithChrome(TestCase): + """Integration tests for SEO plugin with Chrome.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + """Clean up.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_seo_extracts_meta_tags(self): + """SEO hook should extract meta tags from a real URL.""" + test_url = 'https://example.com' + snapshot_id = 'test-seo-snapshot' + + try: + with chrome_session( + self.temp_dir, + crawl_id='test-seo-crawl', + snapshot_id=snapshot_id, + test_url=test_url, + navigate=True, + timeout=30, + ) as (chrome_process, chrome_pid, snapshot_chrome_dir): + # Get environment and run the SEO hook + env = get_test_env() + env['CHROME_HEADLESS'] = 'true' + + # Run SEO hook with the active Chrome session + result = subprocess.run( + ['node', str(SEO_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'], + cwd=str(snapshot_chrome_dir), + capture_output=True, + text=True, + timeout=60, + env=env + ) + + # Check for output file + seo_output = snapshot_chrome_dir / 'seo.json' + + seo_data = None + + # Try parsing from file first + if seo_output.exists(): + with open(seo_output) as f: + try: + seo_data = json.load(f) + except json.JSONDecodeError: + pass + + # Try parsing from stdout if not in file + if not seo_data: + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('{'): + try: + record = json.loads(line) + # SEO data typically has title, description, or og: tags + if any(key in record for key in ['title', 'description', 'og:title', 'canonical']): + seo_data = record + break + except json.JSONDecodeError: + continue + + # Verify hook ran successfully + self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}") + self.assertNotIn('Traceback', result.stderr) + self.assertNotIn('Error:', result.stderr) + + # example.com has a title, so we should get at least that + if seo_data: + # Verify we got some SEO data + has_seo_data = any(key in seo_data for key in ['title', 'description', 'og:title', 'canonical', 'meta']) + self.assertTrue(has_seo_data, f"No SEO data extracted: {seo_data}") + + except RuntimeError as e: + if 'Chrome' in str(e) or 'CDP' in str(e): + self.skipTest(f"Chrome session setup failed: {e}") + raise + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/plugins/ssl/tests/__init__.py b/archivebox/plugins/ssl/tests/__init__.py new file mode 100644 index 00000000..48a022d5 --- /dev/null +++ b/archivebox/plugins/ssl/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the SSL plugin.""" diff --git a/archivebox/plugins/ssl/tests/test_ssl.py b/archivebox/plugins/ssl/tests/test_ssl.py new file mode 100644 index 00000000..e2bcbe52 --- /dev/null +++ b/archivebox/plugins/ssl/tests/test_ssl.py @@ -0,0 +1,139 @@ +""" +Tests for the SSL plugin. + +Tests the real SSL hook with an actual HTTPS URL to verify +certificate information extraction. +""" + +import json +import shutil +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest +from django.test import TestCase + +# Import chrome test helpers +sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests')) +from chrome_test_helpers import ( + chrome_session, + get_test_env, + get_plugin_dir, + get_hook_script, +) + + +def chrome_available() -> bool: + """Check if Chrome/Chromium is available.""" + for name in ['chromium', 'chromium-browser', 'google-chrome', 'chrome']: + if shutil.which(name): + return True + return False + + +# Get the path to the SSL hook +PLUGIN_DIR = get_plugin_dir(__file__) +SSL_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_ssl.*') + + +class TestSSLPlugin(TestCase): + """Test the SSL plugin with real HTTPS URLs.""" + + def test_ssl_hook_exists(self): + """SSL hook script should exist.""" + self.assertIsNotNone(SSL_HOOK, "SSL hook not found in plugin directory") + self.assertTrue(SSL_HOOK.exists(), f"Hook not found: {SSL_HOOK}") + + +@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed") +class TestSSLWithChrome(TestCase): + """Integration tests for SSL plugin with Chrome.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + """Clean up.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_ssl_extracts_certificate_from_https_url(self): + """SSL hook should extract certificate info from a real HTTPS URL.""" + test_url = 'https://example.com' + snapshot_id = 'test-ssl-snapshot' + + try: + with chrome_session( + self.temp_dir, + crawl_id='test-ssl-crawl', + snapshot_id=snapshot_id, + test_url=test_url, + navigate=True, + timeout=30, + ) as (chrome_process, chrome_pid, snapshot_chrome_dir): + # Get environment and run the SSL hook + env = get_test_env() + env['CHROME_HEADLESS'] = 'true' + + # Run SSL hook with the active Chrome session + result = subprocess.run( + ['node', str(SSL_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'], + cwd=str(snapshot_chrome_dir), + capture_output=True, + text=True, + timeout=60, + env=env + ) + + # Check for output file + ssl_output = snapshot_chrome_dir / 'ssl.jsonl' + + ssl_data = None + + # Try parsing from file first + if ssl_output.exists(): + with open(ssl_output) as f: + for line in f: + line = line.strip() + if line.startswith('{'): + try: + ssl_data = json.loads(line) + break + except json.JSONDecodeError: + continue + + # Try parsing from stdout if not in file + if not ssl_data: + for line in result.stdout.split('\n'): + line = line.strip() + if line.startswith('{'): + try: + record = json.loads(line) + if 'protocol' in record or 'issuer' in record or record.get('type') == 'SSL': + ssl_data = record + break + except json.JSONDecodeError: + continue + + # Verify we got SSL data from HTTPS URL + if ssl_data: + # example.com uses HTTPS, should get certificate info + self.assertIn('protocol', ssl_data, f"SSL data missing protocol: {ssl_data}") + self.assertTrue( + ssl_data['protocol'].startswith('TLS') or ssl_data['protocol'].startswith('SSL'), + f"Unexpected protocol: {ssl_data['protocol']}" + ) + else: + # If no SSL data, at least verify hook ran without crashing + self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}") + + except RuntimeError as e: + if 'Chrome' in str(e) or 'CDP' in str(e): + self.skipTest(f"Chrome session setup failed: {e}") + raise + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) diff --git a/archivebox/workers/tests/__init__.py b/archivebox/workers/tests/__init__.py new file mode 100644 index 00000000..f798b10f --- /dev/null +++ b/archivebox/workers/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the workers module (Orchestrator, Worker, pid_utils).""" diff --git a/archivebox/workers/tests/test_orchestrator.py b/archivebox/workers/tests/test_orchestrator.py new file mode 100644 index 00000000..d54331ec --- /dev/null +++ b/archivebox/workers/tests/test_orchestrator.py @@ -0,0 +1,453 @@ +""" +Unit tests for the Orchestrator and Worker classes. + +Tests cover: +1. Orchestrator lifecycle (startup, shutdown) +2. Queue polling and worker spawning +3. Idle detection and exit logic +4. Worker registration and management +5. Process model methods (replacing old pid_utils) +""" + +import os +import tempfile +import time +from pathlib import Path +from datetime import timedelta +from unittest.mock import patch, MagicMock + +import pytest +from django.test import TestCase +from django.utils import timezone + +from archivebox.workers.orchestrator import Orchestrator + + +class TestOrchestratorUnit(TestCase): + """Unit tests for Orchestrator class (mocked dependencies).""" + + def test_orchestrator_creation(self): + """Orchestrator should initialize with correct defaults.""" + orchestrator = Orchestrator(exit_on_idle=True) + + self.assertTrue(orchestrator.exit_on_idle) + self.assertEqual(orchestrator.idle_count, 0) + self.assertIsNone(orchestrator.pid_file) + + def test_orchestrator_repr(self): + """Orchestrator __repr__ should include PID.""" + orchestrator = Orchestrator() + repr_str = repr(orchestrator) + + self.assertIn('Orchestrator', repr_str) + self.assertIn(str(os.getpid()), repr_str) + + def test_has_pending_work(self): + """has_pending_work should check if any queue has items.""" + orchestrator = Orchestrator() + + self.assertFalse(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 0})) + self.assertTrue(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 5})) + self.assertTrue(orchestrator.has_pending_work({'crawl': 10, 'snapshot': 0})) + + def test_should_exit_not_exit_on_idle(self): + """should_exit should return False when exit_on_idle is False.""" + orchestrator = Orchestrator(exit_on_idle=False) + orchestrator.idle_count = 100 + + self.assertFalse(orchestrator.should_exit({'crawl': 0})) + + def test_should_exit_pending_work(self): + """should_exit should return False when there's pending work.""" + orchestrator = Orchestrator(exit_on_idle=True) + orchestrator.idle_count = 100 + + self.assertFalse(orchestrator.should_exit({'crawl': 5})) + + @patch.object(Orchestrator, 'has_running_workers') + def test_should_exit_running_workers(self, mock_has_workers): + """should_exit should return False when workers are running.""" + mock_has_workers.return_value = True + orchestrator = Orchestrator(exit_on_idle=True) + orchestrator.idle_count = 100 + + self.assertFalse(orchestrator.should_exit({'crawl': 0})) + + @patch.object(Orchestrator, 'has_running_workers') + @patch.object(Orchestrator, 'has_future_work') + def test_should_exit_idle_timeout(self, mock_future, mock_workers): + """should_exit should return True after idle timeout with no work.""" + mock_workers.return_value = False + mock_future.return_value = False + + orchestrator = Orchestrator(exit_on_idle=True) + orchestrator.idle_count = orchestrator.IDLE_TIMEOUT + + self.assertTrue(orchestrator.should_exit({'crawl': 0, 'snapshot': 0})) + + @patch.object(Orchestrator, 'has_running_workers') + @patch.object(Orchestrator, 'has_future_work') + def test_should_exit_below_idle_timeout(self, mock_future, mock_workers): + """should_exit should return False below idle timeout.""" + mock_workers.return_value = False + mock_future.return_value = False + + orchestrator = Orchestrator(exit_on_idle=True) + orchestrator.idle_count = orchestrator.IDLE_TIMEOUT - 1 + + self.assertFalse(orchestrator.should_exit({'crawl': 0})) + + def test_should_spawn_worker_no_queue(self): + """should_spawn_worker should return False when queue is empty.""" + orchestrator = Orchestrator() + + # Create a mock worker class + mock_worker = MagicMock() + mock_worker.get_running_workers.return_value = [] + + self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 0)) + + def test_should_spawn_worker_at_limit(self): + """should_spawn_worker should return False when at per-type limit.""" + orchestrator = Orchestrator() + + mock_worker = MagicMock() + mock_worker.get_running_workers.return_value = [{}] * orchestrator.MAX_WORKERS_PER_TYPE + + self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10)) + + @patch.object(Orchestrator, 'get_total_worker_count') + def test_should_spawn_worker_at_total_limit(self, mock_total): + """should_spawn_worker should return False when at total limit.""" + orchestrator = Orchestrator() + mock_total.return_value = orchestrator.MAX_TOTAL_WORKERS + + mock_worker = MagicMock() + mock_worker.get_running_workers.return_value = [] + + self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10)) + + @patch.object(Orchestrator, 'get_total_worker_count') + def test_should_spawn_worker_success(self, mock_total): + """should_spawn_worker should return True when conditions are met.""" + orchestrator = Orchestrator() + mock_total.return_value = 0 + + mock_worker = MagicMock() + mock_worker.get_running_workers.return_value = [] + mock_worker.MAX_CONCURRENT_TASKS = 5 + + self.assertTrue(orchestrator.should_spawn_worker(mock_worker, 10)) + + @patch.object(Orchestrator, 'get_total_worker_count') + def test_should_spawn_worker_enough_workers(self, mock_total): + """should_spawn_worker should return False when enough workers for queue.""" + orchestrator = Orchestrator() + mock_total.return_value = 2 + + mock_worker = MagicMock() + mock_worker.get_running_workers.return_value = [{}] # 1 worker running + mock_worker.MAX_CONCURRENT_TASKS = 5 # Can handle 5 items + + # Queue size (3) <= running_workers (1) * MAX_CONCURRENT_TASKS (5) + self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 3)) + + +class TestOrchestratorWithProcess(TestCase): + """Test Orchestrator using Process model for tracking.""" + + def setUp(self): + """Reset process cache.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + + def test_is_running_no_orchestrator(self): + """is_running should return False when no orchestrator process exists.""" + from archivebox.machine.models import Process + + # Clean up any stale processes first + Process.cleanup_stale_running() + + # Mark any running orchestrators as exited for clean test state + Process.objects.filter( + process_type=Process.TypeChoices.ORCHESTRATOR, + status=Process.StatusChoices.RUNNING + ).update(status=Process.StatusChoices.EXITED) + + self.assertFalse(Orchestrator.is_running()) + + def test_is_running_with_orchestrator_process(self): + """is_running should return True when orchestrator Process exists.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create an orchestrator Process record + proc = Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.ORCHESTRATOR, + status=Process.StatusChoices.RUNNING, + pid=os.getpid(), # Use current PID so it appears alive + started_at=timezone.now(), + cmd=['archivebox', 'manage', 'orchestrator'], + ) + + try: + # Should detect running orchestrator + self.assertTrue(Orchestrator.is_running()) + finally: + # Clean up + proc.status = Process.StatusChoices.EXITED + proc.save() + + def test_orchestrator_uses_process_for_is_running(self): + """Orchestrator.is_running should use Process.get_running_count.""" + from archivebox.machine.models import Process + + # Verify is_running uses Process model, not pid files + with patch.object(Process, 'get_running_count') as mock_count: + mock_count.return_value = 1 + + result = Orchestrator.is_running() + + # Should have called Process.get_running_count with orchestrator type + mock_count.assert_called() + self.assertTrue(result) + + +class TestProcessBasedWorkerTracking(TestCase): + """Test Process model methods that replace pid_utils functionality.""" + + def setUp(self): + """Reset caches.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + + def test_process_current_creates_record(self): + """Process.current() should create a Process record for current PID.""" + from archivebox.machine.models import Process + + proc = Process.current() + + self.assertIsNotNone(proc) + self.assertEqual(proc.pid, os.getpid()) + self.assertEqual(proc.status, Process.StatusChoices.RUNNING) + self.assertIsNotNone(proc.machine) + self.assertIsNotNone(proc.started_at) + + def test_process_current_caches_result(self): + """Process.current() should return cached Process within interval.""" + from archivebox.machine.models import Process + + proc1 = Process.current() + proc2 = Process.current() + + self.assertEqual(proc1.id, proc2.id) + + def test_process_get_running_count(self): + """Process.get_running_count should count running processes by type.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create some worker processes + for i in range(3): + Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=99990 + i, # Fake PIDs + started_at=timezone.now(), + ) + + count = Process.get_running_count(process_type=Process.TypeChoices.WORKER) + self.assertGreaterEqual(count, 3) + + def test_process_get_next_worker_id(self): + """Process.get_next_worker_id should return count of running workers.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create 2 worker processes + for i in range(2): + Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=99980 + i, + started_at=timezone.now(), + ) + + next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) + self.assertGreaterEqual(next_id, 2) + + def test_process_cleanup_stale_running(self): + """Process.cleanup_stale_running should mark stale processes as exited.""" + from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW + + machine = Machine.current() + + # Create a stale process (old started_at, fake PID) + stale_proc = Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=999999, # Fake PID that doesn't exist + started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1), + ) + + cleaned = Process.cleanup_stale_running() + + self.assertGreaterEqual(cleaned, 1) + + stale_proc.refresh_from_db() + self.assertEqual(stale_proc.status, Process.StatusChoices.EXITED) + + def test_process_get_running(self): + """Process.get_running should return queryset of running processes.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create a running process + proc = Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + pid=99970, + started_at=timezone.now(), + ) + + running = Process.get_running(process_type=Process.TypeChoices.HOOK) + + self.assertIn(proc, running) + + def test_process_type_detection(self): + """Process._detect_process_type should detect process type from argv.""" + from archivebox.machine.models import Process + + # Test detection logic + with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR) + + with patch('sys.argv', ['archivebox', 'add', 'http://example.com']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.CLI) + + with patch('sys.argv', ['supervisord', '-c', 'config.ini']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.SUPERVISORD) + + +class TestProcessLifecycle(TestCase): + """Test Process model lifecycle methods.""" + + def setUp(self): + """Reset caches and create a machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + self.machine = models.Machine.current() + + def test_process_is_running_property(self): + """Process.is_running should check actual OS process.""" + from archivebox.machine.models import Process + + # Create a process with current PID (should be running) + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=os.getpid(), + started_at=timezone.now(), + ) + + # Should be running (current process exists) + self.assertTrue(proc.is_running) + + # Create a process with fake PID + fake_proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + # Should not be running (PID doesn't exist) + self.assertFalse(fake_proc.is_running) + + def test_process_poll(self): + """Process.poll should check and update exit status.""" + from archivebox.machine.models import Process + + # Create a process with fake PID (already exited) + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + exit_code = proc.poll() + + # Should have detected exit and updated status + self.assertIsNotNone(exit_code) + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + def test_process_terminate_already_dead(self): + """Process.terminate should handle already-dead processes.""" + from archivebox.machine.models import Process + + # Create a process with fake PID + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + result = proc.terminate() + + # Should return False (was already dead) + self.assertFalse(result) + + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + def test_process_tree_traversal(self): + """Process parent/children relationships should work.""" + from archivebox.machine.models import Process + + # Create parent process + parent = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.CLI, + status=Process.StatusChoices.RUNNING, + pid=1, + started_at=timezone.now(), + ) + + # Create child process + child = Process.objects.create( + machine=self.machine, + parent=parent, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=2, + started_at=timezone.now(), + ) + + # Test relationships + self.assertEqual(child.parent, parent) + self.assertIn(child, parent.children.all()) + self.assertEqual(child.root, parent) + self.assertEqual(child.depth, 1) + self.assertEqual(parent.depth, 0) + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])