Add comprehensive tests for machine/process models, orchestrator, and search backends (#1745)

This commit is contained in:
Nick Sweeting
2025-12-31 10:21:12 -08:00
committed by GitHub
19 changed files with 2565 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Tests for the machine module (Machine, NetworkInterface, Binary, Process models)."""

View File

@@ -0,0 +1,563 @@
"""
Unit tests for machine module models: Machine, NetworkInterface, Binary, Process.
Tests cover:
1. Machine model creation and current() method
2. NetworkInterface model and network detection
3. Binary model lifecycle and state machine
4. Process model lifecycle, hierarchy, and state machine
5. JSONL serialization/deserialization
6. Manager methods
7. Process tracking methods (replacing pid_utils)
"""
import os
import sys
from pathlib import Path
from datetime import timedelta
from unittest.mock import patch
import pytest
from django.test import TestCase
from django.utils import timezone
from archivebox.machine.models import (
Machine,
NetworkInterface,
Binary,
Process,
BinaryMachine,
ProcessMachine,
MACHINE_RECHECK_INTERVAL,
PROCESS_RECHECK_INTERVAL,
PID_REUSE_WINDOW,
)
class TestMachineModel(TestCase):
    """Exercise the Machine model: creation, caching, JSONL ingestion, manager."""

    def setUp(self):
        """Drop the module-level machine cache so each test starts fresh."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None

    def test_machine_current_creates_machine(self):
        """Machine.current() should create a machine if none exists."""
        current = Machine.current()
        self.assertIsNotNone(current)
        self.assertIsNotNone(current.id)
        self.assertIsNotNone(current.guid)
        self.assertEqual(current.hostname, os.uname().nodename)
        self.assertIn(current.os_family, ['linux', 'darwin', 'windows', 'freebsd'])

    def test_machine_current_returns_cached(self):
        """Machine.current() should return the cached machine within the recheck interval."""
        first = Machine.current()
        second = Machine.current()
        self.assertEqual(first.id, second.id)

    def test_machine_current_refreshes_after_interval(self):
        """Machine.current() should refresh after the recheck interval elapses."""
        import archivebox.machine.models as models
        stale = Machine.current()
        # Age the record past the recheck window so current() must re-resolve it.
        stale.modified_at = timezone.now() - timedelta(seconds=MACHINE_RECHECK_INTERVAL + 1)
        stale.save()
        models._CURRENT_MACHINE = stale
        refreshed = Machine.current()
        # The refreshed record must still refer to the same physical machine.
        self.assertEqual(stale.guid, refreshed.guid)

    def test_machine_from_jsonl_update(self):
        """Machine.from_jsonl() should apply config updates to the machine."""
        Machine.current()  # make sure a machine row exists first
        record = {
            '_method': 'update',
            'key': 'WGET_BINARY',
            'value': '/usr/bin/wget',
        }
        updated = Machine.from_jsonl(record)
        self.assertIsNotNone(updated)
        self.assertEqual(updated.config.get('WGET_BINARY'), '/usr/bin/wget')

    def test_machine_from_jsonl_invalid(self):
        """Machine.from_jsonl() should return None for unrecognized records."""
        self.assertIsNone(Machine.from_jsonl({'invalid': 'record'}))

    def test_machine_manager_current(self):
        """Machine.objects.current() should agree with Machine.current()."""
        via_manager = Machine.objects.current()
        self.assertIsNotNone(via_manager)
        self.assertEqual(via_manager.id, Machine.current().id)
class TestNetworkInterfaceModel(TestCase):
    """Exercise the NetworkInterface model and its manager."""

    def setUp(self):
        """Drop cached machine/interface singletons between tests."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_INTERFACE = None

    def test_networkinterface_current_creates_interface(self):
        """NetworkInterface.current() should create an interface if none exists."""
        iface = NetworkInterface.current()
        self.assertIsNotNone(iface)
        self.assertIsNotNone(iface.id)
        self.assertIsNotNone(iface.machine)
        self.assertIsNotNone(iface.ip_local)

    def test_networkinterface_current_returns_cached(self):
        """NetworkInterface.current() should return the cached interface within the recheck interval."""
        first = NetworkInterface.current()
        second = NetworkInterface.current()
        self.assertEqual(first.id, second.id)

    def test_networkinterface_manager_current(self):
        """NetworkInterface.objects.current() should return the current interface."""
        self.assertIsNotNone(NetworkInterface.objects.current())
class TestBinaryModel(TestCase):
    """Exercise Binary creation, validity checks, and manager lookups."""

    def setUp(self):
        """Reset cached machine/binary singletons and grab the current machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_BINARIES = {}
        self.machine = Machine.current()

    def test_binary_creation(self):
        """A new Binary should come back QUEUED and not yet valid."""
        wget = Binary.objects.create(
            machine=self.machine,
            name='wget',
            binproviders='apt,brew,env',
        )
        self.assertIsNotNone(wget.id)
        self.assertEqual(wget.name, 'wget')
        self.assertEqual(wget.status, Binary.StatusChoices.QUEUED)
        self.assertFalse(wget.is_valid)

    def test_binary_is_valid(self):
        """Binary.is_valid should be True once abspath and version are both set."""
        wget = Binary.objects.create(
            machine=self.machine,
            name='wget',
            abspath='/usr/bin/wget',
            version='1.21',
        )
        self.assertTrue(wget.is_valid)

    def test_binary_manager_get_valid_binary(self):
        """BinaryManager.get_valid_binary() should skip invalid rows and find the valid one."""
        # One unusable row (no abspath/version) ...
        Binary.objects.create(machine=self.machine, name='wget')
        # ... and one fully-resolved row for the same name.
        Binary.objects.create(
            machine=self.machine,
            name='wget',
            abspath='/usr/bin/wget',
            version='1.21',
        )
        found = Binary.objects.get_valid_binary('wget')
        self.assertIsNotNone(found)
        self.assertEqual(found.abspath, '/usr/bin/wget')

    def test_binary_update_and_requeue(self):
        """Binary.update_and_requeue() should persist field changes and bump modified_at."""
        binary = Binary.objects.create(machine=self.machine, name='test')
        previous_modified = binary.modified_at
        binary.update_and_requeue(
            status=Binary.StatusChoices.STARTED,
            retry_at=timezone.now() + timedelta(seconds=60),
        )
        binary.refresh_from_db()
        self.assertEqual(binary.status, Binary.StatusChoices.STARTED)
        self.assertGreater(binary.modified_at, previous_modified)
class TestBinaryStateMachine(TestCase):
    """Exercise the BinaryMachine state machine wrapper."""

    def setUp(self):
        """Create a fresh machine plus a queued binary to drive the state machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        self.machine = Machine.current()
        self.binary = Binary.objects.create(
            machine=self.machine,
            name='test-binary',
            binproviders='env',
        )

    def test_binary_state_machine_initial_state(self):
        """A freshly-wrapped binary should be in the queued state."""
        machine = BinaryMachine(self.binary)
        self.assertEqual(machine.current_state.value, Binary.StatusChoices.QUEUED)

    def test_binary_state_machine_can_start(self):
        """can_start() should require both a name and a non-empty binproviders list."""
        self.assertTrue(BinaryMachine(self.binary).can_start())
        # Clearing binproviders should make the binary unstartable.
        self.binary.binproviders = ''
        self.binary.save()
        self.assertFalse(BinaryMachine(self.binary).can_start())
class TestProcessModel(TestCase):
    """Exercise Process creation, JSONL serialization, and update helpers."""

    def setUp(self):
        """Reset cached machine/process singletons and grab the current machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None
        self.machine = Machine.current()

    def test_process_creation(self):
        """A new Process should come back QUEUED with no pid or exit code yet."""
        process = Process.objects.create(
            machine=self.machine,
            cmd=['echo', 'hello'],
            pwd='/tmp',
        )
        self.assertIsNotNone(process.id)
        self.assertEqual(process.cmd, ['echo', 'hello'])
        self.assertEqual(process.status, Process.StatusChoices.QUEUED)
        self.assertIsNone(process.pid)
        self.assertIsNone(process.exit_code)

    def test_process_to_jsonl(self):
        """Process.to_jsonl() should serialize type, cmd, pwd, and timeout."""
        process = Process.objects.create(
            machine=self.machine,
            cmd=['echo', 'hello'],
            pwd='/tmp',
            timeout=60,
        )
        json_data = process.to_jsonl()
        self.assertEqual(json_data['type'], 'Process')
        self.assertEqual(json_data['cmd'], ['echo', 'hello'])
        self.assertEqual(json_data['pwd'], '/tmp')
        self.assertEqual(json_data['timeout'], 60)

    def test_process_update_and_requeue(self):
        """Process.update_and_requeue() should persist field changes and bump modified_at."""
        process = Process.objects.create(machine=self.machine, cmd=['test'])
        old_modified = process.modified_at
        process.update_and_requeue(
            status=Process.StatusChoices.RUNNING,
            pid=12345,
            started_at=timezone.now(),
        )
        process.refresh_from_db()
        self.assertEqual(process.status, Process.StatusChoices.RUNNING)
        self.assertEqual(process.pid, 12345)
        self.assertIsNotNone(process.started_at)
        # Fixed: old_modified was captured but never asserted (dead store).
        # Assert it the same way the matching Binary test does.
        self.assertGreater(process.modified_at, old_modified)
class TestProcessCurrent(TestCase):
    """Exercise Process.current() and process-type detection."""

    def setUp(self):
        """Drop cached machine/process singletons between tests."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None

    def test_process_current_creates_record(self):
        """Process.current() should create a RUNNING Process for this PID."""
        current = Process.current()
        self.assertIsNotNone(current)
        self.assertEqual(current.pid, os.getpid())
        self.assertEqual(current.status, Process.StatusChoices.RUNNING)
        self.assertIsNotNone(current.machine)
        self.assertIsNotNone(current.started_at)

    def test_process_current_caches(self):
        """Repeated Process.current() calls should return the same row."""
        first = Process.current()
        second = Process.current()
        self.assertEqual(first.id, second.id)

    def test_process_detect_type_orchestrator(self):
        """_detect_process_type should recognize the orchestrator argv."""
        with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
            detected = Process._detect_process_type()
        self.assertEqual(detected, Process.TypeChoices.ORCHESTRATOR)

    def test_process_detect_type_cli(self):
        """_detect_process_type should recognize a CLI command argv."""
        with patch('sys.argv', ['archivebox', 'add', 'http://example.com']):
            detected = Process._detect_process_type()
        self.assertEqual(detected, Process.TypeChoices.CLI)

    def test_process_detect_type_worker(self):
        """_detect_process_type should recognize a worker argv."""
        with patch('sys.argv', ['python', '-m', 'crawl_worker']):
            detected = Process._detect_process_type()
        self.assertEqual(detected, Process.TypeChoices.WORKER)
class TestProcessHierarchy(TestCase):
    """Exercise parent/child/root/depth relationships between Processes."""

    def setUp(self):
        """Reset the machine cache and grab the current machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        self.machine = Machine.current()

    def _spawn(self, **overrides):
        """Create a RUNNING Process with sensible defaults, applying overrides."""
        fields = dict(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            started_at=timezone.now(),
        )
        fields.update(overrides)
        return Process.objects.create(**fields)

    def test_process_parent_child(self):
        """A child's parent FK and the parent's children relation should agree."""
        parent = self._spawn(process_type=Process.TypeChoices.CLI, pid=1)
        child = self._spawn(parent=parent, process_type=Process.TypeChoices.WORKER, pid=2)
        self.assertEqual(child.parent, parent)
        self.assertIn(child, parent.children.all())

    def test_process_root(self):
        """Process.root should walk all the way up to the hierarchy root."""
        root = self._spawn(process_type=Process.TypeChoices.CLI)
        child = self._spawn(parent=root)
        grandchild = self._spawn(parent=child)
        self.assertEqual(grandchild.root, root)
        self.assertEqual(child.root, root)
        # A root is its own root.
        self.assertEqual(root.root, root)

    def test_process_depth(self):
        """Process.depth should count edges down from the root."""
        root = self._spawn()
        child = self._spawn(parent=root)
        self.assertEqual(root.depth, 0)
        self.assertEqual(child.depth, 1)
class TestProcessLifecycle(TestCase):
    """Exercise the is_running / poll / terminate lifecycle helpers."""

    def setUp(self):
        """Reset the machine cache and grab the current machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        self.machine = Machine.current()

    def _running_process(self, pid):
        """Create a Process row marked RUNNING with the given pid."""
        return Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=pid,
            started_at=timezone.now(),
        )

    def test_process_is_running_current_pid(self):
        """is_running should be True for this test runner's own PID."""
        proc = self._running_process(os.getpid())
        self.assertTrue(proc.is_running)

    def test_process_is_running_fake_pid(self):
        """is_running should be False for a PID that doesn't exist."""
        proc = self._running_process(999999)
        self.assertFalse(proc.is_running)

    def test_process_poll_detects_exit(self):
        """poll() should notice the process is gone and mark it EXITED."""
        proc = self._running_process(999999)
        self.assertIsNotNone(proc.poll())
        proc.refresh_from_db()
        self.assertEqual(proc.status, Process.StatusChoices.EXITED)

    def test_process_terminate_dead_process(self):
        """terminate() should report False for an already-dead process and mark it EXITED."""
        proc = self._running_process(999999)
        self.assertFalse(proc.terminate())
        proc.refresh_from_db()
        self.assertEqual(proc.status, Process.StatusChoices.EXITED)
class TestProcessClassMethods(TestCase):
    """Exercise Process class-level query helpers."""

    def setUp(self):
        """Reset the machine cache and grab the current machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        self.machine = Machine.current()

    def test_get_running(self):
        """get_running should include a RUNNING process of the requested type."""
        hook_proc = Process.objects.create(
            machine=self.machine,
            process_type=Process.TypeChoices.HOOK,
            status=Process.StatusChoices.RUNNING,
            pid=99999,
            started_at=timezone.now(),
        )
        running = Process.get_running(process_type=Process.TypeChoices.HOOK)
        self.assertIn(hook_proc, running)

    def test_get_running_count(self):
        """get_running_count should count at least the processes we created."""
        for offset in range(3):
            Process.objects.create(
                machine=self.machine,
                process_type=Process.TypeChoices.HOOK,
                status=Process.StatusChoices.RUNNING,
                pid=99900 + offset,
                started_at=timezone.now(),
            )
        count = Process.get_running_count(process_type=Process.TypeChoices.HOOK)
        self.assertGreaterEqual(count, 3)

    def test_cleanup_stale_running(self):
        """cleanup_stale_running should mark long-dead RUNNING rows as EXITED."""
        stale = Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=999999,
            # Older than the PID reuse window, so it must be treated as stale.
            started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1),
        )
        self.assertGreaterEqual(Process.cleanup_stale_running(), 1)
        stale.refresh_from_db()
        self.assertEqual(stale.status, Process.StatusChoices.EXITED)
class TestProcessStateMachine(TestCase):
    """Exercise the ProcessMachine state machine wrapper."""

    def setUp(self):
        """Create a machine and a queued process to drive the state machine."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        self.machine = Machine.current()
        self.process = Process.objects.create(
            machine=self.machine,
            cmd=['echo', 'test'],
            pwd='/tmp',
        )

    def test_process_state_machine_initial_state(self):
        """A freshly-wrapped process should be in the queued state."""
        state_machine = ProcessMachine(self.process)
        self.assertEqual(state_machine.current_state.value, Process.StatusChoices.QUEUED)

    def test_process_state_machine_can_start(self):
        """can_start() should require a non-empty cmd."""
        self.assertTrue(ProcessMachine(self.process).can_start())
        # An empty command list should make the process unstartable.
        self.process.cmd = []
        self.process.save()
        self.assertFalse(ProcessMachine(self.process).can_start())

    def test_process_state_machine_is_exited(self):
        """is_exited() should flip to True once an exit_code is recorded."""
        self.assertFalse(ProcessMachine(self.process).is_exited())
        self.process.exit_code = 0
        self.process.save()
        self.assertTrue(ProcessMachine(self.process).is_exited())
# Allow running this test module directly (python <file>.py) without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the apt binary provider plugin."""

View File

@@ -0,0 +1,154 @@
"""
Tests for the apt binary provider plugin.
Tests cover:
1. Hook script execution
2. apt package availability detection
3. JSONL output format
"""
import json
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Get the path to the apt provider hook
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_apt_provider.py'
def apt_available() -> bool:
    """Return True when either ``apt`` or ``apt-get`` is present on the PATH."""
    return any(shutil.which(tool) for tool in ('apt', 'apt-get'))
def is_linux() -> bool:
    """Return True when the test suite is running on a Linux host."""
    import platform
    system_name = platform.system().lower()
    return system_name == 'linux'
class TestAptProviderHook(TestCase):
    """Exercise the apt install hook by invoking it as a subprocess."""

    def setUp(self):
        """Create a scratch directory for the hook to work in."""
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _run_hook(self, name, *extra_args, timeout=30):
        """Run the install hook for package *name* and return the CompletedProcess."""
        return subprocess.run(
            [
                sys.executable, str(INSTALL_HOOK),
                f'--name={name}',
                '--binary-id=test-uuid',
                '--machine-id=test-machine',
                *extra_args,
            ],
            capture_output=True,
            text=True,
            timeout=timeout,
        )

    def test_hook_script_exists(self):
        """The hook script must exist on disk."""
        self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")

    def test_hook_skips_when_apt_not_allowed(self):
        """The hook should no-op cleanly when apt is not an allowed binprovider."""
        result = self._run_hook('wget', '--binproviders=pip,npm')
        # A disallowed provider is a clean skip, not a failure.
        self.assertIn('apt provider not allowed', result.stderr)
        self.assertEqual(result.returncode, 0)

    @pytest.mark.skipif(not is_linux(), reason="apt only available on Linux")
    @pytest.mark.skipif(not apt_available(), reason="apt not installed")
    def test_hook_detects_apt(self):
        """The hook should find the apt binary when it is installed."""
        result = self._run_hook('nonexistent-pkg-xyz123')
        # Whatever happens with the bogus package, apt itself must be detected.
        self.assertNotIn('apt not available', result.stderr)

    def test_hook_handles_overrides(self):
        """The hook should parse an --overrides JSON blob without crashing."""
        overrides = json.dumps({
            'apt': {'packages': ['custom-package-name']}
        })
        result = self._run_hook('test-pkg', f'--overrides={overrides}')
        self.assertNotIn('Traceback', result.stderr)
@pytest.mark.skipif(not is_linux(), reason="apt only available on Linux")
@pytest.mark.skipif(not apt_available(), reason="apt not installed")
class TestAptProviderSystemBinaries(TestCase):
    """Run the apt hook against binaries already present on the system."""

    def test_detect_existing_binary(self):
        """The hook should recognize system binaries that are already installed."""
        # bash is present on effectively every Linux host, so the hook should
        # be able to report it without installing anything.
        result = subprocess.run(
            [
                sys.executable, str(INSTALL_HOOK),
                '--name=bash',
                '--binary-id=test-uuid',
                '--machine-id=test-machine',
            ],
            capture_output=True,
            text=True,
            timeout=60
        )
        # Scan the hook's JSONL stdout for a Binary record describing bash.
        for raw_line in result.stdout.split('\n'):
            raw_line = raw_line.strip()
            if not raw_line.startswith('{'):
                continue
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            if record.get('type') == 'Binary' and record.get('name') == 'bash':
                # Found bash — its reported path must exist on disk.
                self.assertTrue(record.get('abspath'))
                self.assertTrue(Path(record['abspath']).exists())
                return
        # apt may not emit a record for an already-installed binary;
        # in that case we only require that the hook did not crash.
        self.assertNotIn('Traceback', result.stderr)
# Allow running this test module directly (python <file>.py) without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the npm binary provider plugin."""

View File

@@ -0,0 +1,144 @@
"""
Tests for the npm binary provider plugin.
Tests cover:
1. Hook script execution
2. npm package installation
3. PATH and NODE_MODULES_DIR updates
4. JSONL output format
"""
import json
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Get the path to the npm provider hook
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_npm_provider.py'
def npm_available() -> bool:
    """Return True when the ``npm`` executable can be found on the PATH."""
    return bool(shutil.which('npm'))
class TestNpmProviderHook(TestCase):
    """Exercise the npm install hook by invoking it as a subprocess."""

    def setUp(self):
        """Create a scratch LIB_DIR layout for the hook."""
        self.temp_dir = tempfile.mkdtemp()
        self.lib_dir = Path(self.temp_dir) / 'lib' / 'x86_64-linux'
        self.lib_dir.mkdir(parents=True)

    def tearDown(self):
        """Remove the scratch directory."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _hook_argv(self, name, *extra_args):
        """Build the hook's argv for a package called *name*."""
        return [
            sys.executable, str(INSTALL_HOOK),
            f'--name={name}',
            '--binary-id=test-uuid',
            '--machine-id=test-machine',
            *extra_args,
        ]

    def _env_with_lib_dir(self):
        """Copy the environment and point LIB_DIR at the scratch dir."""
        env = os.environ.copy()
        env['LIB_DIR'] = str(self.lib_dir)
        return env

    def test_hook_script_exists(self):
        """The hook script must exist on disk."""
        self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")

    def test_hook_requires_lib_dir(self):
        """The hook should fail with exit code 1 when LIB_DIR is unset."""
        env = os.environ.copy()
        env.pop('LIB_DIR', None)  # guarantee the variable is absent
        result = subprocess.run(
            self._hook_argv('some-package'),
            capture_output=True,
            text=True,
            env=env,
            timeout=30
        )
        self.assertIn('LIB_DIR environment variable not set', result.stderr)
        self.assertEqual(result.returncode, 1)

    def test_hook_skips_when_npm_not_allowed(self):
        """The hook should no-op cleanly when npm is not an allowed binprovider."""
        result = subprocess.run(
            self._hook_argv('some-package', '--binproviders=pip,apt'),
            capture_output=True,
            text=True,
            env=self._env_with_lib_dir(),
            timeout=30
        )
        # A disallowed provider is a clean skip, not a failure.
        self.assertIn('npm provider not allowed', result.stderr)
        self.assertEqual(result.returncode, 0)

    @pytest.mark.skipif(not npm_available(), reason="npm not installed")
    def test_hook_creates_npm_prefix(self):
        """The hook should create the npm prefix dir even when installation fails."""
        subprocess.run(
            self._hook_argv('nonexistent-xyz123'),
            capture_output=True,
            text=True,
            env=self._env_with_lib_dir(),
            timeout=60
        )
        npm_prefix = self.lib_dir / 'npm'
        self.assertTrue(npm_prefix.exists())

    def test_hook_handles_overrides(self):
        """The hook should parse an --overrides JSON blob without crashing."""
        overrides = json.dumps({'npm': {'packages': ['custom-pkg']}})
        result = subprocess.run(
            self._hook_argv('test-pkg', f'--overrides={overrides}'),
            capture_output=True,
            text=True,
            env=self._env_with_lib_dir(),
            timeout=60
        )
        # Installation may fail, but the overrides JSON itself must parse.
        self.assertNotIn('Failed to parse overrides JSON', result.stderr)
# Allow running this test module directly (python <file>.py) without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the pip binary provider plugin."""

View File

@@ -0,0 +1,175 @@
"""
Tests for the pip binary provider plugin.
Tests cover:
1. Hook script execution
2. pip package detection
3. Virtual environment handling
4. JSONL output format
"""
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
# Get the path to the pip provider hook
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_pip_provider.py'
class TestPipProviderHook(TestCase):
    """Exercise the pip install hook by invoking it as a subprocess.

    Each test runs the hook script with ``sys.executable`` and inspects its
    exit behavior and JSONL stdout; no real package installation is required.
    """

    def setUp(self):
        """Create a scratch DATA_DIR with an output/ working directory."""
        self.temp_dir = tempfile.mkdtemp()
        self.output_dir = Path(self.temp_dir) / 'output'
        self.output_dir.mkdir()

    def tearDown(self):
        """Remove the scratch directory."""
        import shutil
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_hook_script_exists(self):
        """The hook script must exist on disk."""
        self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")

    def test_hook_help(self):
        """The hook should accept --help without raising a Python error."""
        result = subprocess.run(
            [sys.executable, str(INSTALL_HOOK), '--help'],
            capture_output=True,
            text=True,
            timeout=30
        )
        # --help handling may vary, but it must never crash with a traceback.
        self.assertNotIn('Traceback', result.stderr)

    def test_hook_finds_python(self):
        """The hook should be able to look up the python3 binary."""
        env = os.environ.copy()
        env['DATA_DIR'] = self.temp_dir
        result = subprocess.run(
            [
                sys.executable, str(INSTALL_HOOK),
                '--name=python3',
                '--binproviders=pip,env',
            ],
            capture_output=True,
            text=True,
            cwd=str(self.output_dir),
            env=env,
            timeout=60
        )
        # If the hook emitted a Binary record for python3, verify its shape.
        # (Fixed: the original tracked a `jsonl_found` flag that was set but
        # never read — a dead store; the structural asserts are what matter.)
        for line in result.stdout.split('\n'):
            line = line.strip()
            if line.startswith('{'):
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if record.get('type') == 'Binary' and record.get('name') == 'python3':
                    self.assertIn('abspath', record)
                    self.assertIn('version', record)
                    break
        # May or may not find python3 via pip, but the hook must not crash.
        self.assertNotIn('Traceback', result.stderr)

    def test_hook_unknown_package(self):
        """The hook should handle unknown packages gracefully."""
        env = os.environ.copy()
        env['DATA_DIR'] = self.temp_dir
        result = subprocess.run(
            [
                sys.executable, str(INSTALL_HOOK),
                '--name=nonexistent_package_xyz123',
                '--binproviders=pip',
            ],
            capture_output=True,
            text=True,
            cwd=str(self.output_dir),
            env=env,
            timeout=60
        )
        # A missing package may yield a non-zero exit code, but never a crash.
        self.assertNotIn('Traceback', result.stderr)
class TestPipProviderIntegration(TestCase):
    """Run the pip hook against a package that really exists (pip itself)."""

    def setUp(self):
        """Create a scratch DATA_DIR with an output/ working directory."""
        self.temp_dir = tempfile.mkdtemp()
        self.output_dir = Path(self.temp_dir) / 'output'
        self.output_dir.mkdir()

    def tearDown(self):
        """Remove the scratch directory."""
        import shutil
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    @pytest.mark.skipif(
        subprocess.run([sys.executable, '-m', 'pip', '--version'],
                       capture_output=True).returncode != 0,
        reason="pip not available"
    )
    def test_hook_finds_pip_installed_binary(self):
        """The hook should locate binaries that were installed via pip."""
        env = os.environ.copy()
        env['DATA_DIR'] = self.temp_dir
        # 'pip' itself is guaranteed present by the skipif guard above.
        result = subprocess.run(
            [
                sys.executable, str(INSTALL_HOOK),
                '--name=pip',
                '--binproviders=pip,env',
            ],
            capture_output=True,
            text=True,
            cwd=str(self.output_dir),
            env=env,
            timeout=60
        )
        # Scan the JSONL stdout for a Binary record mentioning pip.
        for raw in result.stdout.split('\n'):
            raw = raw.strip()
            if not raw.startswith('{'):
                continue
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if record.get('type') == 'Binary' and 'pip' in record.get('name', ''):
                # Found the pip binary record — it must carry an abspath.
                self.assertTrue(record.get('abspath'))
                return
        # Not finding pip is tolerated as long as the hook didn't crash.
        self.assertNotIn('Traceback', result.stderr)
# Allow running this test module directly (python <file>.py) without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the redirects plugin."""

View File

@@ -0,0 +1,134 @@
"""
Tests for the redirects plugin.
Tests the real redirects hook with actual URLs to verify
redirect chain capture.
"""
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Import chrome test helpers
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
from chrome_test_helpers import (
chrome_session,
get_test_env,
get_plugin_dir,
get_hook_script,
)
def chrome_available() -> bool:
    """Return True when any known Chrome/Chromium executable is on the PATH."""
    candidates = ['chromium', 'chromium-browser', 'google-chrome', 'chrome']
    return any(shutil.which(candidate) for candidate in candidates)
# Get the path to the redirects hook
PLUGIN_DIR = get_plugin_dir(__file__)
REDIRECTS_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_redirects.*')
class TestRedirectsPlugin(TestCase):
    """Sanity checks for the redirects plugin's on-disk layout."""

    def test_redirects_hook_exists(self):
        """The redirects hook script must be discoverable and present on disk."""
        self.assertIsNotNone(REDIRECTS_HOOK, "Redirects hook not found in plugin directory")
        self.assertTrue(REDIRECTS_HOOK.exists(), f"Hook not found: {REDIRECTS_HOOK}")
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestRedirectsWithChrome(TestCase):
    """Integration tests for redirects plugin with Chrome."""

    def setUp(self):
        """Set up test environment."""
        # Fresh scratch dir per test; chrome_session writes its output here.
        self.temp_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Clean up."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_redirects_captures_navigation(self):
        """Redirects hook should capture URL navigation without errors."""
        # Use a URL that doesn't redirect (simple case)
        test_url = 'https://example.com'
        snapshot_id = 'test-redirects-snapshot'
        try:
            # chrome_session launches a headless browser, navigates to
            # test_url, and yields the live process plus its snapshot dir.
            with chrome_session(
                self.temp_dir,
                crawl_id='test-redirects-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (chrome_process, chrome_pid, snapshot_chrome_dir):
                # Get environment and run the redirects hook
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'
                # Run redirects hook with the active Chrome session
                result = subprocess.run(
                    ['node', str(REDIRECTS_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env
                )
                # Check for output file
                redirects_output = snapshot_chrome_dir / 'redirects.jsonl'
                redirects_data = None
                # Try parsing from file first
                if redirects_output.exists():
                    with open(redirects_output) as f:
                        for line in f:
                            line = line.strip()
                            if line.startswith('{'):
                                try:
                                    redirects_data = json.loads(line)
                                    break
                                except json.JSONDecodeError:
                                    continue
                # Try parsing from stdout if not in file
                if not redirects_data:
                    for line in result.stdout.split('\n'):
                        line = line.strip()
                        if line.startswith('{'):
                            try:
                                record = json.loads(line)
                                # NOTE(review): the hook's output schema isn't visible
                                # here — accept any record that looks redirect-shaped.
                                if 'chain' in record or 'redirects' in record or record.get('type') == 'Redirects':
                                    redirects_data = record
                                    break
                            except json.JSONDecodeError:
                                continue
                # Verify hook ran successfully
                # example.com typically doesn't redirect, so we just verify no errors
                self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
                self.assertNotIn('Traceback', result.stderr)
                self.assertNotIn('Error:', result.stderr)
        except RuntimeError as e:
            # A Chrome/CDP startup failure is an environment problem, not a
            # test failure — skip; any other RuntimeError is re-raised.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
# Allow running this test module directly (python <file>.py) without the pytest CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,308 @@
"""
Tests for the ripgrep search backend.
Tests cover:
1. Search with ripgrep binary
2. Snapshot ID extraction from file paths
3. Timeout handling
4. Error handling
5. Environment variable configuration
"""
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
from archivebox.plugins.search_backend_ripgrep.search import (
search,
flush,
get_env,
get_env_int,
get_env_array,
)
class TestEnvHelpers(TestCase):
    """Exercise the get_env / get_env_int / get_env_array helpers."""

    def test_get_env_default(self):
        """An unset variable should yield the provided default."""
        self.assertEqual(get_env('NONEXISTENT_VAR_12345', 'default'), 'default')

    def test_get_env_set(self):
        """A set variable should yield its value."""
        with patch.dict(os.environ, {'TEST_VAR': 'value'}):
            self.assertEqual(get_env('TEST_VAR', 'default'), 'value')

    def test_get_env_strips_whitespace(self):
        """Leading/trailing whitespace should be stripped from values."""
        with patch.dict(os.environ, {'TEST_VAR': ' value '}):
            self.assertEqual(get_env('TEST_VAR', ''), 'value')

    def test_get_env_int_default(self):
        """An unset integer variable should yield the provided default."""
        self.assertEqual(get_env_int('NONEXISTENT_VAR_12345', 42), 42)

    def test_get_env_int_valid(self):
        """A numeric string should be parsed into an int."""
        with patch.dict(os.environ, {'TEST_INT': '100'}):
            self.assertEqual(get_env_int('TEST_INT', 0), 100)

    def test_get_env_int_invalid(self):
        """A non-numeric string should fall back to the default."""
        with patch.dict(os.environ, {'TEST_INT': 'not a number'}):
            self.assertEqual(get_env_int('TEST_INT', 42), 42)

    def test_get_env_array_default(self):
        """An unset array variable should yield the provided default."""
        self.assertEqual(get_env_array('NONEXISTENT_VAR_12345', ['default']), ['default'])

    def test_get_env_array_valid(self):
        """A JSON array string should be parsed into a list."""
        with patch.dict(os.environ, {'TEST_ARRAY': '["a", "b", "c"]'}):
            self.assertEqual(get_env_array('TEST_ARRAY', []), ['a', 'b', 'c'])

    def test_get_env_array_invalid_json(self):
        """Malformed JSON should fall back to the default."""
        with patch.dict(os.environ, {'TEST_ARRAY': 'not json'}):
            self.assertEqual(get_env_array('TEST_ARRAY', ['default']), ['default'])

    def test_get_env_array_not_array(self):
        """Valid JSON that is not an array should fall back to the default."""
        with patch.dict(os.environ, {'TEST_ARRAY': '{"key": "value"}'}):
            self.assertEqual(get_env_array('TEST_ARRAY', ['default']), ['default'])
class TestRipgrepFlush(TestCase):
    """Tests for the ripgrep backend's flush() hook."""

    def test_flush_is_noop(self):
        """flush should be a no-op for ripgrep backend."""
        # ripgrep has no persistent index to purge, so flushing a list of
        # snapshot IDs must simply return without raising.
        snapshot_ids = ['snap-001', 'snap-002']
        flush(snapshot_ids)
class TestRipgrepSearch(TestCase):
    """Test the ripgrep search function against a temporary archive tree.

    setUp builds three snapshot directories with known content and patches
    the search module's settings so ARCHIVE_DIR points at the temp tree.
    Tests that invoke the real `rg` binary are skipped when it is absent.
    """
    def setUp(self):
        """Create temporary archive directory with test files."""
        self.temp_dir = tempfile.mkdtemp()
        self.archive_dir = Path(self.temp_dir) / 'archive'
        self.archive_dir.mkdir()
        # Create snapshot directories with searchable content
        self._create_snapshot('snap-001', {
            'singlefile/index.html': '<html><body>Python programming tutorial</body></html>',
            'title/title.txt': 'Learn Python Programming',
        })
        self._create_snapshot('snap-002', {
            'singlefile/index.html': '<html><body>JavaScript guide</body></html>',
            'title/title.txt': 'JavaScript Basics',
        })
        self._create_snapshot('snap-003', {
            'wget/index.html': '<html><body>Web archiving best practices</body></html>',
            'title/title.txt': 'Web Archiving Guide',
        })
        # Patch settings so search() scans the temp archive, not the real one.
        self.settings_patch = patch(
            'archivebox.plugins.search_backend_ripgrep.search.settings'
        )
        self.mock_settings = self.settings_patch.start()
        self.mock_settings.ARCHIVE_DIR = str(self.archive_dir)
    def tearDown(self):
        """Clean up temporary directory."""
        self.settings_patch.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    def _create_snapshot(self, snapshot_id: str, files: dict) -> None:
        """Create a snapshot directory populated with {relative_path: text} files."""
        snap_dir = self.archive_dir / snapshot_id
        for path, content in files.items():
            file_path = snap_dir / path
            file_path.parent.mkdir(parents=True, exist_ok=True)
            file_path.write_text(content)
    def test_search_no_archive_dir(self):
        """search should return empty list when archive dir doesn't exist."""
        self.mock_settings.ARCHIVE_DIR = '/nonexistent/path'
        results = search('test')
        self.assertEqual(results, [])
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_single_match(self):
        """search should find matching snapshot."""
        results = search('Python programming')
        self.assertIn('snap-001', results)
        self.assertNotIn('snap-002', results)
        self.assertNotIn('snap-003', results)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_multiple_matches(self):
        """search should find all matching snapshots."""
        # 'guide' appears in snap-002 (JavaScript guide) and snap-003 (Archiving Guide)
        results = search('guide')
        self.assertIn('snap-002', results)
        self.assertIn('snap-003', results)
        self.assertNotIn('snap-001', results)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_case_insensitive_by_default(self):
        """Case sensitivity follows ripgrep's own default (case-sensitive unless configured)."""
        # We only assert both calls succeed; exact results depend on ripgrep config.
        results_upper = search('PYTHON')
        results_lower = search('python')
        self.assertIsInstance(results_upper, list)
        self.assertIsInstance(results_lower, list)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_no_results(self):
        """search should return empty list for no matches."""
        results = search('xyznonexistent123')
        self.assertEqual(results, [])
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_regex(self):
        """search should support regex patterns."""
        results = search('(Python|JavaScript)')
        self.assertIn('snap-001', results)
        self.assertIn('snap-002', results)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_distinct_snapshots(self):
        """search should return distinct snapshot IDs."""
        # Query matches both files in snap-001, but the ID must appear only once.
        results = search('Python')
        self.assertEqual(results.count('snap-001'), 1)
    def test_search_missing_binary(self):
        """search should raise when ripgrep binary not found."""
        with patch.dict(os.environ, {'RIPGREP_BINARY': '/nonexistent/rg'}):
            with patch('shutil.which', return_value=None):
                with self.assertRaises(RuntimeError) as context:
                    search('test')
                self.assertIn('ripgrep binary not found', str(context.exception))
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_with_custom_args(self):
        """search should use custom RIPGREP_ARGS."""
        with patch.dict(os.environ, {'RIPGREP_ARGS': '["-i"]'}):  # Case insensitive
            results = search('PYTHON')
            # With -i flag, should find regardless of case
            self.assertIn('snap-001', results)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_timeout(self):
        """search should handle timeout gracefully."""
        with patch.dict(os.environ, {'RIPGREP_TIMEOUT': '1'}):
            # Short timeout, should still complete for small archive
            results = search('Python')
            self.assertIsInstance(results, list)
class TestRipgrepSearchIntegration(TestCase):
    """Integration tests with realistic archive structure.

    Snapshot directories are named by Unix timestamp (as in a real archive),
    and contain a mix of extractor outputs including one binary file, so the
    tests also exercise ripgrep's behavior on non-text content.
    """
    def setUp(self):
        """Create archive with realistic structure."""
        self.temp_dir = tempfile.mkdtemp()
        self.archive_dir = Path(self.temp_dir) / 'archive'
        self.archive_dir.mkdir()
        # Realistic snapshot structure
        self._create_snapshot('1704067200.123456', { # 2024-01-01
            'singlefile.html': '''<!DOCTYPE html>
<html>
<head><title>ArchiveBox Documentation</title></head>
<body>
<h1>Getting Started with ArchiveBox</h1>
<p>ArchiveBox is a powerful, self-hosted web archiving tool.</p>
<p>Install with: pip install archivebox</p>
</body>
</html>''',
            'title/title.txt': 'ArchiveBox Documentation',
            'screenshot/screenshot.png': b'PNG IMAGE DATA', # Binary file
        })
        self._create_snapshot('1704153600.654321', { # 2024-01-02
            'wget/index.html': '''<html>
<head><title>Python News</title></head>
<body>
<h1>Python 3.12 Released</h1>
<p>New features include improved error messages and performance.</p>
</body>
</html>''',
            'readability/content.html': '<p>Python 3.12 has been released with exciting new features.</p>',
        })
        # Point the search module's ARCHIVE_DIR at the temp tree.
        self.settings_patch = patch(
            'archivebox.plugins.search_backend_ripgrep.search.settings'
        )
        self.mock_settings = self.settings_patch.start()
        self.mock_settings.ARCHIVE_DIR = str(self.archive_dir)
    def tearDown(self):
        """Clean up."""
        self.settings_patch.stop()
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    def _create_snapshot(self, timestamp: str, files: dict) -> None:
        """Create snapshot with timestamp-based ID.

        Values may be str (written as text) or bytes (written as binary).
        """
        snap_dir = self.archive_dir / timestamp
        for path, content in files.items():
            file_path = snap_dir / path
            file_path.parent.mkdir(parents=True, exist_ok=True)
            if isinstance(content, bytes):
                file_path.write_bytes(content)
            else:
                file_path.write_text(content)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_archivebox(self):
        """Search for archivebox should find documentation snapshot."""
        results = search('archivebox')
        self.assertIn('1704067200.123456', results)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_python(self):
        """Search for python should find Python news snapshot."""
        results = search('Python')
        self.assertIn('1704153600.654321', results)
    @pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
    def test_search_pip_install(self):
        """Search for installation command."""
        results = search('pip install')
        self.assertIn('1704067200.123456', results)
# Allow running this test module directly (`python <file>`) by delegating to pytest.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the SQLite FTS5 search backend."""

View File

@@ -0,0 +1,351 @@
"""
Tests for the SQLite FTS5 search backend.
Tests cover:
1. Search index creation
2. Indexing snapshots
3. Search queries with real test data
4. Flush operations
5. Edge cases (empty index, special characters)
"""
import os
import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from django.test import TestCase, override_settings
from archivebox.plugins.search_backend_sqlite.search import (
get_db_path,
search,
flush,
SQLITEFTS_DB,
FTS_TOKENIZERS,
)
class TestSqliteSearchBackend(TestCase):
    """Test SQLite FTS5 search backend.

    Builds a real FTS5 virtual table in a temp directory and patches the
    backend's settings so DATA_DIR points at it; rows are inserted directly
    via sqlite3 so tests exercise only the search()/flush() query paths.
    """
    def setUp(self):
        """Create a temporary data directory with search index."""
        self.temp_dir = tempfile.mkdtemp()
        self.db_path = Path(self.temp_dir) / SQLITEFTS_DB
        # Patch DATA_DIR
        self.settings_patch = patch(
            'archivebox.plugins.search_backend_sqlite.search.settings'
        )
        self.mock_settings = self.settings_patch.start()
        self.mock_settings.DATA_DIR = self.temp_dir
        # Create FTS5 table
        self._create_index()
    def tearDown(self):
        """Clean up temporary directory."""
        self.settings_patch.stop()
        import shutil
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    def _create_index(self):
        """Create the FTS5 search index table."""
        conn = sqlite3.connect(str(self.db_path))
        try:
            # FTS_TOKENIZERS comes from the backend module, so the test index
            # uses the same tokenizer configuration as production.
            conn.execute(f'''
                CREATE VIRTUAL TABLE IF NOT EXISTS search_index
                USING fts5(
                    snapshot_id,
                    url,
                    title,
                    content,
                    tokenize = '{FTS_TOKENIZERS}'
                )
            ''')
            conn.commit()
        finally:
            conn.close()
    def _index_snapshot(self, snapshot_id: str, url: str, title: str, content: str) -> None:
        """Add a snapshot to the index."""
        conn = sqlite3.connect(str(self.db_path))
        try:
            conn.execute(
                'INSERT INTO search_index (snapshot_id, url, title, content) VALUES (?, ?, ?, ?)',
                (snapshot_id, url, title, content)
            )
            conn.commit()
        finally:
            conn.close()
    def test_get_db_path(self):
        """get_db_path should return correct path."""
        path = get_db_path()
        self.assertEqual(path, Path(self.temp_dir) / SQLITEFTS_DB)
    def test_search_empty_index(self):
        """search should return empty list for empty index."""
        results = search('nonexistent')
        self.assertEqual(results, [])
    def test_search_no_index_file(self):
        """search should return empty list when index file doesn't exist."""
        os.remove(self.db_path)
        results = search('test')
        self.assertEqual(results, [])
    def test_search_single_result(self):
        """search should find matching snapshot."""
        self._index_snapshot(
            'snap-001',
            'https://example.com/page1',
            'Example Page',
            'This is example content about testing.'
        )
        results = search('example')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0], 'snap-001')
    def test_search_multiple_results(self):
        """search should find all matching snapshots."""
        self._index_snapshot('snap-001', 'https://example.com/1', 'Python Tutorial', 'Learn Python programming')
        self._index_snapshot('snap-002', 'https://example.com/2', 'Python Guide', 'Advanced Python concepts')
        self._index_snapshot('snap-003', 'https://example.com/3', 'JavaScript Basics', 'Learn JavaScript')
        results = search('Python')
        self.assertEqual(len(results), 2)
        self.assertIn('snap-001', results)
        self.assertIn('snap-002', results)
        self.assertNotIn('snap-003', results)
    def test_search_title_match(self):
        """search should match against title."""
        self._index_snapshot('snap-001', 'https://example.com', 'Django Web Framework', 'Content here')
        results = search('Django')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0], 'snap-001')
    def test_search_url_match(self):
        """search should match against URL."""
        self._index_snapshot('snap-001', 'https://archivebox.io/docs', 'Title', 'Content')
        results = search('archivebox')
        self.assertEqual(len(results), 1)
    def test_search_content_match(self):
        """search should match against content."""
        self._index_snapshot(
            'snap-001',
            'https://example.com',
            'Generic Title',
            'This document contains information about cryptography and security.'
        )
        results = search('cryptography')
        self.assertEqual(len(results), 1)
    def test_search_case_insensitive(self):
        """search should be case insensitive."""
        self._index_snapshot('snap-001', 'https://example.com', 'Title', 'PYTHON programming')
        results = search('python')
        self.assertEqual(len(results), 1)
    def test_search_stemming(self):
        """search should use porter stemmer for word stems."""
        self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Programming concepts')
        # 'program' should match 'programming' with porter stemmer
        results = search('program')
        self.assertEqual(len(results), 1)
    def test_search_multiple_words(self):
        """search should match documents with all words."""
        self._index_snapshot('snap-001', 'https://example.com', 'Web Development', 'Learn web development skills')
        self._index_snapshot('snap-002', 'https://example.com', 'Web Design', 'Design beautiful websites')
        results = search('web development')
        # NOTE(review): FTS5 treats a bare multi-term query as an implicit AND
        # of the terms, so only docs containing both 'web' and 'development'
        # (after stemming) should match; we only assert the certain match.
        self.assertIn('snap-001', results)
    def test_search_phrase(self):
        """search should support phrase queries."""
        self._index_snapshot('snap-001', 'https://example.com', 'Title', 'machine learning algorithms')
        self._index_snapshot('snap-002', 'https://example.com', 'Title', 'machine algorithms learning')
        # Phrase search with quotes: only the adjacent-words doc should match.
        results = search('"machine learning"')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0], 'snap-001')
    def test_search_distinct_results(self):
        """search should return distinct snapshot IDs."""
        # Index same snapshot twice (could happen with multiple fields matching)
        self._index_snapshot('snap-001', 'https://python.org', 'Python', 'Python programming language')
        results = search('Python')
        self.assertEqual(len(results), 1)
    def test_flush_single(self):
        """flush should remove snapshot from index."""
        self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Content')
        self._index_snapshot('snap-002', 'https://example.com', 'Title', 'Content')
        flush(['snap-001'])
        results = search('Content')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0], 'snap-002')
    def test_flush_multiple(self):
        """flush should remove multiple snapshots."""
        self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Test')
        self._index_snapshot('snap-002', 'https://example.com', 'Title', 'Test')
        self._index_snapshot('snap-003', 'https://example.com', 'Title', 'Test')
        flush(['snap-001', 'snap-003'])
        results = search('Test')
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0], 'snap-002')
    def test_flush_nonexistent(self):
        """flush should not raise for nonexistent snapshots."""
        # Should not raise
        flush(['nonexistent-snap'])
    def test_flush_no_index(self):
        """flush should not raise when index doesn't exist."""
        os.remove(self.db_path)
        # Should not raise
        flush(['snap-001'])
    def test_search_special_characters(self):
        """search should handle special characters in queries."""
        self._index_snapshot('snap-001', 'https://example.com', 'C++ Programming', 'Learn C++ basics')
        # FTS5 handles special chars
        results = search('C++')
        # May or may not match depending on tokenizer config
        # At minimum, should not raise
        self.assertIsInstance(results, list)
    def test_search_unicode(self):
        """search should handle unicode content."""
        self._index_snapshot('snap-001', 'https://example.com', 'Titre Francais', 'cafe resume')
        self._index_snapshot('snap-002', 'https://example.com', 'Japanese', 'Hello world')
        # With remove_diacritics, 'cafe' should match
        results = search('cafe')
        self.assertEqual(len(results), 1)
class TestSqliteSearchWithRealData(TestCase):
    """Integration tests with realistic archived content.

    Pre-populates the FTS5 index with five snapshots resembling real archive
    entries (docs pages, news sites, Wikipedia) and runs end-to-end search
    and flush queries against them.
    """
    def setUp(self):
        """Create index with realistic test data."""
        self.temp_dir = tempfile.mkdtemp()
        self.db_path = Path(self.temp_dir) / SQLITEFTS_DB
        # Point the backend's DATA_DIR at the temp directory.
        self.settings_patch = patch(
            'archivebox.plugins.search_backend_sqlite.search.settings'
        )
        self.mock_settings = self.settings_patch.start()
        self.mock_settings.DATA_DIR = self.temp_dir
        # Create index
        conn = sqlite3.connect(str(self.db_path))
        try:
            conn.execute(f'''
                CREATE VIRTUAL TABLE IF NOT EXISTS search_index
                USING fts5(
                    snapshot_id,
                    url,
                    title,
                    content,
                    tokenize = '{FTS_TOKENIZERS}'
                )
            ''')
            # Index realistic data: (snapshot_id, url, title, content) rows.
            test_data = [
                ('snap-001', 'https://github.com/ArchiveBox/ArchiveBox',
                 'ArchiveBox - Self-hosted web archiving',
                 'Open source self-hosted web archiving. Collects, saves, and displays various types of content.'),
                ('snap-002', 'https://docs.python.org/3/tutorial/',
                 'Python 3 Tutorial',
                 'An informal introduction to Python. Python is an easy to learn, powerful programming language.'),
                ('snap-003', 'https://developer.mozilla.org/docs/Web/JavaScript',
                 'JavaScript - MDN Web Docs',
                 'JavaScript (JS) is a lightweight, interpreted programming language with first-class functions.'),
                ('snap-004', 'https://news.ycombinator.com',
                 'Hacker News',
                 'Social news website focusing on computer science and entrepreneurship.'),
                ('snap-005', 'https://en.wikipedia.org/wiki/Web_archiving',
                 'Web archiving - Wikipedia',
                 'Web archiving is the process of collecting portions of the World Wide Web to ensure the information is preserved.'),
            ]
            conn.executemany(
                'INSERT INTO search_index (snapshot_id, url, title, content) VALUES (?, ?, ?, ?)',
                test_data
            )
            conn.commit()
        finally:
            conn.close()
    def tearDown(self):
        """Clean up."""
        self.settings_patch.stop()
        import shutil
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    def test_search_archivebox(self):
        """Search for 'archivebox' should find relevant results."""
        results = search('archivebox')
        self.assertIn('snap-001', results)
    def test_search_programming(self):
        """Search for 'programming' should find Python and JS docs."""
        results = search('programming')
        self.assertIn('snap-002', results)
        self.assertIn('snap-003', results)
    def test_search_web_archiving(self):
        """Search for 'web archiving' should find relevant results."""
        results = search('web archiving')
        # Both ArchiveBox and Wikipedia should match
        self.assertIn('snap-001', results)
        self.assertIn('snap-005', results)
    def test_search_github(self):
        """Search for 'github' should find URL match."""
        results = search('github')
        self.assertIn('snap-001', results)
    def test_search_tutorial(self):
        """Search for 'tutorial' should find Python tutorial."""
        results = search('tutorial')
        self.assertIn('snap-002', results)
    def test_flush_and_search(self):
        """Flushing a snapshot should remove it from search results."""
        # Verify it's there first
        results = search('archivebox')
        self.assertIn('snap-001', results)
        # Flush it
        flush(['snap-001'])
        # Should no longer be found
        results = search('archivebox')
        self.assertNotIn('snap-001', results)
# Allow running this test module directly (`python <file>`) by delegating to pytest.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the SEO plugin."""

View File

@@ -0,0 +1,135 @@
"""
Tests for the SEO plugin.
Tests the real SEO hook with an actual URL to verify
meta tag extraction.
"""
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Import chrome test helpers
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
from chrome_test_helpers import (
chrome_session,
get_test_env,
get_plugin_dir,
get_hook_script,
)
def chrome_available() -> bool:
    """Return True if any Chrome/Chromium executable is on PATH."""
    # Probe the common executable names; any hit means Chrome is usable.
    candidates = ('chromium', 'chromium-browser', 'google-chrome', 'chrome')
    return any(shutil.which(candidate) for candidate in candidates)
# Get the path to the SEO hook.
# PLUGIN_DIR: this plugin's directory, resolved relative to the test file.
PLUGIN_DIR = get_plugin_dir(__file__)
# SEO_HOOK: the on_Snapshot SEO hook script matched by glob (may be None if
# not found -- test_seo_hook_exists asserts it is present).
SEO_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_seo.*')
class TestSEOPlugin(TestCase):
    """Smoke tests for SEO plugin hook discovery."""

    def test_seo_hook_exists(self):
        """SEO hook script should exist."""
        hook = SEO_HOOK
        self.assertIsNotNone(hook, "SEO hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestSEOWithChrome(TestCase):
    """Integration tests for SEO plugin with Chrome.

    Starts a real Chrome session via chrome_session(), runs the node hook
    against https://example.com, and checks the extracted SEO metadata.
    Skips gracefully when Chrome/CDP setup fails.
    """
    def setUp(self):
        """Set up test environment."""
        self.temp_dir = Path(tempfile.mkdtemp())
    def tearDown(self):
        """Clean up."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    def test_seo_extracts_meta_tags(self):
        """SEO hook should extract meta tags from a real URL."""
        test_url = 'https://example.com'
        snapshot_id = 'test-seo-snapshot'
        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-seo-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (chrome_process, chrome_pid, snapshot_chrome_dir):
                # Get environment and run the SEO hook
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'
                # Run SEO hook with the active Chrome session; cwd is the
                # snapshot dir so the hook writes its outputs there.
                result = subprocess.run(
                    ['node', str(SEO_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env
                )
                # Check for output file
                seo_output = snapshot_chrome_dir / 'seo.json'
                seo_data = None
                # Try parsing from file first
                if seo_output.exists():
                    with open(seo_output) as f:
                        try:
                            seo_data = json.load(f)
                        except json.JSONDecodeError:
                            pass
                # Try parsing from stdout if not in file: scan for the first
                # JSON line that looks like an SEO record.
                if not seo_data:
                    for line in result.stdout.split('\n'):
                        line = line.strip()
                        if line.startswith('{'):
                            try:
                                record = json.loads(line)
                                # SEO data typically has title, description, or og: tags
                                if any(key in record for key in ['title', 'description', 'og:title', 'canonical']):
                                    seo_data = record
                                    break
                            except json.JSONDecodeError:
                                continue
                # Verify hook ran successfully
                self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
                self.assertNotIn('Traceback', result.stderr)
                self.assertNotIn('Error:', result.stderr)
                # example.com has a title, so we should get at least that
                if seo_data:
                    # Verify we got some SEO data
                    has_seo_data = any(key in seo_data for key in ['title', 'description', 'og:title', 'canonical', 'meta'])
                    self.assertTrue(has_seo_data, f"No SEO data extracted: {seo_data}")
        except RuntimeError as e:
            # Environment problems with Chrome/CDP are skips, not failures.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
# Allow running this test module directly (`python <file>`) by delegating to pytest.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the SSL plugin."""

View File

@@ -0,0 +1,139 @@
"""
Tests for the SSL plugin.
Tests the real SSL hook with an actual HTTPS URL to verify
certificate information extraction.
"""
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Import chrome test helpers
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
from chrome_test_helpers import (
chrome_session,
get_test_env,
get_plugin_dir,
get_hook_script,
)
def chrome_available() -> bool:
    """Return True if any Chrome/Chromium executable is on PATH."""
    known_names = ('chromium', 'chromium-browser', 'google-chrome', 'chrome')
    # shutil.which returns the executable path (truthy) only when found.
    matches = [name for name in known_names if shutil.which(name)]
    return bool(matches)
# Get the path to the SSL hook.
# PLUGIN_DIR: this plugin's directory, resolved relative to the test file.
PLUGIN_DIR = get_plugin_dir(__file__)
# SSL_HOOK: the on_Snapshot SSL hook script matched by glob (may be None if
# not found -- test_ssl_hook_exists asserts it is present).
SSL_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_ssl.*')
class TestSSLPlugin(TestCase):
    """Smoke tests for SSL plugin hook discovery."""

    def test_ssl_hook_exists(self):
        """SSL hook script should exist."""
        hook = SSL_HOOK
        self.assertIsNotNone(hook, "SSL hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestSSLWithChrome(TestCase):
    """Integration tests for SSL plugin with Chrome.

    Starts a real Chrome session, runs the node SSL hook against an HTTPS
    URL, and verifies the certificate/protocol info it records. Skips
    gracefully when Chrome/CDP setup fails.
    """
    def setUp(self):
        """Set up test environment."""
        self.temp_dir = Path(tempfile.mkdtemp())
    def tearDown(self):
        """Clean up."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    def test_ssl_extracts_certificate_from_https_url(self):
        """SSL hook should extract certificate info from a real HTTPS URL."""
        test_url = 'https://example.com'
        snapshot_id = 'test-ssl-snapshot'
        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-ssl-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (chrome_process, chrome_pid, snapshot_chrome_dir):
                # Get environment and run the SSL hook
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'
                # Run SSL hook with the active Chrome session; cwd is the
                # snapshot dir so the hook writes its outputs there.
                result = subprocess.run(
                    ['node', str(SSL_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env
                )
                # Check for output file (JSONL: one JSON record per line)
                ssl_output = snapshot_chrome_dir / 'ssl.jsonl'
                ssl_data = None
                # Try parsing from file first
                if ssl_output.exists():
                    with open(ssl_output) as f:
                        for line in f:
                            line = line.strip()
                            if line.startswith('{'):
                                try:
                                    ssl_data = json.loads(line)
                                    break
                                except json.JSONDecodeError:
                                    continue
                # Try parsing from stdout if not in file
                if not ssl_data:
                    for line in result.stdout.split('\n'):
                        line = line.strip()
                        if line.startswith('{'):
                            try:
                                record = json.loads(line)
                                if 'protocol' in record or 'issuer' in record or record.get('type') == 'SSL':
                                    ssl_data = record
                                    break
                            except json.JSONDecodeError:
                                continue
                # Verify we got SSL data from HTTPS URL
                if ssl_data:
                    # example.com uses HTTPS, should get certificate info
                    self.assertIn('protocol', ssl_data, f"SSL data missing protocol: {ssl_data}")
                    self.assertTrue(
                        ssl_data['protocol'].startswith('TLS') or ssl_data['protocol'].startswith('SSL'),
                        f"Unexpected protocol: {ssl_data['protocol']}"
                    )
                else:
                    # If no SSL data, at least verify hook ran without crashing
                    self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
        except RuntimeError as e:
            # Environment problems with Chrome/CDP are skips, not failures.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
# Allow running this test module directly (`python <file>`) by delegating to pytest.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the workers module (Orchestrator, Worker, pid_utils)."""

View File

@@ -0,0 +1,453 @@
"""
Unit tests for the Orchestrator and Worker classes.
Tests cover:
1. Orchestrator lifecycle (startup, shutdown)
2. Queue polling and worker spawning
3. Idle detection and exit logic
4. Worker registration and management
5. Process model methods (replacing old pid_utils)
"""
import os
import tempfile
import time
from pathlib import Path
from datetime import timedelta
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
from django.utils import timezone
from archivebox.workers.orchestrator import Orchestrator
class TestOrchestratorUnit(TestCase):
    """Unit tests for Orchestrator class (mocked dependencies).

    Exercises the pure decision logic (should_exit, should_spawn_worker,
    has_pending_work) with MagicMock worker classes so no real processes
    or database rows are needed.
    """
    def test_orchestrator_creation(self):
        """Orchestrator should initialize with correct defaults."""
        orchestrator = Orchestrator(exit_on_idle=True)
        self.assertTrue(orchestrator.exit_on_idle)
        self.assertEqual(orchestrator.idle_count, 0)
        self.assertIsNone(orchestrator.pid_file)
    def test_orchestrator_repr(self):
        """Orchestrator __repr__ should include PID."""
        orchestrator = Orchestrator()
        repr_str = repr(orchestrator)
        self.assertIn('Orchestrator', repr_str)
        self.assertIn(str(os.getpid()), repr_str)
    def test_has_pending_work(self):
        """has_pending_work should check if any queue has items."""
        orchestrator = Orchestrator()
        self.assertFalse(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 0}))
        self.assertTrue(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 5}))
        self.assertTrue(orchestrator.has_pending_work({'crawl': 10, 'snapshot': 0}))
    def test_should_exit_not_exit_on_idle(self):
        """should_exit should return False when exit_on_idle is False."""
        orchestrator = Orchestrator(exit_on_idle=False)
        # Even a huge idle count must not trigger exit in daemon mode.
        orchestrator.idle_count = 100
        self.assertFalse(orchestrator.should_exit({'crawl': 0}))
    def test_should_exit_pending_work(self):
        """should_exit should return False when there's pending work."""
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = 100
        self.assertFalse(orchestrator.should_exit({'crawl': 5}))
    @patch.object(Orchestrator, 'has_running_workers')
    def test_should_exit_running_workers(self, mock_has_workers):
        """should_exit should return False when workers are running."""
        mock_has_workers.return_value = True
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = 100
        self.assertFalse(orchestrator.should_exit({'crawl': 0}))
    @patch.object(Orchestrator, 'has_running_workers')
    @patch.object(Orchestrator, 'has_future_work')
    def test_should_exit_idle_timeout(self, mock_future, mock_workers):
        """should_exit should return True after idle timeout with no work."""
        mock_workers.return_value = False
        mock_future.return_value = False
        orchestrator = Orchestrator(exit_on_idle=True)
        # idle_count == IDLE_TIMEOUT is the exit boundary.
        orchestrator.idle_count = orchestrator.IDLE_TIMEOUT
        self.assertTrue(orchestrator.should_exit({'crawl': 0, 'snapshot': 0}))
    @patch.object(Orchestrator, 'has_running_workers')
    @patch.object(Orchestrator, 'has_future_work')
    def test_should_exit_below_idle_timeout(self, mock_future, mock_workers):
        """should_exit should return False below idle timeout."""
        mock_workers.return_value = False
        mock_future.return_value = False
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.idle_count = orchestrator.IDLE_TIMEOUT - 1
        self.assertFalse(orchestrator.should_exit({'crawl': 0}))
    def test_should_spawn_worker_no_queue(self):
        """should_spawn_worker should return False when queue is empty."""
        orchestrator = Orchestrator()
        # Create a mock worker class
        mock_worker = MagicMock()
        mock_worker.get_running_workers.return_value = []
        self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 0))
    def test_should_spawn_worker_at_limit(self):
        """should_spawn_worker should return False when at per-type limit."""
        orchestrator = Orchestrator()
        mock_worker = MagicMock()
        # Fake MAX_WORKERS_PER_TYPE already-running workers of this type.
        mock_worker.get_running_workers.return_value = [{}] * orchestrator.MAX_WORKERS_PER_TYPE
        self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10))
    @patch.object(Orchestrator, 'get_total_worker_count')
    def test_should_spawn_worker_at_total_limit(self, mock_total):
        """should_spawn_worker should return False when at total limit."""
        orchestrator = Orchestrator()
        mock_total.return_value = orchestrator.MAX_TOTAL_WORKERS
        mock_worker = MagicMock()
        mock_worker.get_running_workers.return_value = []
        self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10))
    @patch.object(Orchestrator, 'get_total_worker_count')
    def test_should_spawn_worker_success(self, mock_total):
        """should_spawn_worker should return True when conditions are met."""
        orchestrator = Orchestrator()
        mock_total.return_value = 0
        mock_worker = MagicMock()
        mock_worker.get_running_workers.return_value = []
        mock_worker.MAX_CONCURRENT_TASKS = 5
        self.assertTrue(orchestrator.should_spawn_worker(mock_worker, 10))
    @patch.object(Orchestrator, 'get_total_worker_count')
    def test_should_spawn_worker_enough_workers(self, mock_total):
        """should_spawn_worker should return False when enough workers for queue."""
        orchestrator = Orchestrator()
        mock_total.return_value = 2
        mock_worker = MagicMock()
        mock_worker.get_running_workers.return_value = [{}]  # 1 worker running
        mock_worker.MAX_CONCURRENT_TASKS = 5  # Can handle 5 items
        # Queue size (3) <= running_workers (1) * MAX_CONCURRENT_TASKS (5)
        self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 3))
class TestOrchestratorWithProcess(TestCase):
    """Test Orchestrator using Process model for tracking.

    Verifies that Orchestrator.is_running() is backed by the machine app's
    Process records rather than pid files on disk.
    """
    def setUp(self):
        """Reset the machine module's cached Machine/Process singletons."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None
    def test_is_running_no_orchestrator(self):
        """is_running should return False when no orchestrator process exists."""
        from archivebox.machine.models import Process
        # Clean up any stale processes first
        Process.cleanup_stale_running()
        # Mark any running orchestrators as exited for clean test state
        Process.objects.filter(
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING
        ).update(status=Process.StatusChoices.EXITED)
        self.assertFalse(Orchestrator.is_running())
    def test_is_running_with_orchestrator_process(self):
        """is_running should return True when orchestrator Process exists."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        # Create an orchestrator Process record
        proc = Process.objects.create(
            machine=machine,
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
            pid=os.getpid(),  # Use current PID so it appears alive
            started_at=timezone.now(),
            cmd=['archivebox', 'manage', 'orchestrator'],
        )
        try:
            # Should detect running orchestrator
            self.assertTrue(Orchestrator.is_running())
        finally:
            # Clean up so later tests don't see a phantom orchestrator.
            proc.status = Process.StatusChoices.EXITED
            proc.save()
    def test_orchestrator_uses_process_for_is_running(self):
        """Orchestrator.is_running should use Process.get_running_count."""
        from archivebox.machine.models import Process
        # Verify is_running uses Process model, not pid files
        with patch.object(Process, 'get_running_count') as mock_count:
            mock_count.return_value = 1
            result = Orchestrator.is_running()
            # Should have called Process.get_running_count with orchestrator type
            mock_count.assert_called()
            self.assertTrue(result)
class TestProcessBasedWorkerTracking(TestCase):
    """Test Process model methods that replace pid_utils functionality."""

    def setUp(self):
        """Reset the module-level Machine/Process caches between tests."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None

    def test_process_current_creates_record(self):
        """Process.current() should create a Process record for current PID."""
        from archivebox.machine.models import Process
        proc = Process.current()
        self.assertIsNotNone(proc)
        self.assertEqual(proc.pid, os.getpid())
        self.assertEqual(proc.status, Process.StatusChoices.RUNNING)
        self.assertIsNotNone(proc.machine)
        self.assertIsNotNone(proc.started_at)

    def test_process_current_caches_result(self):
        """Process.current() should return cached Process within interval."""
        from archivebox.machine.models import Process
        first, second = Process.current(), Process.current()
        self.assertEqual(first.id, second.id)

    def test_process_get_running_count(self):
        """Process.get_running_count should count running processes by type."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        # Insert three RUNNING worker rows with fake (unlikely-to-exist) PIDs.
        for offset in range(3):
            Process.objects.create(
                machine=machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=99990 + offset,
                started_at=timezone.now(),
            )
        count = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
        self.assertGreaterEqual(count, 3)

    def test_process_get_next_worker_id(self):
        """Process.get_next_worker_id should return count of running workers."""
        from archivebox.machine.models import Process, Machine
        machine = Machine.current()
        # Two RUNNING workers means the next worker id should be at least 2.
        for offset in range(2):
            Process.objects.create(
                machine=machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=99980 + offset,
                started_at=timezone.now(),
            )
        next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
        self.assertGreaterEqual(next_id, 2)

    def test_process_cleanup_stale_running(self):
        """Process.cleanup_stale_running should mark stale processes as exited."""
        from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW
        # A RUNNING row whose started_at predates the PID-reuse window and
        # whose PID doesn't exist should be swept up as stale.
        long_ago = timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1)
        stale = Process.objects.create(
            machine=Machine.current(),
            process_type=Process.TypeChoices.WORKER,
            status=Process.StatusChoices.RUNNING,
            pid=999999,  # fake PID that should not exist on the host
            started_at=long_ago,
        )
        self.assertGreaterEqual(Process.cleanup_stale_running(), 1)
        stale.refresh_from_db()
        self.assertEqual(stale.status, Process.StatusChoices.EXITED)

    def test_process_get_running(self):
        """Process.get_running should return queryset of running processes."""
        from archivebox.machine.models import Process, Machine
        hook_proc = Process.objects.create(
            machine=Machine.current(),
            process_type=Process.TypeChoices.HOOK,
            status=Process.StatusChoices.RUNNING,
            pid=99970,
            started_at=timezone.now(),
        )
        self.assertIn(hook_proc, Process.get_running(process_type=Process.TypeChoices.HOOK))

    def test_process_type_detection(self):
        """Process._detect_process_type should detect process type from argv."""
        from archivebox.machine.models import Process
        # Each fake argv should map to the corresponding process type.
        expectations = [
            (['archivebox', 'manage', 'orchestrator'], Process.TypeChoices.ORCHESTRATOR),
            (['archivebox', 'add', 'http://example.com'], Process.TypeChoices.CLI),
            (['supervisord', '-c', 'config.ini'], Process.TypeChoices.SUPERVISORD),
        ]
        for argv, expected in expectations:
            with patch('sys.argv', argv):
                self.assertEqual(Process._detect_process_type(), expected)
class TestProcessLifecycle(TestCase):
    """Test Process model lifecycle methods."""

    def setUp(self):
        """Reset caches and grab the current Machine for reuse in each test."""
        import archivebox.machine.models as models
        models._CURRENT_MACHINE = None
        models._CURRENT_PROCESS = None
        self.machine = models.Machine.current()

    def _make_process(self, **overrides):
        """Create a RUNNING Process row on self.machine, started now, with overrides."""
        from archivebox.machine.models import Process
        fields = dict(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            started_at=timezone.now(),
        )
        fields.update(overrides)
        return Process.objects.create(**fields)

    def test_process_is_running_property(self):
        """Process.is_running should check actual OS process."""
        # Our own PID is guaranteed alive for the duration of the test.
        alive = self._make_process(pid=os.getpid())
        self.assertTrue(alive.is_running)
        # A huge fake PID should not correspond to any live process.
        dead = self._make_process(pid=999999)
        self.assertFalse(dead.is_running)

    def test_process_poll(self):
        """Process.poll should check and update exit status."""
        from archivebox.machine.models import Process
        proc = self._make_process(pid=999999)
        # Polling a nonexistent PID should record an exit code and flip status.
        self.assertIsNotNone(proc.poll())
        proc.refresh_from_db()
        self.assertEqual(proc.status, Process.StatusChoices.EXITED)

    def test_process_terminate_already_dead(self):
        """Process.terminate should handle already-dead processes."""
        from archivebox.machine.models import Process
        proc = self._make_process(pid=999999)
        # terminate() returns False when there was nothing left to kill.
        self.assertFalse(proc.terminate())
        proc.refresh_from_db()
        self.assertEqual(proc.status, Process.StatusChoices.EXITED)

    def test_process_tree_traversal(self):
        """Process parent/children relationships should work."""
        from archivebox.machine.models import Process
        root = self._make_process(pid=1, process_type=Process.TypeChoices.CLI)
        leaf = self._make_process(pid=2, parent=root, process_type=Process.TypeChoices.WORKER)
        # Both directions of the relationship, plus derived tree attributes.
        self.assertEqual(leaf.parent, root)
        self.assertIn(leaf, root.children.all())
        self.assertEqual(leaf.root, root)
        self.assertEqual(leaf.depth, 1)
        self.assertEqual(root.depth, 0)
# Allow running this test module directly (e.g. `python <file>`) by
# delegating to pytest with verbose output; normally pytest discovers it.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])