Merge remote-tracking branch 'origin/dev' into claude/dns-traffic-recorder-plugin-dNbxC

Claude
2025-12-31 18:24:56 +00:00
25 changed files with 2783 additions and 279 deletions

View File

@@ -30,7 +30,6 @@ import tempfile
import unittest
from io import StringIO
from pathlib import Path
from unittest.mock import patch, MagicMock
# Test configuration - disable slow extractors
TEST_CONFIG = {
@@ -152,35 +151,8 @@ class TestJSONLParsing(unittest.TestCase):
self.assertEqual(result['url'], 'file:///path/to/file.txt')
class TestJSONLOutput(unittest.TestCase):
"""Test JSONL output formatting."""
def test_crawl_to_json(self):
"""Crawl model should serialize to JSON correctly."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Create a mock crawl with to_json method configured
mock_crawl = MagicMock()
mock_crawl.to_json.return_value = {
'type': TYPE_CRAWL,
'schema_version': '0.9.0',
'id': 'test-crawl-uuid',
'urls': 'https://example.com',
'status': 'queued',
'max_depth': 0,
'tags_str': 'tag1,tag2',
'label': '',
'created_at': None,
}
result = mock_crawl.to_json()
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'test-crawl-uuid')
self.assertEqual(result['urls'], 'https://example.com')
self.assertEqual(result['status'], 'queued')
# Note: Snapshot, ArchiveResult, and JSONL output serialization are tested in
# integration tests (TestPipingWorkflowIntegration) using real model instances, not mocks.
class TestReadArgsOrStdin(unittest.TestCase):

View File

@@ -0,0 +1 @@
"""Tests for the machine module (Machine, NetworkInterface, Binary, Process models)."""

View File

@@ -0,0 +1,563 @@
"""
Unit tests for machine module models: Machine, NetworkInterface, Binary, Process.
Tests cover:
1. Machine model creation and current() method
2. NetworkInterface model and network detection
3. Binary model lifecycle and state machine
4. Process model lifecycle, hierarchy, and state machine
5. JSONL serialization/deserialization
6. Manager methods
7. Process tracking methods (replacing pid_utils)
"""
import os
import sys
from pathlib import Path
from datetime import timedelta
from unittest.mock import patch
import pytest
from django.test import TestCase
from django.utils import timezone
from archivebox.machine.models import (
Machine,
NetworkInterface,
Binary,
Process,
BinaryMachine,
ProcessMachine,
MACHINE_RECHECK_INTERVAL,
PROCESS_RECHECK_INTERVAL,
PID_REUSE_WINDOW,
)
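# Shape of the piping records exercised by the to_jsonl()/from_jsonl() tests
# below (keys taken from this file's assertions; values are illustrative):
#   {'type': 'Process', 'cmd': ['echo', 'hello'], 'pwd': '/tmp', 'timeout': 60}
#   {'_method': 'update', 'key': 'WGET_BINARY', 'value': '/usr/bin/wget'}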
class TestMachineModel(TestCase):
"""Test the Machine model."""
def setUp(self):
"""Reset cached machine between tests."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
def test_machine_current_creates_machine(self):
"""Machine.current() should create a machine if none exists."""
machine = Machine.current()
self.assertIsNotNone(machine)
self.assertIsNotNone(machine.id)
self.assertIsNotNone(machine.guid)
self.assertEqual(machine.hostname, os.uname().nodename)
self.assertIn(machine.os_family, ['linux', 'darwin', 'windows', 'freebsd'])
def test_machine_current_returns_cached(self):
"""Machine.current() should return cached machine within recheck interval."""
machine1 = Machine.current()
machine2 = Machine.current()
self.assertEqual(machine1.id, machine2.id)
def test_machine_current_refreshes_after_interval(self):
"""Machine.current() should refresh after recheck interval."""
import archivebox.machine.models as models
machine1 = Machine.current()
# Manually expire the cache by modifying modified_at
machine1.modified_at = timezone.now() - timedelta(seconds=MACHINE_RECHECK_INTERVAL + 1)
machine1.save()
models._CURRENT_MACHINE = machine1
machine2 = Machine.current()
# Should have fetched/updated the machine (same GUID)
self.assertEqual(machine1.guid, machine2.guid)
def test_machine_from_jsonl_update(self):
"""Machine.from_jsonl() should update machine config."""
Machine.current() # Ensure machine exists
record = {
'_method': 'update',
'key': 'WGET_BINARY',
'value': '/usr/bin/wget',
}
result = Machine.from_jsonl(record)
self.assertIsNotNone(result)
self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget')
def test_machine_from_jsonl_invalid(self):
"""Machine.from_jsonl() should return None for invalid records."""
result = Machine.from_jsonl({'invalid': 'record'})
self.assertIsNone(result)
def test_machine_manager_current(self):
"""Machine.objects.current() should return current machine."""
machine = Machine.objects.current()
self.assertIsNotNone(machine)
self.assertEqual(machine.id, Machine.current().id)
class TestNetworkInterfaceModel(TestCase):
"""Test the NetworkInterface model."""
def setUp(self):
"""Reset cached interface between tests."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_INTERFACE = None
def test_networkinterface_current_creates_interface(self):
"""NetworkInterface.current() should create an interface if none exists."""
interface = NetworkInterface.current()
self.assertIsNotNone(interface)
self.assertIsNotNone(interface.id)
self.assertIsNotNone(interface.machine)
self.assertIsNotNone(interface.ip_local)
def test_networkinterface_current_returns_cached(self):
"""NetworkInterface.current() should return cached interface within recheck interval."""
interface1 = NetworkInterface.current()
interface2 = NetworkInterface.current()
self.assertEqual(interface1.id, interface2.id)
def test_networkinterface_manager_current(self):
"""NetworkInterface.objects.current() should return current interface."""
interface = NetworkInterface.objects.current()
self.assertIsNotNone(interface)
class TestBinaryModel(TestCase):
"""Test the Binary model."""
def setUp(self):
"""Reset cached binaries and create a machine."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_BINARIES = {}
self.machine = Machine.current()
def test_binary_creation(self):
"""Binary should be created with default values."""
binary = Binary.objects.create(
machine=self.machine,
name='wget',
binproviders='apt,brew,env',
)
self.assertIsNotNone(binary.id)
self.assertEqual(binary.name, 'wget')
self.assertEqual(binary.status, Binary.StatusChoices.QUEUED)
self.assertFalse(binary.is_valid)
def test_binary_is_valid(self):
"""Binary.is_valid should be True when abspath and version are set."""
binary = Binary.objects.create(
machine=self.machine,
name='wget',
abspath='/usr/bin/wget',
version='1.21',
)
self.assertTrue(binary.is_valid)
def test_binary_manager_get_valid_binary(self):
"""BinaryManager.get_valid_binary() should find valid binaries."""
# Create invalid binary (no abspath)
Binary.objects.create(machine=self.machine, name='wget')
# Create valid binary
Binary.objects.create(
machine=self.machine,
name='wget',
abspath='/usr/bin/wget',
version='1.21',
)
result = Binary.objects.get_valid_binary('wget')
self.assertIsNotNone(result)
self.assertEqual(result.abspath, '/usr/bin/wget')
def test_binary_update_and_requeue(self):
"""Binary.update_and_requeue() should update fields and save."""
binary = Binary.objects.create(machine=self.machine, name='test')
old_modified = binary.modified_at
binary.update_and_requeue(
status=Binary.StatusChoices.STARTED,
retry_at=timezone.now() + timedelta(seconds=60),
)
binary.refresh_from_db()
self.assertEqual(binary.status, Binary.StatusChoices.STARTED)
self.assertGreater(binary.modified_at, old_modified)
class TestBinaryStateMachine(TestCase):
"""Test the BinaryMachine state machine."""
def setUp(self):
"""Create a machine and binary for state machine tests."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
self.machine = Machine.current()
self.binary = Binary.objects.create(
machine=self.machine,
name='test-binary',
binproviders='env',
)
def test_binary_state_machine_initial_state(self):
"""BinaryMachine should start in queued state."""
sm = BinaryMachine(self.binary)
self.assertEqual(sm.current_state.value, Binary.StatusChoices.QUEUED)
def test_binary_state_machine_can_start(self):
"""BinaryMachine.can_start() should check name and binproviders."""
sm = BinaryMachine(self.binary)
self.assertTrue(sm.can_start())
self.binary.binproviders = ''
self.binary.save()
sm = BinaryMachine(self.binary)
self.assertFalse(sm.can_start())
class TestProcessModel(TestCase):
"""Test the Process model."""
def setUp(self):
"""Create a machine for process tests."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
self.machine = Machine.current()
def test_process_creation(self):
"""Process should be created with default values."""
process = Process.objects.create(
machine=self.machine,
cmd=['echo', 'hello'],
pwd='/tmp',
)
self.assertIsNotNone(process.id)
self.assertEqual(process.cmd, ['echo', 'hello'])
self.assertEqual(process.status, Process.StatusChoices.QUEUED)
self.assertIsNone(process.pid)
self.assertIsNone(process.exit_code)
def test_process_to_jsonl(self):
"""Process.to_jsonl() should serialize correctly."""
process = Process.objects.create(
machine=self.machine,
cmd=['echo', 'hello'],
pwd='/tmp',
timeout=60,
)
json_data = process.to_jsonl()
self.assertEqual(json_data['type'], 'Process')
self.assertEqual(json_data['cmd'], ['echo', 'hello'])
self.assertEqual(json_data['pwd'], '/tmp')
self.assertEqual(json_data['timeout'], 60)
def test_process_update_and_requeue(self):
"""Process.update_and_requeue() should update fields and save."""
process = Process.objects.create(machine=self.machine, cmd=['test'])
old_modified = process.modified_at
process.update_and_requeue(
status=Process.StatusChoices.RUNNING,
pid=12345,
started_at=timezone.now(),
)
process.refresh_from_db()
self.assertEqual(process.status, Process.StatusChoices.RUNNING)
self.assertEqual(process.pid, 12345)
self.assertIsNotNone(process.started_at)
class TestProcessCurrent(TestCase):
"""Test Process.current() method."""
def setUp(self):
"""Reset caches."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
def test_process_current_creates_record(self):
"""Process.current() should create a Process for current PID."""
proc = Process.current()
self.assertIsNotNone(proc)
self.assertEqual(proc.pid, os.getpid())
self.assertEqual(proc.status, Process.StatusChoices.RUNNING)
self.assertIsNotNone(proc.machine)
self.assertIsNotNone(proc.started_at)
def test_process_current_caches(self):
"""Process.current() should cache the result."""
proc1 = Process.current()
proc2 = Process.current()
self.assertEqual(proc1.id, proc2.id)
def test_process_detect_type_orchestrator(self):
"""_detect_process_type should detect orchestrator."""
with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR)
def test_process_detect_type_cli(self):
"""_detect_process_type should detect CLI commands."""
with patch('sys.argv', ['archivebox', 'add', 'http://example.com']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.CLI)
def test_process_detect_type_worker(self):
"""_detect_process_type should detect workers."""
with patch('sys.argv', ['python', '-m', 'crawl_worker']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.WORKER)
class TestProcessHierarchy(TestCase):
"""Test Process parent/child relationships."""
def setUp(self):
"""Create machine."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
self.machine = Machine.current()
def test_process_parent_child(self):
"""Process should track parent/child relationships."""
parent = Process.objects.create(
machine=self.machine,
process_type=Process.TypeChoices.CLI,
status=Process.StatusChoices.RUNNING,
pid=1,
started_at=timezone.now(),
)
child = Process.objects.create(
machine=self.machine,
parent=parent,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=2,
started_at=timezone.now(),
)
self.assertEqual(child.parent, parent)
self.assertIn(child, parent.children.all())
def test_process_root(self):
"""Process.root should return the root of the hierarchy."""
root = Process.objects.create(
machine=self.machine,
process_type=Process.TypeChoices.CLI,
status=Process.StatusChoices.RUNNING,
started_at=timezone.now(),
)
child = Process.objects.create(
machine=self.machine,
parent=root,
status=Process.StatusChoices.RUNNING,
started_at=timezone.now(),
)
grandchild = Process.objects.create(
machine=self.machine,
parent=child,
status=Process.StatusChoices.RUNNING,
started_at=timezone.now(),
)
self.assertEqual(grandchild.root, root)
self.assertEqual(child.root, root)
self.assertEqual(root.root, root)
def test_process_depth(self):
"""Process.depth should return depth in tree."""
root = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
started_at=timezone.now(),
)
child = Process.objects.create(
machine=self.machine,
parent=root,
status=Process.StatusChoices.RUNNING,
started_at=timezone.now(),
)
self.assertEqual(root.depth, 0)
self.assertEqual(child.depth, 1)
class TestProcessLifecycle(TestCase):
"""Test Process lifecycle methods."""
def setUp(self):
"""Create machine."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
self.machine = Machine.current()
def test_process_is_running_current_pid(self):
"""is_running should be True for current PID."""
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=os.getpid(),
started_at=timezone.now(),
)
self.assertTrue(proc.is_running)
def test_process_is_running_fake_pid(self):
"""is_running should be False for non-existent PID."""
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
self.assertFalse(proc.is_running)
def test_process_poll_detects_exit(self):
"""poll() should detect exited process."""
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
exit_code = proc.poll()
self.assertIsNotNone(exit_code)
proc.refresh_from_db()
self.assertEqual(proc.status, Process.StatusChoices.EXITED)
def test_process_terminate_dead_process(self):
"""terminate() should handle already-dead process."""
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
result = proc.terminate()
self.assertFalse(result)
proc.refresh_from_db()
self.assertEqual(proc.status, Process.StatusChoices.EXITED)
class TestProcessClassMethods(TestCase):
"""Test Process class methods for querying."""
def setUp(self):
"""Create machine."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
self.machine = Machine.current()
def test_get_running(self):
"""get_running should return running processes."""
proc = Process.objects.create(
machine=self.machine,
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
pid=99999,
started_at=timezone.now(),
)
running = Process.get_running(process_type=Process.TypeChoices.HOOK)
self.assertIn(proc, running)
def test_get_running_count(self):
"""get_running_count should count running processes."""
for i in range(3):
Process.objects.create(
machine=self.machine,
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
pid=99900 + i,
started_at=timezone.now(),
)
count = Process.get_running_count(process_type=Process.TypeChoices.HOOK)
self.assertGreaterEqual(count, 3)
def test_cleanup_stale_running(self):
"""cleanup_stale_running should mark stale processes as exited."""
stale = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1),
)
cleaned = Process.cleanup_stale_running()
self.assertGreaterEqual(cleaned, 1)
stale.refresh_from_db()
self.assertEqual(stale.status, Process.StatusChoices.EXITED)
class TestProcessStateMachine(TestCase):
"""Test the ProcessMachine state machine."""
def setUp(self):
"""Create a machine and process for state machine tests."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
self.machine = Machine.current()
self.process = Process.objects.create(
machine=self.machine,
cmd=['echo', 'test'],
pwd='/tmp',
)
def test_process_state_machine_initial_state(self):
"""ProcessMachine should start in queued state."""
sm = ProcessMachine(self.process)
self.assertEqual(sm.current_state.value, Process.StatusChoices.QUEUED)
def test_process_state_machine_can_start(self):
"""ProcessMachine.can_start() should check cmd and machine."""
sm = ProcessMachine(self.process)
self.assertTrue(sm.can_start())
self.process.cmd = []
self.process.save()
sm = ProcessMachine(self.process)
self.assertFalse(sm.can_start())
def test_process_state_machine_is_exited(self):
"""ProcessMachine.is_exited() should check exit_code."""
sm = ProcessMachine(self.process)
self.assertFalse(sm.is_exited())
self.process.exit_code = 0
self.process.save()
sm = ProcessMachine(self.process)
self.assertTrue(sm.is_exited())
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the apt binary provider plugin."""

View File

@@ -0,0 +1,154 @@
"""
Tests for the apt binary provider plugin.
Tests cover:
1. Hook script execution
2. apt package availability detection
3. JSONL output format
"""
import json
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Get the path to the apt provider hook
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_apt_provider.py'
def apt_available() -> bool:
"""Check if apt is installed."""
return shutil.which('apt') is not None or shutil.which('apt-get') is not None
def is_linux() -> bool:
"""Check if running on Linux."""
import platform
return platform.system().lower() == 'linux'
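# Expected shape of a successful Binary record emitted by the hook, matching
# the JSONL-parsing assertions below (abspath/version values are hypothetical;
# the tests only assert that the fields are present and the path exists):
EXAMPLE_BINARY_RECORD = {
    'type': 'Binary',
    'name': 'bash',
    'abspath': '/usr/bin/bash',  # hypothetical path
    'version': '5.2',            # hypothetical version string
}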
class TestAptProviderHook(TestCase):
"""Test the apt binary provider installation hook."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_hook_script_exists(self):
"""Hook script should exist."""
self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")
def test_hook_skips_when_apt_not_allowed(self):
"""Hook should skip when apt not in allowed binproviders."""
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=wget',
'--binary-id=test-uuid',
'--machine-id=test-machine',
'--binproviders=pip,npm', # apt not allowed
],
capture_output=True,
text=True,
timeout=30
)
# Should exit cleanly (code 0) when apt not allowed
self.assertIn('apt provider not allowed', result.stderr)
self.assertEqual(result.returncode, 0)
@pytest.mark.skipif(not is_linux(), reason="apt only available on Linux")
@pytest.mark.skipif(not apt_available(), reason="apt not installed")
def test_hook_detects_apt(self):
"""Hook should detect apt binary when available."""
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=nonexistent-pkg-xyz123',
'--binary-id=test-uuid',
'--machine-id=test-machine',
],
capture_output=True,
text=True,
timeout=30
)
# Should not say apt is not available
self.assertNotIn('apt not available', result.stderr)
def test_hook_handles_overrides(self):
"""Hook should accept overrides JSON."""
overrides = json.dumps({
'apt': {'packages': ['custom-package-name']}
})
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=test-pkg',
'--binary-id=test-uuid',
'--machine-id=test-machine',
f'--overrides={overrides}',
],
capture_output=True,
text=True,
timeout=30
)
# Should not crash parsing overrides
self.assertNotIn('Traceback', result.stderr)
@pytest.mark.skipif(not is_linux(), reason="apt only available on Linux")
@pytest.mark.skipif(not apt_available(), reason="apt not installed")
class TestAptProviderSystemBinaries(TestCase):
"""Test apt provider with system binaries."""
def test_detect_existing_binary(self):
"""apt provider should detect already-installed system binaries."""
# Check for a binary that's almost certainly installed (like 'ls' or 'bash')
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=bash',
'--binary-id=test-uuid',
'--machine-id=test-machine',
],
capture_output=True,
text=True,
timeout=60
)
# Parse JSONL output
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('{'):
try:
record = json.loads(line)
if record.get('type') == 'Binary' and record.get('name') == 'bash':
# Found bash
self.assertTrue(record.get('abspath'))
self.assertTrue(Path(record['abspath']).exists())
return
except json.JSONDecodeError:
continue
# apt may not be able to "install" bash (already installed)
# Just verify no crash
self.assertNotIn('Traceback', result.stderr)
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the npm binary provider plugin."""

View File

@@ -0,0 +1,144 @@
"""
Tests for the npm binary provider plugin.
Tests cover:
1. Hook script execution
2. npm package installation
3. PATH and NODE_MODULES_DIR updates
4. JSONL output format
"""
import json
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Get the path to the npm provider hook
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_npm_provider.py'
def npm_available() -> bool:
"""Check if npm is installed."""
return shutil.which('npm') is not None
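# Directory layout the hook is expected to create under LIB_DIR, inferred from
# the assertions below (the node_modules detail is an assumption):
#   $LIB_DIR/npm/               <- npm prefix, created even when installs fail
#   $LIB_DIR/npm/node_modules/  <- where packages would land (assumed)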
class TestNpmProviderHook(TestCase):
"""Test the npm binary provider installation hook."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.lib_dir = Path(self.temp_dir) / 'lib' / 'x86_64-linux'
self.lib_dir.mkdir(parents=True)
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_hook_script_exists(self):
"""Hook script should exist."""
self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")
def test_hook_requires_lib_dir(self):
"""Hook should fail when LIB_DIR is not set."""
env = os.environ.copy()
env.pop('LIB_DIR', None) # Remove LIB_DIR
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=some-package',
'--binary-id=test-uuid',
'--machine-id=test-machine',
],
capture_output=True,
text=True,
env=env,
timeout=30
)
self.assertIn('LIB_DIR environment variable not set', result.stderr)
self.assertEqual(result.returncode, 1)
def test_hook_skips_when_npm_not_allowed(self):
"""Hook should skip when npm not in allowed binproviders."""
env = os.environ.copy()
env['LIB_DIR'] = str(self.lib_dir)
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=some-package',
'--binary-id=test-uuid',
'--machine-id=test-machine',
'--binproviders=pip,apt', # npm not allowed
],
capture_output=True,
text=True,
env=env,
timeout=30
)
# Should exit cleanly (code 0) when npm not allowed
self.assertIn('npm provider not allowed', result.stderr)
self.assertEqual(result.returncode, 0)
@pytest.mark.skipif(not npm_available(), reason="npm not installed")
def test_hook_creates_npm_prefix(self):
"""Hook should create npm prefix directory."""
env = os.environ.copy()
env['LIB_DIR'] = str(self.lib_dir)
# Even if installation fails, the npm prefix should be created
subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=nonexistent-xyz123',
'--binary-id=test-uuid',
'--machine-id=test-machine',
],
capture_output=True,
text=True,
env=env,
timeout=60
)
npm_prefix = self.lib_dir / 'npm'
self.assertTrue(npm_prefix.exists())
def test_hook_handles_overrides(self):
"""Hook should accept overrides JSON."""
env = os.environ.copy()
env['LIB_DIR'] = str(self.lib_dir)
overrides = json.dumps({'npm': {'packages': ['custom-pkg']}})
# Just verify it doesn't crash with overrides
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=test-pkg',
'--binary-id=test-uuid',
'--machine-id=test-machine',
f'--overrides={overrides}',
],
capture_output=True,
text=True,
env=env,
timeout=60
)
# May fail to install, but should not crash parsing overrides
self.assertNotIn('Failed to parse overrides JSON', result.stderr)
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the pip binary provider plugin."""

View File

@@ -0,0 +1,175 @@
"""
Tests for the pip binary provider plugin.
Tests cover:
1. Hook script execution
2. pip package detection
3. Virtual environment handling
4. JSONL output format
"""
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
# Get the path to the pip provider hook
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_pip_provider.py'
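# A minimal sketch of the stdout-scanning pattern repeated inline in the tests
# below (illustrative only; this helper is not part of the plugin API):
def first_binary_record(stdout: str, name: str):
    """Return the first JSONL 'Binary' record for *name* in hook stdout, or None."""
    for line in stdout.splitlines():
        line = line.strip()
        if not line.startswith('{'):
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if record.get('type') == 'Binary' and record.get('name') == name:
            return record
    return None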
class TestPipProviderHook(TestCase):
"""Test the pip binary provider installation hook."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.output_dir = Path(self.temp_dir) / 'output'
self.output_dir.mkdir()
def tearDown(self):
"""Clean up."""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_hook_script_exists(self):
"""Hook script should exist."""
self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")
def test_hook_help(self):
"""Hook should accept --help without error."""
result = subprocess.run(
[sys.executable, str(INSTALL_HOOK), '--help'],
capture_output=True,
text=True,
timeout=30
)
# May succeed or fail depending on implementation
# At minimum should not crash with Python error
self.assertNotIn('Traceback', result.stderr)
def test_hook_finds_python(self):
"""Hook should find Python binary."""
env = os.environ.copy()
env['DATA_DIR'] = self.temp_dir
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=python3',
'--binproviders=pip,env',
],
capture_output=True,
text=True,
cwd=str(self.output_dir),
env=env,
timeout=60
)
# Check for JSONL output
jsonl_found = False
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('{'):
try:
record = json.loads(line)
if record.get('type') == 'Binary' and record.get('name') == 'python3':
jsonl_found = True
# Verify structure
self.assertIn('abspath', record)
self.assertIn('version', record)
break
except json.JSONDecodeError:
continue
# May or may not find python3 via pip, but should not crash
self.assertNotIn('Traceback', result.stderr)
def test_hook_unknown_package(self):
"""Hook should handle unknown packages gracefully."""
env = os.environ.copy()
env['DATA_DIR'] = self.temp_dir
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=nonexistent_package_xyz123',
'--binproviders=pip',
],
capture_output=True,
text=True,
cwd=str(self.output_dir),
env=env,
timeout=60
)
# Should not crash
self.assertNotIn('Traceback', result.stderr)
# May have non-zero exit code for missing package
class TestPipProviderIntegration(TestCase):
"""Integration tests for pip provider with real packages."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.output_dir = Path(self.temp_dir) / 'output'
self.output_dir.mkdir()
def tearDown(self):
"""Clean up."""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
@pytest.mark.skipif(
subprocess.run([sys.executable, '-m', 'pip', '--version'],
capture_output=True).returncode != 0,
reason="pip not available"
)
def test_hook_finds_pip_installed_binary(self):
"""Hook should find binaries installed via pip."""
env = os.environ.copy()
env['DATA_DIR'] = self.temp_dir
# Try to find 'pip' itself which should be available
result = subprocess.run(
[
sys.executable, str(INSTALL_HOOK),
'--name=pip',
'--binproviders=pip,env',
],
capture_output=True,
text=True,
cwd=str(self.output_dir),
env=env,
timeout=60
)
# Look for success in output
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('{'):
try:
record = json.loads(line)
if record.get('type') == 'Binary' and 'pip' in record.get('name', ''):
# Found pip binary
self.assertTrue(record.get('abspath'))
return
except json.JSONDecodeError:
continue
# If we get here without finding pip, that's acceptable
# as long as the hook didn't crash
self.assertNotIn('Traceback', result.stderr)
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the redirects plugin."""

View File

@@ -0,0 +1,134 @@
"""
Tests for the redirects plugin.
Tests the real redirects hook with actual URLs to verify
redirect chain capture.
"""
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Import chrome test helpers
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
from chrome_test_helpers import (
chrome_session,
get_test_env,
get_plugin_dir,
get_hook_script,
)
def chrome_available() -> bool:
"""Check if Chrome/Chromium is available."""
for name in ['chromium', 'chromium-browser', 'google-chrome', 'chrome']:
if shutil.which(name):
return True
return False
# Get the path to the redirects hook
PLUGIN_DIR = get_plugin_dir(__file__)
REDIRECTS_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_redirects.*')
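# Shape of a redirects record the parsing loop below accepts (keys taken from
# the membership checks in this file; values are illustrative):
#   {"type": "Redirects", "chain": ["http://example.com/", "https://example.com/"]}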
class TestRedirectsPlugin(TestCase):
"""Test the redirects plugin."""
def test_redirects_hook_exists(self):
"""Redirects hook script should exist."""
self.assertIsNotNone(REDIRECTS_HOOK, "Redirects hook not found in plugin directory")
self.assertTrue(REDIRECTS_HOOK.exists(), f"Hook not found: {REDIRECTS_HOOK}")
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestRedirectsWithChrome(TestCase):
"""Integration tests for redirects plugin with Chrome."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = Path(tempfile.mkdtemp())
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_redirects_captures_navigation(self):
"""Redirects hook should capture URL navigation without errors."""
# Use a URL that doesn't redirect (simple case)
test_url = 'https://example.com'
snapshot_id = 'test-redirects-snapshot'
try:
with chrome_session(
self.temp_dir,
crawl_id='test-redirects-crawl',
snapshot_id=snapshot_id,
test_url=test_url,
navigate=True,
timeout=30,
) as (chrome_process, chrome_pid, snapshot_chrome_dir):
# Get environment and run the redirects hook
env = get_test_env()
env['CHROME_HEADLESS'] = 'true'
# Run redirects hook with the active Chrome session
result = subprocess.run(
['node', str(REDIRECTS_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
cwd=str(snapshot_chrome_dir),
capture_output=True,
text=True,
timeout=60,
env=env
)
# Check for output file
redirects_output = snapshot_chrome_dir / 'redirects.jsonl'
redirects_data = None
# Try parsing from file first
if redirects_output.exists():
with open(redirects_output) as f:
for line in f:
line = line.strip()
if line.startswith('{'):
try:
redirects_data = json.loads(line)
break
except json.JSONDecodeError:
continue
# Try parsing from stdout if not in file
if not redirects_data:
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('{'):
try:
record = json.loads(line)
if 'chain' in record or 'redirects' in record or record.get('type') == 'Redirects':
redirects_data = record
break
except json.JSONDecodeError:
continue
# Verify hook ran successfully
# example.com typically doesn't redirect, so we just verify no errors
self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
self.assertNotIn('Traceback', result.stderr)
self.assertNotIn('Error:', result.stderr)
except RuntimeError as e:
if 'Chrome' in str(e) or 'CDP' in str(e):
self.skipTest(f"Chrome session setup failed: {e}")
raise
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,308 @@
"""
Tests for the ripgrep search backend.
Tests cover:
1. Search with ripgrep binary
2. Snapshot ID extraction from file paths
3. Timeout handling
4. Error handling
5. Environment variable configuration
"""
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
from archivebox.plugins.search_backend_ripgrep.search import (
search,
flush,
get_env,
get_env_int,
get_env_array,
)
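# Environment variables exercised below, read through these helpers (the
# default values shown here are illustrative assumptions, not the backend's
# actual defaults):
#   RIPGREP_BINARY   e.g. get_env('RIPGREP_BINARY', 'rg')
#   RIPGREP_ARGS     e.g. get_env_array('RIPGREP_ARGS', [])   # JSON array of extra flags
#   RIPGREP_TIMEOUT  e.g. get_env_int('RIPGREP_TIMEOUT', 60)  # seconds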
class TestEnvHelpers(TestCase):
"""Test environment variable helper functions."""
def test_get_env_default(self):
"""get_env should return default for unset vars."""
result = get_env('NONEXISTENT_VAR_12345', 'default')
self.assertEqual(result, 'default')
def test_get_env_set(self):
"""get_env should return value for set vars."""
with patch.dict(os.environ, {'TEST_VAR': 'value'}):
result = get_env('TEST_VAR', 'default')
self.assertEqual(result, 'value')
def test_get_env_strips_whitespace(self):
"""get_env should strip whitespace."""
with patch.dict(os.environ, {'TEST_VAR': ' value '}):
result = get_env('TEST_VAR', '')
self.assertEqual(result, 'value')
def test_get_env_int_default(self):
"""get_env_int should return default for unset vars."""
result = get_env_int('NONEXISTENT_VAR_12345', 42)
self.assertEqual(result, 42)
def test_get_env_int_valid(self):
"""get_env_int should parse integer values."""
with patch.dict(os.environ, {'TEST_INT': '100'}):
result = get_env_int('TEST_INT', 0)
self.assertEqual(result, 100)
def test_get_env_int_invalid(self):
"""get_env_int should return default for invalid integers."""
with patch.dict(os.environ, {'TEST_INT': 'not a number'}):
result = get_env_int('TEST_INT', 42)
self.assertEqual(result, 42)
def test_get_env_array_default(self):
"""get_env_array should return default for unset vars."""
result = get_env_array('NONEXISTENT_VAR_12345', ['default'])
self.assertEqual(result, ['default'])
def test_get_env_array_valid(self):
"""get_env_array should parse JSON arrays."""
with patch.dict(os.environ, {'TEST_ARRAY': '["a", "b", "c"]'}):
result = get_env_array('TEST_ARRAY', [])
self.assertEqual(result, ['a', 'b', 'c'])
def test_get_env_array_invalid_json(self):
"""get_env_array should return default for invalid JSON."""
with patch.dict(os.environ, {'TEST_ARRAY': 'not json'}):
result = get_env_array('TEST_ARRAY', ['default'])
self.assertEqual(result, ['default'])
def test_get_env_array_not_array(self):
"""get_env_array should return default for non-array JSON."""
with patch.dict(os.environ, {'TEST_ARRAY': '{"key": "value"}'}):
result = get_env_array('TEST_ARRAY', ['default'])
self.assertEqual(result, ['default'])
class TestRipgrepFlush(TestCase):
"""Test the flush function."""
def test_flush_is_noop(self):
"""flush should be a no-op for ripgrep backend."""
# Should not raise
flush(['snap-001', 'snap-002'])
class TestRipgrepSearch(TestCase):
"""Test the ripgrep search function."""
def setUp(self):
"""Create temporary archive directory with test files."""
self.temp_dir = tempfile.mkdtemp()
self.archive_dir = Path(self.temp_dir) / 'archive'
self.archive_dir.mkdir()
# Create snapshot directories with searchable content
self._create_snapshot('snap-001', {
'singlefile/index.html': '<html><body>Python programming tutorial</body></html>',
'title/title.txt': 'Learn Python Programming',
})
self._create_snapshot('snap-002', {
'singlefile/index.html': '<html><body>JavaScript guide</body></html>',
'title/title.txt': 'JavaScript Basics',
})
self._create_snapshot('snap-003', {
'wget/index.html': '<html><body>Web archiving best practices</body></html>',
'title/title.txt': 'Web Archiving Guide',
})
# Patch settings
self.settings_patch = patch(
'archivebox.plugins.search_backend_ripgrep.search.settings'
)
self.mock_settings = self.settings_patch.start()
self.mock_settings.ARCHIVE_DIR = str(self.archive_dir)
def tearDown(self):
"""Clean up temporary directory."""
self.settings_patch.stop()
shutil.rmtree(self.temp_dir, ignore_errors=True)
def _create_snapshot(self, snapshot_id: str, files: dict):
"""Create a snapshot directory with files."""
snap_dir = self.archive_dir / snapshot_id
for path, content in files.items():
file_path = snap_dir / path
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content)
def _has_ripgrep(self) -> bool:
"""Check if ripgrep is available."""
return shutil.which('rg') is not None
def test_search_no_archive_dir(self):
"""search should return empty list when archive dir doesn't exist."""
self.mock_settings.ARCHIVE_DIR = '/nonexistent/path'
results = search('test')
self.assertEqual(results, [])
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_single_match(self):
"""search should find matching snapshot."""
results = search('Python programming')
self.assertIn('snap-001', results)
self.assertNotIn('snap-002', results)
self.assertNotIn('snap-003', results)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_multiple_matches(self):
"""search should find all matching snapshots."""
# 'guide' appears in snap-002 (JavaScript guide) and snap-003 (Archiving Guide)
results = search('guide')
self.assertIn('snap-002', results)
self.assertIn('snap-003', results)
self.assertNotIn('snap-001', results)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_case_sensitivity_default(self):
"""search should be case-sensitive by default (ripgrep's default behavior)."""
# By default rg is case-sensitive
results_upper = search('PYTHON')
results_lower = search('python')
# Depending on ripgrep config, results may differ
self.assertIsInstance(results_upper, list)
self.assertIsInstance(results_lower, list)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_no_results(self):
"""search should return empty list for no matches."""
results = search('xyznonexistent123')
self.assertEqual(results, [])
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_regex(self):
"""search should support regex patterns."""
results = search('(Python|JavaScript)')
self.assertIn('snap-001', results)
self.assertIn('snap-002', results)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_distinct_snapshots(self):
"""search should return distinct snapshot IDs."""
# Query matches both files in snap-001
results = search('Python')
# Should only appear once
self.assertEqual(results.count('snap-001'), 1)
def test_search_missing_binary(self):
"""search should raise when ripgrep binary not found."""
with patch.dict(os.environ, {'RIPGREP_BINARY': '/nonexistent/rg'}):
with patch('shutil.which', return_value=None):
with self.assertRaises(RuntimeError) as context:
search('test')
self.assertIn('ripgrep binary not found', str(context.exception))
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_with_custom_args(self):
"""search should use custom RIPGREP_ARGS."""
with patch.dict(os.environ, {'RIPGREP_ARGS': '["-i"]'}): # Case insensitive
results = search('PYTHON')
# With -i flag, should find regardless of case
self.assertIn('snap-001', results)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_timeout(self):
"""search should handle timeout gracefully."""
with patch.dict(os.environ, {'RIPGREP_TIMEOUT': '1'}):
# Short timeout, should still complete for small archive
results = search('Python')
self.assertIsInstance(results, list)
class TestRipgrepSearchIntegration(TestCase):
"""Integration tests with realistic archive structure."""
def setUp(self):
"""Create archive with realistic structure."""
self.temp_dir = tempfile.mkdtemp()
self.archive_dir = Path(self.temp_dir) / 'archive'
self.archive_dir.mkdir()
# Realistic snapshot structure
self._create_snapshot('1704067200.123456', { # 2024-01-01
'singlefile.html': '''<!DOCTYPE html>
<html>
<head><title>ArchiveBox Documentation</title></head>
<body>
<h1>Getting Started with ArchiveBox</h1>
<p>ArchiveBox is a powerful, self-hosted web archiving tool.</p>
<p>Install with: pip install archivebox</p>
</body>
</html>''',
'title/title.txt': 'ArchiveBox Documentation',
'screenshot/screenshot.png': b'PNG IMAGE DATA', # Binary file
})
self._create_snapshot('1704153600.654321', { # 2024-01-02
'wget/index.html': '''<html>
<head><title>Python News</title></head>
<body>
<h1>Python 3.12 Released</h1>
<p>New features include improved error messages and performance.</p>
</body>
</html>''',
'readability/content.html': '<p>Python 3.12 has been released with exciting new features.</p>',
})
self.settings_patch = patch(
'archivebox.plugins.search_backend_ripgrep.search.settings'
)
self.mock_settings = self.settings_patch.start()
self.mock_settings.ARCHIVE_DIR = str(self.archive_dir)
def tearDown(self):
"""Clean up."""
self.settings_patch.stop()
shutil.rmtree(self.temp_dir, ignore_errors=True)
def _create_snapshot(self, timestamp: str, files: dict):
"""Create snapshot with timestamp-based ID."""
snap_dir = self.archive_dir / timestamp
for path, content in files.items():
file_path = snap_dir / path
file_path.parent.mkdir(parents=True, exist_ok=True)
if isinstance(content, bytes):
file_path.write_bytes(content)
else:
file_path.write_text(content)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_archivebox(self):
"""Search for archivebox should find documentation snapshot."""
results = search('archivebox')
self.assertIn('1704067200.123456', results)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_python(self):
"""Search for python should find Python news snapshot."""
results = search('Python')
self.assertIn('1704153600.654321', results)
@pytest.mark.skipif(not shutil.which('rg'), reason="ripgrep not installed")
def test_search_pip_install(self):
"""Search for installation command."""
results = search('pip install')
self.assertIn('1704067200.123456', results)
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the SQLite FTS5 search backend."""

View File

@@ -0,0 +1,351 @@
"""
Tests for the SQLite FTS5 search backend.
Tests cover:
1. Search index creation
2. Indexing snapshots
3. Search queries with real test data
4. Flush operations
5. Edge cases (empty index, special characters)
"""
import os
import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from django.test import TestCase, override_settings
from archivebox.plugins.search_backend_sqlite.search import (
get_db_path,
search,
flush,
SQLITEFTS_DB,
FTS_TOKENIZERS,
)
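# A self-contained sketch of the FTS5 MATCH semantics these tests rely on
# (implicit AND between terms, quoted phrases, porter stemming); assumes the
# local SQLite build ships the FTS5 extension:
def _fts5_demo():
    conn = sqlite3.connect(':memory:')
    conn.execute("CREATE VIRTUAL TABLE idx USING fts5(snapshot_id, content, tokenize='porter')")
    conn.execute("INSERT INTO idx VALUES ('snap-001', 'machine learning algorithms')")
    conn.execute("INSERT INTO idx VALUES ('snap-002', 'machine algorithms learning')")
    # Quoted phrase: only snap-001 has the adjacent words "machine learning"
    rows = conn.execute("SELECT snapshot_id FROM idx WHERE idx MATCH ?", ('"machine learning"',)).fetchall()
    return rows  # -> [('snap-001',)]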
class TestSqliteSearchBackend(TestCase):
"""Test SQLite FTS5 search backend."""
def setUp(self):
"""Create a temporary data directory with search index."""
self.temp_dir = tempfile.mkdtemp()
self.db_path = Path(self.temp_dir) / SQLITEFTS_DB
# Patch DATA_DIR
self.settings_patch = patch(
'archivebox.plugins.search_backend_sqlite.search.settings'
)
self.mock_settings = self.settings_patch.start()
self.mock_settings.DATA_DIR = self.temp_dir
# Create FTS5 table
self._create_index()
def tearDown(self):
"""Clean up temporary directory."""
self.settings_patch.stop()
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def _create_index(self):
"""Create the FTS5 search index table."""
conn = sqlite3.connect(str(self.db_path))
try:
conn.execute(f'''
CREATE VIRTUAL TABLE IF NOT EXISTS search_index
USING fts5(
snapshot_id,
url,
title,
content,
tokenize = '{FTS_TOKENIZERS}'
)
''')
conn.commit()
finally:
conn.close()
def _index_snapshot(self, snapshot_id: str, url: str, title: str, content: str):
"""Add a snapshot to the index."""
conn = sqlite3.connect(str(self.db_path))
try:
conn.execute(
'INSERT INTO search_index (snapshot_id, url, title, content) VALUES (?, ?, ?, ?)',
(snapshot_id, url, title, content)
)
conn.commit()
finally:
conn.close()
def test_get_db_path(self):
"""get_db_path should return correct path."""
path = get_db_path()
self.assertEqual(path, Path(self.temp_dir) / SQLITEFTS_DB)
def test_search_empty_index(self):
"""search should return empty list for empty index."""
results = search('nonexistent')
self.assertEqual(results, [])
def test_search_no_index_file(self):
"""search should return empty list when index file doesn't exist."""
os.remove(self.db_path)
results = search('test')
self.assertEqual(results, [])
def test_search_single_result(self):
"""search should find matching snapshot."""
self._index_snapshot(
'snap-001',
'https://example.com/page1',
'Example Page',
'This is example content about testing.'
)
results = search('example')
self.assertEqual(len(results), 1)
self.assertEqual(results[0], 'snap-001')
def test_search_multiple_results(self):
"""search should find all matching snapshots."""
self._index_snapshot('snap-001', 'https://example.com/1', 'Python Tutorial', 'Learn Python programming')
self._index_snapshot('snap-002', 'https://example.com/2', 'Python Guide', 'Advanced Python concepts')
self._index_snapshot('snap-003', 'https://example.com/3', 'JavaScript Basics', 'Learn JavaScript')
results = search('Python')
self.assertEqual(len(results), 2)
self.assertIn('snap-001', results)
self.assertIn('snap-002', results)
self.assertNotIn('snap-003', results)
def test_search_title_match(self):
"""search should match against title."""
self._index_snapshot('snap-001', 'https://example.com', 'Django Web Framework', 'Content here')
results = search('Django')
self.assertEqual(len(results), 1)
self.assertEqual(results[0], 'snap-001')
def test_search_url_match(self):
"""search should match against URL."""
self._index_snapshot('snap-001', 'https://archivebox.io/docs', 'Title', 'Content')
results = search('archivebox')
self.assertEqual(len(results), 1)
def test_search_content_match(self):
"""search should match against content."""
self._index_snapshot(
'snap-001',
'https://example.com',
'Generic Title',
'This document contains information about cryptography and security.'
)
results = search('cryptography')
self.assertEqual(len(results), 1)
def test_search_case_insensitive(self):
"""search should be case insensitive."""
self._index_snapshot('snap-001', 'https://example.com', 'Title', 'PYTHON programming')
results = search('python')
self.assertEqual(len(results), 1)
def test_search_stemming(self):
"""search should use porter stemmer for word stems."""
self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Programming concepts')
# 'program' should match 'programming' with porter stemmer
results = search('program')
self.assertEqual(len(results), 1)
def test_search_multiple_words(self):
"""search should match documents with all words."""
self._index_snapshot('snap-001', 'https://example.com', 'Web Development', 'Learn web development skills')
self._index_snapshot('snap-002', 'https://example.com', 'Web Design', 'Design beautiful websites')
results = search('web development')
# FTS5 joins space-separated terms with an implicit AND, so only snap-001
# (which contains both 'web' and 'development') is guaranteed to match
self.assertIn('snap-001', results)
def test_search_phrase(self):
"""search should support phrase queries."""
self._index_snapshot('snap-001', 'https://example.com', 'Title', 'machine learning algorithms')
self._index_snapshot('snap-002', 'https://example.com', 'Title', 'machine algorithms learning')
# Phrase search with quotes
results = search('"machine learning"')
self.assertEqual(len(results), 1)
self.assertEqual(results[0], 'snap-001')
def test_search_distinct_results(self):
"""search should return distinct snapshot IDs."""
# 'Python' matches multiple fields (url, title, content) of the same snapshot
self._index_snapshot('snap-001', 'https://python.org', 'Python', 'Python programming language')
results = search('Python')
self.assertEqual(len(results), 1)
def test_flush_single(self):
"""flush should remove snapshot from index."""
self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Content')
self._index_snapshot('snap-002', 'https://example.com', 'Title', 'Content')
flush(['snap-001'])
results = search('Content')
self.assertEqual(len(results), 1)
self.assertEqual(results[0], 'snap-002')
def test_flush_multiple(self):
"""flush should remove multiple snapshots."""
self._index_snapshot('snap-001', 'https://example.com', 'Title', 'Test')
self._index_snapshot('snap-002', 'https://example.com', 'Title', 'Test')
self._index_snapshot('snap-003', 'https://example.com', 'Title', 'Test')
flush(['snap-001', 'snap-003'])
results = search('Test')
self.assertEqual(len(results), 1)
self.assertEqual(results[0], 'snap-002')
def test_flush_nonexistent(self):
"""flush should not raise for nonexistent snapshots."""
# Should not raise
flush(['nonexistent-snap'])
def test_flush_no_index(self):
"""flush should not raise when index doesn't exist."""
os.remove(self.db_path)
# Should not raise
flush(['snap-001'])
def test_search_special_characters(self):
"""search should handle special characters in queries."""
self._index_snapshot('snap-001', 'https://example.com', 'C++ Programming', 'Learn C++ basics')
# FTS5 handles special chars
results = search('C++')
# May or may not match depending on tokenizer config
# At minimum, should not raise
self.assertIsInstance(results, list)
def test_search_unicode(self):
"""search should handle unicode content."""
self._index_snapshot('snap-001', 'https://example.com', 'Titre Français', 'café résumé')
self._index_snapshot('snap-002', 'https://example.com', 'Japanese', 'Hello world')
# With remove_diacritics, the unaccented query 'cafe' should match 'café'
results = search('cafe')
self.assertEqual(len(results), 1)
class TestSqliteSearchWithRealData(TestCase):
"""Integration tests with realistic archived content."""
def setUp(self):
"""Create index with realistic test data."""
self.temp_dir = tempfile.mkdtemp()
self.db_path = Path(self.temp_dir) / SQLITEFTS_DB
self.settings_patch = patch(
'archivebox.plugins.search_backend_sqlite.search.settings'
)
self.mock_settings = self.settings_patch.start()
self.mock_settings.DATA_DIR = self.temp_dir
# Create index
conn = sqlite3.connect(str(self.db_path))
try:
conn.execute(f'''
CREATE VIRTUAL TABLE IF NOT EXISTS search_index
USING fts5(
snapshot_id,
url,
title,
content,
tokenize = '{FTS_TOKENIZERS}'
)
''')
# Index realistic data
test_data = [
('snap-001', 'https://github.com/ArchiveBox/ArchiveBox',
'ArchiveBox - Self-hosted web archiving',
'Open source self-hosted web archiving. Collects, saves, and displays various types of content.'),
('snap-002', 'https://docs.python.org/3/tutorial/',
'Python 3 Tutorial',
'An informal introduction to Python. Python is an easy to learn, powerful programming language.'),
('snap-003', 'https://developer.mozilla.org/docs/Web/JavaScript',
'JavaScript - MDN Web Docs',
'JavaScript (JS) is a lightweight, interpreted programming language with first-class functions.'),
('snap-004', 'https://news.ycombinator.com',
'Hacker News',
'Social news website focusing on computer science and entrepreneurship.'),
('snap-005', 'https://en.wikipedia.org/wiki/Web_archiving',
'Web archiving - Wikipedia',
'Web archiving is the process of collecting portions of the World Wide Web to ensure the information is preserved.'),
]
conn.executemany(
'INSERT INTO search_index (snapshot_id, url, title, content) VALUES (?, ?, ?, ?)',
test_data
)
conn.commit()
finally:
conn.close()
def tearDown(self):
"""Clean up."""
self.settings_patch.stop()
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_search_archivebox(self):
"""Search for 'archivebox' should find relevant results."""
results = search('archivebox')
self.assertIn('snap-001', results)
def test_search_programming(self):
"""Search for 'programming' should find Python and JS docs."""
results = search('programming')
self.assertIn('snap-002', results)
self.assertIn('snap-003', results)
def test_search_web_archiving(self):
"""Search for 'web archiving' should find relevant results."""
results = search('web archiving')
# Both ArchiveBox and Wikipedia should match
self.assertIn('snap-001', results)
self.assertIn('snap-005', results)
def test_search_github(self):
"""Search for 'github' should find URL match."""
results = search('github')
self.assertIn('snap-001', results)
def test_search_tutorial(self):
"""Search for 'tutorial' should find Python tutorial."""
results = search('tutorial')
self.assertIn('snap-002', results)
def test_flush_and_search(self):
"""Flushing a snapshot should remove it from search results."""
# Verify it's there first
results = search('archivebox')
self.assertIn('snap-001', results)
# Flush it
flush(['snap-001'])
# Should no longer be found
results = search('archivebox')
self.assertNotIn('snap-001', results)
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the SEO plugin."""

View File

@@ -0,0 +1,135 @@
"""
Tests for the SEO plugin.
Tests the real SEO hook with an actual URL to verify
meta tag extraction.
"""
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Import chrome test helpers
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
from chrome_test_helpers import (
chrome_session,
get_test_env,
get_plugin_dir,
get_hook_script,
)
def chrome_available() -> bool:
"""Check if Chrome/Chromium is available."""
for name in ['chromium', 'chromium-browser', 'google-chrome', 'chrome']:
if shutil.which(name):
return True
return False
# Get the path to the SEO hook
PLUGIN_DIR = get_plugin_dir(__file__)
SEO_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_seo.*')
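# Minimal illustrative SEO record accepted by the checks below (keys come from
# the membership tests in this file; values are hypothetical):
#   {"title": "Example Domain", "description": "An example page", "canonical": "https://example.com/"}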
class TestSEOPlugin(TestCase):
"""Test the SEO plugin."""
def test_seo_hook_exists(self):
"""SEO hook script should exist."""
self.assertIsNotNone(SEO_HOOK, "SEO hook not found in plugin directory")
self.assertTrue(SEO_HOOK.exists(), f"Hook not found: {SEO_HOOK}")
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestSEOWithChrome(TestCase):
"""Integration tests for SEO plugin with Chrome."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = Path(tempfile.mkdtemp())
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_seo_extracts_meta_tags(self):
"""SEO hook should extract meta tags from a real URL."""
test_url = 'https://example.com'
snapshot_id = 'test-seo-snapshot'
try:
with chrome_session(
self.temp_dir,
crawl_id='test-seo-crawl',
snapshot_id=snapshot_id,
test_url=test_url,
navigate=True,
timeout=30,
) as (chrome_process, chrome_pid, snapshot_chrome_dir):
# Get environment and run the SEO hook
env = get_test_env()
env['CHROME_HEADLESS'] = 'true'
# Run SEO hook with the active Chrome session
result = subprocess.run(
['node', str(SEO_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
cwd=str(snapshot_chrome_dir),
capture_output=True,
text=True,
timeout=60,
env=env
)
# Check for output file
seo_output = snapshot_chrome_dir / 'seo.json'
seo_data = None
# Try parsing from file first
if seo_output.exists():
with open(seo_output) as f:
try:
seo_data = json.load(f)
except json.JSONDecodeError:
pass
# Try parsing from stdout if not in file
if not seo_data:
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('{'):
try:
record = json.loads(line)
# SEO data typically has title, description, or og: tags
if any(key in record for key in ['title', 'description', 'og:title', 'canonical']):
seo_data = record
break
except json.JSONDecodeError:
continue
# Verify hook ran successfully
self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
self.assertNotIn('Traceback', result.stderr)
self.assertNotIn('Error:', result.stderr)
# example.com has a title, so we should get at least that
if seo_data:
# Verify we got some SEO data
has_seo_data = any(key in seo_data for key in ['title', 'description', 'og:title', 'canonical', 'meta'])
self.assertTrue(has_seo_data, f"No SEO data extracted: {seo_data}")
except RuntimeError as e:
if 'Chrome' in str(e) or 'CDP' in str(e):
self.skipTest(f"Chrome session setup failed: {e}")
raise
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1 @@
"""Tests for the SSL plugin."""

View File

@@ -0,0 +1,139 @@
"""
Tests for the SSL plugin.
Tests the real SSL hook with an actual HTTPS URL to verify
certificate information extraction.
"""
import json
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from django.test import TestCase
# Import chrome test helpers
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
from chrome_test_helpers import (
chrome_session,
get_test_env,
get_plugin_dir,
get_hook_script,
)
def chrome_available() -> bool:
"""Check if Chrome/Chromium is available."""
for name in ['chromium', 'chromium-browser', 'google-chrome', 'chrome']:
if shutil.which(name):
return True
return False
# Get the path to the SSL hook
PLUGIN_DIR = get_plugin_dir(__file__)
SSL_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_ssl.*')
class TestSSLPlugin(TestCase):
"""Test the SSL plugin with real HTTPS URLs."""
def test_ssl_hook_exists(self):
"""SSL hook script should exist."""
self.assertIsNotNone(SSL_HOOK, "SSL hook not found in plugin directory")
self.assertTrue(SSL_HOOK.exists(), f"Hook not found: {SSL_HOOK}")
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestSSLWithChrome(TestCase):
"""Integration tests for SSL plugin with Chrome."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = Path(tempfile.mkdtemp())
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_ssl_extracts_certificate_from_https_url(self):
"""SSL hook should extract certificate info from a real HTTPS URL."""
test_url = 'https://example.com'
snapshot_id = 'test-ssl-snapshot'
try:
with chrome_session(
self.temp_dir,
crawl_id='test-ssl-crawl',
snapshot_id=snapshot_id,
test_url=test_url,
navigate=True,
timeout=30,
) as (chrome_process, chrome_pid, snapshot_chrome_dir):
# Get environment and run the SSL hook
env = get_test_env()
env['CHROME_HEADLESS'] = 'true'
# Run SSL hook with the active Chrome session
result = subprocess.run(
['node', str(SSL_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
cwd=str(snapshot_chrome_dir),
capture_output=True,
text=True,
timeout=60,
env=env
)
# Check for output file
ssl_output = snapshot_chrome_dir / 'ssl.jsonl'
ssl_data = None
# Try parsing from file first
if ssl_output.exists():
with open(ssl_output) as f:
for line in f:
line = line.strip()
if line.startswith('{'):
try:
ssl_data = json.loads(line)
break
except json.JSONDecodeError:
continue
# Try parsing from stdout if not in file
if not ssl_data:
for line in result.stdout.split('\n'):
line = line.strip()
if line.startswith('{'):
try:
record = json.loads(line)
if 'protocol' in record or 'issuer' in record or record.get('type') == 'SSL':
ssl_data = record
break
except json.JSONDecodeError:
continue
# Verify we got SSL data from HTTPS URL
if ssl_data:
# example.com uses HTTPS, should get certificate info
self.assertIn('protocol', ssl_data, f"SSL data missing protocol: {ssl_data}")
self.assertTrue(
ssl_data['protocol'].startswith('TLS') or ssl_data['protocol'].startswith('SSL'),
f"Unexpected protocol: {ssl_data['protocol']}"
)
else:
# If no SSL data, at least verify hook ran without crashing
self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
except RuntimeError as e:
if 'Chrome' in str(e) or 'CDP' in str(e):
self.skipTest(f"Chrome session setup failed: {e}")
raise
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -10,29 +10,83 @@ from typing import List, Dict, Any, Optional, Tuple
import pytest
# =============================================================================
# CLI Helpers (defined before fixtures that use them)
# =============================================================================
def run_archivebox_cmd(
args: List[str],
data_dir: Path,
stdin: Optional[str] = None,
timeout: int = 60,
env: Optional[Dict[str, str]] = None,
) -> Tuple[str, str, int]:
"""
Run archivebox command via subprocess, return (stdout, stderr, returncode).
Args:
args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
data_dir: The DATA_DIR to use
stdin: Optional string to pipe to stdin
timeout: Command timeout in seconds
env: Additional environment variables
Returns:
Tuple of (stdout, stderr, returncode)
"""
cmd = [sys.executable, '-m', 'archivebox'] + args
base_env = os.environ.copy()
base_env['DATA_DIR'] = str(data_dir)
base_env['USE_COLOR'] = 'False'
base_env['SHOW_PROGRESS'] = 'False'
# Disable slow extractors for faster tests
base_env['SAVE_ARCHIVEDOTORG'] = 'False'
base_env['SAVE_TITLE'] = 'False'
base_env['SAVE_FAVICON'] = 'False'
base_env['SAVE_WGET'] = 'False'
base_env['SAVE_WARC'] = 'False'
base_env['SAVE_PDF'] = 'False'
base_env['SAVE_SCREENSHOT'] = 'False'
base_env['SAVE_DOM'] = 'False'
base_env['SAVE_SINGLEFILE'] = 'False'
base_env['SAVE_READABILITY'] = 'False'
base_env['SAVE_MERCURY'] = 'False'
base_env['SAVE_GIT'] = 'False'
base_env['SAVE_YTDLP'] = 'False'
base_env['SAVE_HEADERS'] = 'False'
base_env['SAVE_HTMLTOTEXT'] = 'False'
if env:
base_env.update(env)
result = subprocess.run(
cmd,
input=stdin,
capture_output=True,
text=True,
cwd=data_dir,
env=base_env,
timeout=timeout,
)
return result.stdout, result.stderr, result.returncode
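# Illustrative usage of the helper above (a sketch; 'version' is assumed to be
# a valid archivebox subcommand and /tmp/abx_data an existing data directory):
#   stdout, stderr, code = run_archivebox_cmd(['version'], data_dir=Path('/tmp/abx_data'))
#   assert code == 0, stderr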
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def isolated_data_dir(tmp_path, settings):
def isolated_data_dir(tmp_path):
"""
Create isolated DATA_DIR for each test.
Uses tmp_path for isolation, configures Django settings.
Uses tmp_path for complete isolation.
"""
data_dir = tmp_path / 'archivebox_data'
data_dir.mkdir()
# Set environment for subprocess calls
os.environ['DATA_DIR'] = str(data_dir)
# Update Django settings
settings.DATA_DIR = data_dir
yield data_dir
# Cleanup handled by tmp_path fixture
return data_dir
@pytest.fixture
@@ -40,81 +94,15 @@ def initialized_archive(isolated_data_dir):
"""
Initialize ArchiveBox archive in isolated directory.
Runs `archivebox init` to set up database and directories.
Runs `archivebox init` via subprocess to set up database and directories.
"""
from archivebox.cli.archivebox_init import init
init(setup=True, quick=True)
return isolated_data_dir
@pytest.fixture
def cli_env(initialized_archive):
"""
Environment dict for CLI subprocess calls.
Includes DATA_DIR and disables slow extractors.
"""
return {
**os.environ,
'DATA_DIR': str(initialized_archive),
'USE_COLOR': 'False',
'SHOW_PROGRESS': 'False',
'SAVE_TITLE': 'True',
'SAVE_FAVICON': 'False',
'SAVE_WGET': 'False',
'SAVE_WARC': 'False',
'SAVE_PDF': 'False',
'SAVE_SCREENSHOT': 'False',
'SAVE_DOM': 'False',
'SAVE_SINGLEFILE': 'False',
'SAVE_READABILITY': 'False',
'SAVE_MERCURY': 'False',
'SAVE_GIT': 'False',
'SAVE_YTDLP': 'False',
'SAVE_HEADERS': 'False',
}
# =============================================================================
# CLI Helpers
# =============================================================================
def run_archivebox_cmd(
args: List[str],
stdin: Optional[str] = None,
cwd: Optional[Path] = None,
env: Optional[Dict[str, str]] = None,
timeout: int = 60,
) -> Tuple[str, str, int]:
"""
Run archivebox command, return (stdout, stderr, returncode).
Args:
args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
stdin: Optional string to pipe to stdin
cwd: Working directory (defaults to DATA_DIR from env)
env: Environment variables (defaults to os.environ with DATA_DIR)
timeout: Command timeout in seconds
Returns:
Tuple of (stdout, stderr, returncode)
"""
cmd = [sys.executable, '-m', 'archivebox'] + args
env = env or {**os.environ}
cwd = cwd or Path(env.get('DATA_DIR', '.'))
result = subprocess.run(
cmd,
input=stdin,
capture_output=True,
text=True,
cwd=cwd,
env=env,
timeout=timeout,
stdout, stderr, returncode = run_archivebox_cmd(
['init', '--quick'],
data_dir=isolated_data_dir,
timeout=60,
)
return result.stdout, result.stderr, result.returncode
assert returncode == 0, f"archivebox init failed: {stderr}"
return isolated_data_dir
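# Illustrative consumer of the fixture above (a sketch; index.sqlite3 is
# assumed to be the database filename that `archivebox init` creates):
#   def test_init_creates_database(initialized_archive):
#       assert (initialized_archive / 'index.sqlite3').exists()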
# =============================================================================
@@ -162,23 +150,6 @@ def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str])
assert record[field] is not None, f"Record field is None: {field}"
# =============================================================================
# Database Assertions
# =============================================================================
def assert_db_count(model_class, filters: Dict[str, Any], expected: int):
"""Assert database count matches expected."""
actual = model_class.objects.filter(**filters).count()
assert actual == expected, \
f"Expected {expected} {model_class.__name__}, got {actual}"
def assert_db_exists(model_class, **filters):
"""Assert at least one record exists matching filters."""
assert model_class.objects.filter(**filters).exists(), \
f"No {model_class.__name__} found matching {filters}"
# =============================================================================
# Test Data Factories
# =============================================================================
@@ -192,11 +163,9 @@ def create_test_url(domain: str = 'example.com', path: str = None) -> str:
def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
"""Create Crawl JSONL record for testing."""
from archivebox.misc.jsonl import TYPE_CRAWL
urls = urls or [create_test_url()]
return {
'type': TYPE_CRAWL,
'type': 'Crawl',
'urls': '\n'.join(urls),
'max_depth': kwargs.get('max_depth', 0),
'tags_str': kwargs.get('tags_str', ''),
@@ -207,10 +176,8 @@ def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
"""Create Snapshot JSONL record for testing."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
return {
'type': TYPE_SNAPSHOT,
'type': 'Snapshot',
'url': url or create_test_url(),
'tags_str': kwargs.get('tags_str', ''),
'status': kwargs.get('status', 'queued'),

View File

@@ -21,19 +21,19 @@ from archivebox.tests.conftest import (
class TestArchiveResultCreate:
"""Tests for `archivebox archiveresult create`."""
def test_create_from_snapshot_jsonl(self, cli_env, initialized_archive):
def test_create_from_snapshot_jsonl(self, initialized_archive):
"""Create archive results from Snapshot JSONL input."""
url = create_test_url()
# Create a snapshot first
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
# Pipe snapshot to archiveresult create
stdout2, stderr, code = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -47,16 +47,16 @@ class TestArchiveResultCreate:
ar = next(r for r in records if r['type'] == 'ArchiveResult')
assert ar['plugin'] == 'title'
def test_create_with_specific_plugin(self, cli_env, initialized_archive):
def test_create_with_specific_plugin(self, initialized_archive):
"""Create archive result for specific plugin."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, stderr, code = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=screenshot'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -65,25 +65,25 @@ class TestArchiveResultCreate:
assert len(ar_records) >= 1
assert ar_records[0]['plugin'] == 'screenshot'
def test_create_pass_through_crawl(self, cli_env, initialized_archive):
def test_create_pass_through_crawl(self, initialized_archive):
"""Pass-through Crawl records unchanged."""
url = create_test_url()
# Create crawl and snapshot
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['snapshot', 'create'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
# Now pipe all to archiveresult create
stdout3, stderr, code = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=stdout2,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -94,14 +94,14 @@ class TestArchiveResultCreate:
assert 'Snapshot' in types
assert 'ArchiveResult' in types
def test_create_pass_through_only_when_no_snapshots(self, cli_env, initialized_archive):
def test_create_pass_through_only_when_no_snapshots(self, initialized_archive):
"""Only pass-through records but no new snapshots returns success."""
crawl_record = {'type': 'Crawl', 'id': 'fake-id', 'urls': 'https://example.com'}
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'create'],
stdin=json.dumps(crawl_record),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -111,31 +111,31 @@ class TestArchiveResultCreate:
class TestArchiveResultList:
"""Tests for `archivebox archiveresult list`."""
def test_list_empty(self, cli_env, initialized_archive):
def test_list_empty(self, initialized_archive):
"""List with no archive results returns empty."""
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Listed 0 archive results' in stderr
def test_list_filter_by_status(self, cli_env, initialized_archive):
def test_list_filter_by_status(self, initialized_archive):
"""Filter archive results by status."""
# Create snapshot and archive result
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list', '--status=queued'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -143,20 +143,20 @@ class TestArchiveResultList:
for r in records:
assert r['status'] == 'queued'
def test_list_filter_by_plugin(self, cli_env, initialized_archive):
def test_list_filter_by_plugin(self, initialized_archive):
"""Filter archive results by plugin."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list', '--plugin=title'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -164,22 +164,22 @@ class TestArchiveResultList:
for r in records:
assert r['plugin'] == 'title'
def test_list_with_limit(self, cli_env, initialized_archive):
def test_list_with_limit(self, initialized_archive):
"""Limit number of results."""
# Create multiple archive results
for _ in range(3):
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list', '--limit=2'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -190,23 +190,23 @@ class TestArchiveResultList:
class TestArchiveResultUpdate:
"""Tests for `archivebox archiveresult update`."""
def test_update_status(self, cli_env, initialized_archive):
def test_update_status(self, initialized_archive):
"""Update archive result status."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
stdout3, stderr, code = run_archivebox_cmd(
['archiveresult', 'update', '--status=failed'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -219,45 +219,45 @@ class TestArchiveResultUpdate:
class TestArchiveResultDelete:
"""Tests for `archivebox archiveresult delete`."""
def test_delete_requires_yes(self, cli_env, initialized_archive):
def test_delete_requires_yes(self, initialized_archive):
"""Delete requires --yes flag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'delete'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 1
assert '--yes' in stderr
def test_delete_with_yes(self, cli_env, initialized_archive):
def test_delete_with_yes(self, initialized_archive):
"""Delete with --yes flag works."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'delete', '--yes'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -23,13 +23,13 @@ from archivebox.tests.conftest import (
class TestCrawlCreate:
"""Tests for `archivebox crawl create`."""
def test_create_from_url_args(self, cli_env, initialized_archive):
def test_create_from_url_args(self, initialized_archive):
"""Create crawl from URL arguments."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -41,7 +41,7 @@ class TestCrawlCreate:
assert records[0]['type'] == 'Crawl'
assert url in records[0]['urls']
def test_create_from_stdin_urls(self, cli_env, initialized_archive):
def test_create_from_stdin_urls(self, initialized_archive):
"""Create crawl from stdin URLs (one per line)."""
urls = [create_test_url() for _ in range(3)]
stdin = '\n'.join(urls)
@@ -49,7 +49,7 @@ class TestCrawlCreate:
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -62,33 +62,33 @@ class TestCrawlCreate:
for url in urls:
assert url in crawl['urls']
def test_create_with_depth(self, cli_env, initialized_archive):
def test_create_with_depth(self, initialized_archive):
"""Create crawl with --depth flag."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create', '--depth=2', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
records = parse_jsonl_output(stdout)
assert records[0]['max_depth'] == 2
def test_create_with_tag(self, cli_env, initialized_archive):
def test_create_with_tag(self, initialized_archive):
"""Create crawl with --tag flag."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create', '--tag=test-tag', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
records = parse_jsonl_output(stdout)
assert 'test-tag' in records[0].get('tags_str', '')
def test_create_pass_through_other_types(self, cli_env, initialized_archive):
def test_create_pass_through_other_types(self, initialized_archive):
"""Pass-through records of other types unchanged."""
tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
url = create_test_url()
@@ -97,7 +97,7 @@ class TestCrawlCreate:
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -108,18 +108,18 @@ class TestCrawlCreate:
assert 'Tag' in types
assert 'Crawl' in types
def test_create_pass_through_existing_crawl(self, cli_env, initialized_archive):
def test_create_pass_through_existing_crawl(self, initialized_archive):
"""Existing Crawl records (with id) are passed through."""
# First create a crawl
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Now pipe it back - should pass through
stdout2, stderr, code = run_archivebox_cmd(
['crawl', 'create'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -131,24 +131,24 @@ class TestCrawlCreate:
class TestCrawlList:
"""Tests for `archivebox crawl list`."""
def test_list_empty(self, cli_env, initialized_archive):
def test_list_empty(self, initialized_archive):
"""List with no crawls returns empty."""
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Listed 0 crawls' in stderr
def test_list_returns_created(self, cli_env, initialized_archive):
def test_list_returns_created(self, initialized_archive):
"""List returns previously created crawls."""
url = create_test_url()
run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -156,14 +156,14 @@ class TestCrawlList:
assert len(records) >= 1
assert any(url in r.get('urls', '') for r in records)
def test_list_filter_by_status(self, cli_env, initialized_archive):
def test_list_filter_by_status(self, initialized_archive):
"""Filter crawls by status."""
url = create_test_url()
run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list', '--status=queued'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -171,15 +171,15 @@ class TestCrawlList:
for r in records:
assert r['status'] == 'queued'
def test_list_with_limit(self, cli_env, initialized_archive):
def test_list_with_limit(self, initialized_archive):
"""Limit number of results."""
# Create multiple crawls
for _ in range(3):
run_archivebox_cmd(['crawl', 'create', create_test_url()], env=cli_env)
run_archivebox_cmd(['crawl', 'create', create_test_url()], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list', '--limit=2'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -190,18 +190,18 @@ class TestCrawlList:
class TestCrawlUpdate:
"""Tests for `archivebox crawl update`."""
def test_update_status(self, cli_env, initialized_archive):
def test_update_status(self, initialized_archive):
"""Update crawl status."""
# Create a crawl
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Update it
stdout2, stderr, code = run_archivebox_cmd(
['crawl', 'update', '--status=started'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -214,46 +214,46 @@ class TestCrawlUpdate:
class TestCrawlDelete:
"""Tests for `archivebox crawl delete`."""
def test_delete_requires_yes(self, cli_env, initialized_archive):
def test_delete_requires_yes(self, initialized_archive):
"""Delete requires --yes flag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'delete'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 1
assert '--yes' in stderr
def test_delete_with_yes(self, cli_env, initialized_archive):
def test_delete_with_yes(self, initialized_archive):
"""Delete with --yes flag works."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'delete', '--yes'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Deleted 1 crawls' in stderr
def test_delete_dry_run(self, cli_env, initialized_archive):
def test_delete_dry_run(self, initialized_archive):
"""Dry run shows what would be deleted."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'delete', '--dry-run'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -22,14 +22,14 @@ from archivebox.tests.conftest import (
class TestRunWithCrawl:
"""Tests for `archivebox run` with Crawl input."""
def test_run_with_new_crawl(self, cli_env, initialized_archive):
def test_run_with_new_crawl(self, initialized_archive):
"""Run creates and processes a new Crawl (no id)."""
crawl_record = create_test_crawl_json()
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(crawl_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -41,19 +41,19 @@ class TestRunWithCrawl:
assert len(crawl_records) >= 1
assert crawl_records[0].get('id') # Should have an id now
def test_run_with_existing_crawl(self, cli_env, initialized_archive):
def test_run_with_existing_crawl(self, initialized_archive):
"""Run re-queues an existing Crawl (with id)."""
url = create_test_url()
# First create a crawl
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Run with the existing crawl
stdout2, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -65,14 +65,14 @@ class TestRunWithCrawl:
class TestRunWithSnapshot:
"""Tests for `archivebox run` with Snapshot input."""
def test_run_with_new_snapshot(self, cli_env, initialized_archive):
def test_run_with_new_snapshot(self, initialized_archive):
"""Run creates and processes a new Snapshot (no id, just url)."""
snapshot_record = create_test_snapshot_json()
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(snapshot_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -83,19 +83,19 @@ class TestRunWithSnapshot:
assert len(snapshot_records) >= 1
assert snapshot_records[0].get('id')
def test_run_with_existing_snapshot(self, cli_env, initialized_archive):
def test_run_with_existing_snapshot(self, initialized_archive):
"""Run re-queues an existing Snapshot (with id)."""
url = create_test_url()
# First create a snapshot
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
# Run with the existing snapshot
stdout2, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -103,7 +103,7 @@ class TestRunWithSnapshot:
records = parse_jsonl_output(stdout2)
assert len(records) >= 1
def test_run_with_plain_url(self, cli_env, initialized_archive):
def test_run_with_plain_url(self, initialized_archive):
"""Run accepts plain URL records (no type field)."""
url = create_test_url()
url_record = {'url': url}
@@ -111,7 +111,7 @@ class TestRunWithSnapshot:
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(url_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -123,18 +123,18 @@ class TestRunWithSnapshot:
class TestRunWithArchiveResult:
"""Tests for `archivebox run` with ArchiveResult input."""
def test_run_requeues_failed_archiveresult(self, cli_env, initialized_archive):
def test_run_requeues_failed_archiveresult(self, initialized_archive):
"""Run re-queues a failed ArchiveResult."""
url = create_test_url()
# Create snapshot and archive result
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
@@ -143,14 +143,14 @@ class TestRunWithArchiveResult:
run_archivebox_cmd(
['archiveresult', 'update', '--status=failed'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
# Now run should re-queue it
stdout3, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -163,14 +163,14 @@ class TestRunWithArchiveResult:
class TestRunPassThrough:
"""Tests for pass-through behavior in `archivebox run`."""
def test_run_passes_through_unknown_types(self, cli_env, initialized_archive):
def test_run_passes_through_unknown_types(self, initialized_archive):
"""Run passes through records with unknown types."""
unknown_record = {'type': 'Unknown', 'id': 'fake-id', 'data': 'test'}
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(unknown_record),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -179,7 +179,7 @@ class TestRunPassThrough:
assert len(unknown_records) == 1
assert unknown_records[0]['data'] == 'test'
def test_run_outputs_all_processed_records(self, cli_env, initialized_archive):
def test_run_outputs_all_processed_records(self, initialized_archive):
"""Run outputs all processed records for chaining."""
url = create_test_url()
crawl_record = create_test_crawl_json(urls=[url])
@@ -187,7 +187,7 @@ class TestRunPassThrough:
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(crawl_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -200,7 +200,7 @@ class TestRunPassThrough:
class TestRunMixedInput:
"""Tests for `archivebox run` with mixed record types."""
def test_run_handles_mixed_types(self, cli_env, initialized_archive):
def test_run_handles_mixed_types(self, initialized_archive):
"""Run handles mixed Crawl/Snapshot/ArchiveResult input."""
crawl = create_test_crawl_json()
snapshot = create_test_snapshot_json()
@@ -215,7 +215,7 @@ class TestRunMixedInput:
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -230,24 +230,24 @@ class TestRunMixedInput:
class TestRunEmpty:
"""Tests for `archivebox run` edge cases."""
def test_run_empty_stdin(self, cli_env, initialized_archive):
def test_run_empty_stdin(self, initialized_archive):
"""Run with empty stdin returns success."""
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin='',
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
def test_run_no_records_to_process(self, cli_env, initialized_archive):
def test_run_no_records_to_process(self, initialized_archive):
"""Run with only pass-through records shows message."""
unknown = {'type': 'Unknown', 'id': 'fake'}
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(unknown),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -22,13 +22,13 @@ from archivebox.tests.conftest import (
class TestSnapshotCreate:
"""Tests for `archivebox snapshot create`."""
def test_create_from_url_args(self, cli_env, initialized_archive):
def test_create_from_url_args(self, initialized_archive):
"""Create snapshot from URL arguments."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -39,19 +39,19 @@ class TestSnapshotCreate:
assert records[0]['type'] == 'Snapshot'
assert records[0]['url'] == url
def test_create_from_crawl_jsonl(self, cli_env, initialized_archive):
def test_create_from_crawl_jsonl(self, initialized_archive):
"""Create snapshots from Crawl JSONL input."""
url = create_test_url()
# First create a crawl
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Pipe crawl to snapshot create
stdout2, stderr, code = run_archivebox_cmd(
['snapshot', 'create'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -65,20 +65,20 @@ class TestSnapshotCreate:
snapshot = next(r for r in records if r['type'] == 'Snapshot')
assert snapshot['url'] == url
def test_create_with_tag(self, cli_env, initialized_archive):
def test_create_with_tag(self, initialized_archive):
"""Create snapshot with --tag flag."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create', '--tag=test-tag', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
records = parse_jsonl_output(stdout)
assert 'test-tag' in records[0].get('tags_str', '')
def test_create_pass_through_other_types(self, cli_env, initialized_archive):
def test_create_pass_through_other_types(self, initialized_archive):
"""Pass-through records of other types unchanged."""
tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
url = create_test_url()
@@ -87,7 +87,7 @@ class TestSnapshotCreate:
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -97,13 +97,13 @@ class TestSnapshotCreate:
assert 'Tag' in types
assert 'Snapshot' in types
def test_create_multiple_urls(self, cli_env, initialized_archive):
def test_create_multiple_urls(self, initialized_archive):
"""Create snapshots from multiple URLs."""
urls = [create_test_url() for _ in range(3)]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create'] + urls,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -118,24 +118,24 @@ class TestSnapshotCreate:
class TestSnapshotList:
"""Tests for `archivebox snapshot list`."""
def test_list_empty(self, cli_env, initialized_archive):
def test_list_empty(self, initialized_archive):
"""List with no snapshots returns empty."""
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Listed 0 snapshots' in stderr
def test_list_returns_created(self, cli_env, initialized_archive):
def test_list_returns_created(self, initialized_archive):
"""List returns previously created snapshots."""
url = create_test_url()
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -143,14 +143,14 @@ class TestSnapshotList:
assert len(records) >= 1
assert any(r.get('url') == url for r in records)
def test_list_filter_by_status(self, cli_env, initialized_archive):
def test_list_filter_by_status(self, initialized_archive):
"""Filter snapshots by status."""
url = create_test_url()
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list', '--status=queued'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -158,14 +158,14 @@ class TestSnapshotList:
for r in records:
assert r['status'] == 'queued'
def test_list_filter_by_url_contains(self, cli_env, initialized_archive):
def test_list_filter_by_url_contains(self, initialized_archive):
"""Filter snapshots by URL contains."""
url = create_test_url(domain='unique-domain-12345.com')
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list', '--url__icontains=unique-domain-12345'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -173,14 +173,14 @@ class TestSnapshotList:
assert len(records) == 1
assert 'unique-domain-12345' in records[0]['url']
def test_list_with_limit(self, cli_env, initialized_archive):
def test_list_with_limit(self, initialized_archive):
"""Limit number of results."""
for _ in range(3):
run_archivebox_cmd(['snapshot', 'create', create_test_url()], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', create_test_url()], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list', '--limit=2'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -191,16 +191,16 @@ class TestSnapshotList:
class TestSnapshotUpdate:
"""Tests for `archivebox snapshot update`."""
def test_update_status(self, cli_env, initialized_archive):
def test_update_status(self, initialized_archive):
"""Update snapshot status."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, stderr, code = run_archivebox_cmd(
['snapshot', 'update', '--status=started'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -209,16 +209,16 @@ class TestSnapshotUpdate:
records = parse_jsonl_output(stdout2)
assert records[0]['status'] == 'started'
def test_update_add_tag(self, cli_env, initialized_archive):
def test_update_add_tag(self, initialized_archive):
"""Update snapshot by adding tag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, stderr, code = run_archivebox_cmd(
['snapshot', 'update', '--tag=new-tag'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -228,46 +228,46 @@ class TestSnapshotUpdate:
class TestSnapshotDelete:
"""Tests for `archivebox snapshot delete`."""
def test_delete_requires_yes(self, cli_env, initialized_archive):
def test_delete_requires_yes(self, initialized_archive):
"""Delete requires --yes flag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'delete'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 1
assert '--yes' in stderr
def test_delete_with_yes(self, cli_env, initialized_archive):
def test_delete_with_yes(self, initialized_archive):
"""Delete with --yes flag works."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'delete', '--yes'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Deleted 1 snapshots' in stderr
def test_delete_dry_run(self, cli_env, initialized_archive):
def test_delete_dry_run(self, initialized_archive):
"""Dry run shows what would be deleted."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'delete', '--dry-run'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -0,0 +1 @@
"""Tests for the workers module (Orchestrator, Worker, pid_utils)."""

View File

@@ -0,0 +1,453 @@
"""
Unit tests for the Orchestrator and Worker classes.
Tests cover:
1. Orchestrator lifecycle (startup, shutdown)
2. Queue polling and worker spawning
3. Idle detection and exit logic
4. Worker registration and management
5. Process model methods (replacing old pid_utils)
"""
import os
import tempfile
import time
from pathlib import Path
from datetime import timedelta
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
from django.utils import timezone
from archivebox.workers.orchestrator import Orchestrator
class TestOrchestratorUnit(TestCase):
"""Unit tests for Orchestrator class (mocked dependencies)."""
def test_orchestrator_creation(self):
"""Orchestrator should initialize with correct defaults."""
orchestrator = Orchestrator(exit_on_idle=True)
self.assertTrue(orchestrator.exit_on_idle)
self.assertEqual(orchestrator.idle_count, 0)
self.assertIsNone(orchestrator.pid_file)
def test_orchestrator_repr(self):
"""Orchestrator __repr__ should include PID."""
orchestrator = Orchestrator()
repr_str = repr(orchestrator)
self.assertIn('Orchestrator', repr_str)
self.assertIn(str(os.getpid()), repr_str)
def test_has_pending_work(self):
"""has_pending_work should check if any queue has items."""
orchestrator = Orchestrator()
self.assertFalse(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 0}))
self.assertTrue(orchestrator.has_pending_work({'crawl': 0, 'snapshot': 5}))
self.assertTrue(orchestrator.has_pending_work({'crawl': 10, 'snapshot': 0}))
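# has_pending_work presumably reduces to a one-liner like this (an assumed
# sketch, not a copy of the real implementation):
#   any(count > 0 for count in queue_sizes.values())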
def test_should_exit_not_exit_on_idle(self):
"""should_exit should return False when exit_on_idle is False."""
orchestrator = Orchestrator(exit_on_idle=False)
orchestrator.idle_count = 100
self.assertFalse(orchestrator.should_exit({'crawl': 0}))
def test_should_exit_pending_work(self):
"""should_exit should return False when there's pending work."""
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.idle_count = 100
self.assertFalse(orchestrator.should_exit({'crawl': 5}))
@patch.object(Orchestrator, 'has_running_workers')
def test_should_exit_running_workers(self, mock_has_workers):
"""should_exit should return False when workers are running."""
mock_has_workers.return_value = True
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.idle_count = 100
self.assertFalse(orchestrator.should_exit({'crawl': 0}))
@patch.object(Orchestrator, 'has_running_workers')
@patch.object(Orchestrator, 'has_future_work')
def test_should_exit_idle_timeout(self, mock_future, mock_workers):
"""should_exit should return True after idle timeout with no work."""
mock_workers.return_value = False
mock_future.return_value = False
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.idle_count = orchestrator.IDLE_TIMEOUT
self.assertTrue(orchestrator.should_exit({'crawl': 0, 'snapshot': 0}))
@patch.object(Orchestrator, 'has_running_workers')
@patch.object(Orchestrator, 'has_future_work')
def test_should_exit_below_idle_timeout(self, mock_future, mock_workers):
"""should_exit should return False below idle timeout."""
mock_workers.return_value = False
mock_future.return_value = False
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.idle_count = orchestrator.IDLE_TIMEOUT - 1
self.assertFalse(orchestrator.should_exit({'crawl': 0}))
def test_should_spawn_worker_no_queue(self):
"""should_spawn_worker should return False when queue is empty."""
orchestrator = Orchestrator()
# Create a mock worker class
mock_worker = MagicMock()
mock_worker.get_running_workers.return_value = []
self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 0))
def test_should_spawn_worker_at_limit(self):
"""should_spawn_worker should return False when at per-type limit."""
orchestrator = Orchestrator()
mock_worker = MagicMock()
mock_worker.get_running_workers.return_value = [{}] * orchestrator.MAX_WORKERS_PER_TYPE
self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10))
@patch.object(Orchestrator, 'get_total_worker_count')
def test_should_spawn_worker_at_total_limit(self, mock_total):
"""should_spawn_worker should return False when at total limit."""
orchestrator = Orchestrator()
mock_total.return_value = orchestrator.MAX_TOTAL_WORKERS
mock_worker = MagicMock()
mock_worker.get_running_workers.return_value = []
self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 10))
@patch.object(Orchestrator, 'get_total_worker_count')
def test_should_spawn_worker_success(self, mock_total):
"""should_spawn_worker should return True when conditions are met."""
orchestrator = Orchestrator()
mock_total.return_value = 0
mock_worker = MagicMock()
mock_worker.get_running_workers.return_value = []
mock_worker.MAX_CONCURRENT_TASKS = 5
self.assertTrue(orchestrator.should_spawn_worker(mock_worker, 10))
@patch.object(Orchestrator, 'get_total_worker_count')
def test_should_spawn_worker_enough_workers(self, mock_total):
"""should_spawn_worker should return False when enough workers for queue."""
orchestrator = Orchestrator()
mock_total.return_value = 2
mock_worker = MagicMock()
mock_worker.get_running_workers.return_value = [{}] # 1 worker running
mock_worker.MAX_CONCURRENT_TASKS = 5 # Can handle 5 items
# Queue size (3) <= running_workers (1) * MAX_CONCURRENT_TASKS (5)
self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 3))
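# The capacity check exercised above, restated as a standalone sketch (an
# assumption about Orchestrator.should_spawn_worker's internals, not a copy;
# the real method also enforces the per-type and total worker limits):
def _needs_more_workers_sketch(queue_size: int, running_workers: int, max_concurrent_tasks: int) -> bool:
    """A new worker only helps once the queue exceeds current worker capacity."""
    return queue_size > running_workers * max_concurrent_tasks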
class TestOrchestratorWithProcess(TestCase):
"""Test Orchestrator using Process model for tracking."""
def setUp(self):
"""Reset process cache."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
def test_is_running_no_orchestrator(self):
"""is_running should return False when no orchestrator process exists."""
from archivebox.machine.models import Process
# Clean up any stale processes first
Process.cleanup_stale_running()
# Mark any running orchestrators as exited for clean test state
Process.objects.filter(
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING
).update(status=Process.StatusChoices.EXITED)
self.assertFalse(Orchestrator.is_running())
def test_is_running_with_orchestrator_process(self):
"""is_running should return True when orchestrator Process exists."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create an orchestrator Process record
proc = Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING,
pid=os.getpid(), # Use current PID so it appears alive
started_at=timezone.now(),
cmd=['archivebox', 'manage', 'orchestrator'],
)
try:
# Should detect running orchestrator
self.assertTrue(Orchestrator.is_running())
finally:
# Clean up
proc.status = Process.StatusChoices.EXITED
proc.save()
def test_orchestrator_uses_process_for_is_running(self):
"""Orchestrator.is_running should use Process.get_running_count."""
from archivebox.machine.models import Process
# Verify is_running uses Process model, not pid files
with patch.object(Process, 'get_running_count') as mock_count:
mock_count.return_value = 1
result = Orchestrator.is_running()
# Should have consulted Process.get_running_count (call args not asserted here)
mock_count.assert_called()
self.assertTrue(result)
class TestProcessBasedWorkerTracking(TestCase):
"""Test Process model methods that replace pid_utils functionality."""
def setUp(self):
"""Reset caches."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
def test_process_current_creates_record(self):
"""Process.current() should create a Process record for current PID."""
from archivebox.machine.models import Process
proc = Process.current()
self.assertIsNotNone(proc)
self.assertEqual(proc.pid, os.getpid())
self.assertEqual(proc.status, Process.StatusChoices.RUNNING)
self.assertIsNotNone(proc.machine)
self.assertIsNotNone(proc.started_at)
def test_process_current_caches_result(self):
"""Process.current() should return cached Process within interval."""
from archivebox.machine.models import Process
proc1 = Process.current()
proc2 = Process.current()
self.assertEqual(proc1.id, proc2.id)
def test_process_get_running_count(self):
"""Process.get_running_count should count running processes by type."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create some worker processes
for i in range(3):
Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=99990 + i, # Fake PIDs
started_at=timezone.now(),
)
count = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
self.assertGreaterEqual(count, 3)
def test_process_get_next_worker_id(self):
"""Process.get_next_worker_id should return count of running workers."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create 2 worker processes
for i in range(2):
Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=99980 + i,
started_at=timezone.now(),
)
next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
self.assertGreaterEqual(next_id, 2)
def test_process_cleanup_stale_running(self):
"""Process.cleanup_stale_running should mark stale processes as exited."""
from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW
machine = Machine.current()
# Create a stale process (old started_at, fake PID)
stale_proc = Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=999999, # Fake PID that doesn't exist
started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1),
)
cleaned = Process.cleanup_stale_running()
self.assertGreaterEqual(cleaned, 1)
stale_proc.refresh_from_db()
self.assertEqual(stale_proc.status, Process.StatusChoices.EXITED)
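# The staleness rule exercised above, sketched as an ORM update (an assumed
# approximation of cleanup_stale_running; the real method also probes PIDs):
#   cutoff = timezone.now() - PID_REUSE_WINDOW
#   Process.objects.filter(status=Process.StatusChoices.RUNNING,
#                          started_at__lt=cutoff).update(status=Process.StatusChoices.EXITED)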
def test_process_get_running(self):
"""Process.get_running should return queryset of running processes."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create a running process
proc = Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
pid=99970,
started_at=timezone.now(),
)
running = Process.get_running(process_type=Process.TypeChoices.HOOK)
self.assertIn(proc, running)
def test_process_type_detection(self):
"""Process._detect_process_type should detect process type from argv."""
from archivebox.machine.models import Process
# Test detection logic
with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR)
with patch('sys.argv', ['archivebox', 'add', 'http://example.com']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.CLI)
with patch('sys.argv', ['supervisord', '-c', 'config.ini']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.SUPERVISORD)
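# A plausible sketch of the argv-based detection rule under test (assumed,
# not the actual _detect_process_type implementation):
def _detect_process_type_sketch(argv: list) -> str:
    if 'supervisord' in Path(argv[0]).name:
        return 'supervisord'
    if 'orchestrator' in argv:
        return 'orchestrator'
    return 'cli'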
class TestProcessLifecycle(TestCase):
"""Test Process model lifecycle methods."""
def setUp(self):
"""Reset caches and create a machine."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
self.machine = models.Machine.current()
def test_process_is_running_property(self):
"""Process.is_running should check actual OS process."""
from archivebox.machine.models import Process
# Create a process with current PID (should be running)
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=os.getpid(),
started_at=timezone.now(),
)
# Should be running (current process exists)
self.assertTrue(proc.is_running)
# Create a process with fake PID
fake_proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
# Should not be running (PID doesn't exist)
self.assertFalse(fake_proc.is_running)
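# is_running presumably wraps a 0-signal probe (an assumption, sketched):
#   try:
#       os.kill(pid, 0)   # signal 0 delivers nothing; it only checks existence
#       return True       # (PermissionError would also imply the PID exists)
#   except ProcessLookupError:
#       return False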
def test_process_poll(self):
"""Process.poll should check and update exit status."""
from archivebox.machine.models import Process
# Create a process with fake PID (already exited)
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
exit_code = proc.poll()
# Should have detected exit and updated status
self.assertIsNotNone(exit_code)
proc.refresh_from_db()
self.assertEqual(proc.status, Process.StatusChoices.EXITED)
def test_process_terminate_already_dead(self):
"""Process.terminate should handle already-dead processes."""
from archivebox.machine.models import Process
# Create a process with fake PID
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
result = proc.terminate()
# Should return False (was already dead)
self.assertFalse(result)
proc.refresh_from_db()
self.assertEqual(proc.status, Process.StatusChoices.EXITED)
def test_process_tree_traversal(self):
"""Process parent/children relationships should work."""
from archivebox.machine.models import Process
# Create parent process
parent = Process.objects.create(
machine=self.machine,
process_type=Process.TypeChoices.CLI,
status=Process.StatusChoices.RUNNING,
pid=1,
started_at=timezone.now(),
)
# Create child process
child = Process.objects.create(
machine=self.machine,
parent=parent,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=2,
started_at=timezone.now(),
)
# Test relationships
self.assertEqual(child.parent, parent)
self.assertIn(child, parent.children.all())
self.assertEqual(child.root, parent)
self.assertEqual(child.depth, 1)
self.assertEqual(parent.depth, 0)
if __name__ == '__main__':
pytest.main([__file__, '-v'])