mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
Add tests for accessibility, parse_dom_outlinks, and consolelog plugins (#1749)
This commit is contained in:
1
archivebox/plugins/accessibility/tests/__init__.py
Normal file
1
archivebox/plugins/accessibility/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the accessibility plugin."""
|
||||
121
archivebox/plugins/accessibility/tests/test_accessibility.py
Normal file
121
archivebox/plugins/accessibility/tests/test_accessibility.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""
|
||||
Tests for the accessibility plugin.
|
||||
|
||||
Tests the real accessibility hook with an actual URL to verify
|
||||
accessibility tree and page outline extraction.
|
||||
"""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
# Import chrome test helpers
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||
from chrome_test_helpers import (
|
||||
chrome_session,
|
||||
get_test_env,
|
||||
get_plugin_dir,
|
||||
get_hook_script,
|
||||
)
|
||||
|
||||
|
||||
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Each known binary name is probed with ``shutil.which``; the result is
    True as soon as one of them resolves and False when none do.
    """
    candidates = ['chromium', 'chromium-browser', 'google-chrome', 'chrome']
    return any(shutil.which(binary) for binary in candidates)
|
||||
|
||||
|
||||
# Get the path to the accessibility hook
|
||||
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||
ACCESSIBILITY_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_accessibility.*')
|
||||
|
||||
|
||||
class TestAccessibilityPlugin(TestCase):
    """Basic checks on the accessibility plugin's hook script."""

    def test_accessibility_hook_exists(self):
        """The hook lookup must have returned a path, and that path must exist."""
        hook = ACCESSIBILITY_HOOK
        self.assertIsNotNone(hook, "Accessibility hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestAccessibilityWithChrome(TestCase):
    """Integration tests for accessibility plugin with Chrome."""

    def setUp(self):
        """Create a temporary directory that is passed to ``chrome_session``."""
        self.temp_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Remove the temporary directory, ignoring any cleanup errors."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_accessibility_extracts_page_outline(self):
        """Accessibility hook should write accessibility.json with headings and url.

        Starts a Chrome session navigated to the test URL, runs the hook with
        ``node`` from the session's snapshot directory, then checks the exit
        code, stderr, and the keys of the JSON output file. The test is
        skipped when a RuntimeError mentioning Chrome or CDP is raised.
        """
        test_url = 'https://example.com'
        snapshot_id = 'test-accessibility-snapshot'

        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-accessibility-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (_chrome_process, _chrome_pid, snapshot_chrome_dir):
                # Get environment and run the accessibility hook
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'

                # Run accessibility hook with the active Chrome session
                result = subprocess.run(
                    ['node', str(ACCESSIBILITY_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env,
                )

                # Check for output file
                accessibility_output = snapshot_chrome_dir / 'accessibility.json'

                accessibility_data = None
                json_error = None

                # Parse the output file; keep the decode error (instead of
                # discarding it) so the failure message below can tell a
                # missing file apart from a malformed one.
                if accessibility_output.exists():
                    with open(accessibility_output) as f:
                        try:
                            accessibility_data = json.load(f)
                        except json.JSONDecodeError as e:
                            json_error = str(e)

                # Verify hook ran successfully
                self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
                self.assertNotIn('Traceback', result.stderr)

                # example.com has headings, so we should get accessibility data
                self.assertIsNotNone(
                    accessibility_data,
                    f"No accessibility data was generated - file missing or invalid JSON: {json_error}",
                )

                # Verify we got page outline data
                self.assertIn('headings', accessibility_data, f"Missing headings: {accessibility_data}")
                self.assertIn('url', accessibility_data, f"Missing url: {accessibility_data}")

        except RuntimeError as e:
            # Session setup problems are an environment issue, not a test failure.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
1
archivebox/plugins/consolelog/tests/__init__.py
Normal file
1
archivebox/plugins/consolelog/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the consolelog plugin."""
|
||||
123
archivebox/plugins/consolelog/tests/test_consolelog.py
Normal file
123
archivebox/plugins/consolelog/tests/test_consolelog.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""
|
||||
Tests for the consolelog plugin.
|
||||
|
||||
Tests the real consolelog hook with an actual URL to verify
|
||||
console output capture.
|
||||
"""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
# Import chrome test helpers
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||
from chrome_test_helpers import (
|
||||
chrome_session,
|
||||
get_test_env,
|
||||
get_plugin_dir,
|
||||
get_hook_script,
|
||||
)
|
||||
|
||||
|
||||
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Each known binary name is probed with ``shutil.which``; the result is
    True as soon as one of them resolves and False when none do.
    """
    candidates = ['chromium', 'chromium-browser', 'google-chrome', 'chrome']
    return any(shutil.which(binary) for binary in candidates)
|
||||
|
||||
|
||||
# Get the path to the consolelog hook
|
||||
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||
CONSOLELOG_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_consolelog.*')
|
||||
|
||||
|
||||
class TestConsolelogPlugin(TestCase):
    """Basic checks on the consolelog plugin's hook script."""

    def test_consolelog_hook_exists(self):
        """The hook lookup must have returned a path, and that path must exist."""
        hook = CONSOLELOG_HOOK
        self.assertIsNotNone(hook, "Consolelog hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestConsolelogWithChrome(TestCase):
    """Run the consolelog hook against a live Chrome session."""

    def setUp(self):
        """Allocate the temporary directory handed to ``chrome_session``."""
        scratch = tempfile.mkdtemp()
        self.temp_dir = Path(scratch)

    def tearDown(self):
        """Delete the temporary directory; cleanup failures are ignored."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_consolelog_captures_output(self):
        """Hook must not crash; any console.jsonl it writes must have the expected keys."""
        test_url = 'https://example.com'
        snapshot_id = 'test-consolelog-snapshot'
        session_options = {
            'crawl_id': 'test-consolelog-crawl',
            'snapshot_id': snapshot_id,
            'test_url': test_url,
            'navigate': True,
            'timeout': 30,
        }

        try:
            with chrome_session(self.temp_dir, **session_options) as session:
                chrome_process, chrome_pid, snapshot_chrome_dir = session

                # Environment for the hook, forced to headless mode.
                hook_env = get_test_env()
                hook_env['CHROME_HEADLESS'] = 'true'

                # Run the hook with the active Chrome session.
                command = [
                    'node',
                    str(CONSOLELOG_HOOK),
                    f'--url={test_url}',
                    f'--snapshot-id={snapshot_id}',
                ]
                completed = subprocess.run(
                    command,
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=120,  # Longer timeout as it waits for navigation
                    env=hook_env,
                )

                console_output = snapshot_chrome_dir / 'console.jsonl'

                # The hook is designed to wait for page_loaded.txt from
                # chrome_navigate. In test mode that file may not exist, so the
                # hook may time out, which is why the exit code is not checked.
                # At minimum, verify no crash.
                self.assertNotIn('Traceback', completed.stderr)

                # When an output file exists, check every parseable line.
                if console_output.exists():
                    stripped_content = console_output.read_text().strip()
                    for entry in stripped_content.split('\n'):
                        if not entry.strip():
                            continue
                        try:
                            record = json.loads(entry)
                        except json.JSONDecodeError:
                            continue  # Some lines may be incomplete
                        # Verify structure
                        self.assertIn('timestamp', record)
                        self.assertIn('type', record)

        except RuntimeError as err:
            reason = str(err)
            if 'Chrome' in reason or 'CDP' in reason:
                self.skipTest(f"Chrome session setup failed: {err}")
            raise
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
1
archivebox/plugins/custom/tests/__init__.py
Normal file
1
archivebox/plugins/custom/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the custom binary provider plugin."""
|
||||
149
archivebox/plugins/custom/tests/test_custom_provider.py
Normal file
149
archivebox/plugins/custom/tests/test_custom_provider.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""
|
||||
Tests for the custom binary provider plugin.
|
||||
|
||||
Tests the custom bash binary installer with safe commands.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
# Get the path to the custom provider hook
|
||||
PLUGIN_DIR = Path(__file__).parent.parent
|
||||
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_custom_bash.py'
|
||||
|
||||
|
||||
class TestCustomProviderHook(TestCase):
    """Exercise the custom binary provider hook by running it as a subprocess."""

    def setUp(self):
        """Create the temporary directory used as DATA_DIR."""
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary DATA_DIR, ignoring cleanup errors."""
        import shutil

        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _run_install_hook(self, binary_name, *extra_args):
        """Run the install hook for *binary_name* and return the completed process.

        The hook is started with the current interpreter, a copy of the
        current environment with DATA_DIR set to the temp dir, the fixed test
        binary/machine ids, and then any *extra_args* in the order given.
        """
        hook_env = os.environ.copy()
        hook_env['DATA_DIR'] = self.temp_dir
        command = [
            sys.executable, str(INSTALL_HOOK),
            f'--name={binary_name}',
            '--binary-id=test-uuid',
            '--machine-id=test-machine',
            *extra_args,
        ]
        return subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=30,
            env=hook_env,
        )

    def test_hook_script_exists(self):
        """Hook script should exist."""
        self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")

    def test_hook_skips_when_custom_not_allowed(self):
        """Hook should skip when custom not in allowed binproviders."""
        completed = self._run_install_hook(
            'echo',
            '--binproviders=pip,apt',  # custom not allowed
            '--custom-cmd=echo hello',
        )

        # Should exit cleanly (code 0) when custom not allowed
        self.assertEqual(completed.returncode, 0)
        self.assertIn('custom provider not allowed', completed.stderr)

    def test_hook_runs_custom_command_and_finds_binary(self):
        """Hook should run custom command and find the binary in PATH."""
        # The command installs nothing; the hook is then asked for 'echo',
        # which is already in PATH.
        completed = self._run_install_hook(
            'echo',
            '--custom-cmd=echo "custom install simulation"',
        )

        # Should succeed since echo is in PATH
        self.assertEqual(completed.returncode, 0, f"Hook failed: {completed.stderr}")

        # Scan the JSONL output for the Binary record describing 'echo'.
        for raw_line in completed.stdout.split('\n'):
            candidate = raw_line.strip()
            if not candidate.startswith('{'):
                continue
            try:
                record = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if record.get('type') == 'Binary' and record.get('name') == 'echo':
                self.assertEqual(record['binprovider'], 'custom')
                self.assertTrue(record['abspath'])
                break
        else:
            self.fail("No Binary JSONL record found in output")

    def test_hook_fails_for_missing_binary_after_command(self):
        """Hook should fail if binary not found after running custom command."""
        completed = self._run_install_hook(
            'nonexistent_binary_xyz123',
            '--custom-cmd=echo "failed install"',  # Doesn't actually install
        )

        # Should fail since binary not found after command
        self.assertEqual(completed.returncode, 1)
        self.assertIn('not found', completed.stderr.lower())

    def test_hook_fails_for_failing_command(self):
        """Hook should fail if custom command returns non-zero exit code."""
        completed = self._run_install_hook(
            'echo',
            '--custom-cmd=exit 1',  # Command that fails
        )

        # Should fail with exit code 1
        self.assertEqual(completed.returncode, 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
1
archivebox/plugins/env/tests/__init__.py
vendored
Normal file
1
archivebox/plugins/env/tests/__init__.py
vendored
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the env binary provider plugin."""
|
||||
159
archivebox/plugins/env/tests/test_env_provider.py
vendored
Normal file
159
archivebox/plugins/env/tests/test_env_provider.py
vendored
Normal file
@@ -0,0 +1,159 @@
|
||||
"""
|
||||
Tests for the env binary provider plugin.
|
||||
|
||||
Tests the real env provider hook with actual system binaries.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
# Get the path to the env provider hook
|
||||
PLUGIN_DIR = Path(__file__).parent.parent
|
||||
INSTALL_HOOK = PLUGIN_DIR / 'on_Binary__install_using_env_provider.py'
|
||||
|
||||
|
||||
class TestEnvProviderHook(TestCase):
    """Exercise the env binary provider hook by running it as a subprocess."""

    def setUp(self):
        """Create the temporary directory used as DATA_DIR."""
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary DATA_DIR, ignoring cleanup errors."""
        import shutil

        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def _run_install_hook(self, binary_name, *extra_args):
        """Run the install hook for *binary_name* and return the completed process.

        The hook is started with the current interpreter, a copy of the
        current environment with DATA_DIR set to the temp dir, the fixed test
        binary/machine ids, and then any *extra_args* in the order given.
        """
        hook_env = os.environ.copy()
        hook_env['DATA_DIR'] = self.temp_dir
        command = [
            sys.executable, str(INSTALL_HOOK),
            f'--name={binary_name}',
            '--binary-id=test-uuid',
            '--machine-id=test-machine',
            *extra_args,
        ]
        return subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=30,
            env=hook_env,
        )

    @staticmethod
    def _find_binary_record(stdout, binary_name):
        """Return the first Binary JSONL record named *binary_name*, or None.

        Only lines that start with '{' after stripping are parsed; lines that
        fail to decode as JSON are skipped.
        """
        for raw_line in stdout.split('\n'):
            candidate = raw_line.strip()
            if not candidate.startswith('{'):
                continue
            try:
                record = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if record.get('type') == 'Binary' and record.get('name') == binary_name:
                return record
        return None

    def test_hook_script_exists(self):
        """Hook script should exist."""
        self.assertTrue(INSTALL_HOOK.exists(), f"Hook not found: {INSTALL_HOOK}")

    def test_hook_finds_python(self):
        """Hook should find python3 binary in PATH."""
        completed = self._run_install_hook('python3')

        # Should succeed and output JSONL
        self.assertEqual(completed.returncode, 0, f"Hook failed: {completed.stderr}")

        record = self._find_binary_record(completed.stdout, 'python3')
        if record is None:
            self.fail("No Binary JSONL record found in output")

        self.assertEqual(record['binprovider'], 'env')
        self.assertTrue(record['abspath'])
        self.assertTrue(Path(record['abspath']).exists())

    def test_hook_finds_bash(self):
        """Hook should find bash binary in PATH."""
        completed = self._run_install_hook('bash')

        # Should succeed and output JSONL
        self.assertEqual(completed.returncode, 0, f"Hook failed: {completed.stderr}")

        record = self._find_binary_record(completed.stdout, 'bash')
        if record is None:
            self.fail("No Binary JSONL record found in output")

        self.assertEqual(record['binprovider'], 'env')
        self.assertTrue(record['abspath'])

    def test_hook_fails_for_missing_binary(self):
        """Hook should fail for binary not in PATH."""
        completed = self._run_install_hook('nonexistent_binary_xyz123')

        # Should fail with exit code 1
        self.assertEqual(completed.returncode, 1)
        self.assertIn('not found', completed.stderr.lower())

    def test_hook_skips_when_env_not_allowed(self):
        """Hook should skip when env not in allowed binproviders."""
        completed = self._run_install_hook(
            'python3',
            '--binproviders=pip,apt',  # env not allowed
        )

        # Should exit cleanly (code 0) when env not allowed
        self.assertEqual(completed.returncode, 0)
        self.assertIn('env provider not allowed', completed.stderr)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
1
archivebox/plugins/merkletree/tests/__init__.py
Normal file
1
archivebox/plugins/merkletree/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the merkletree plugin."""
|
||||
157
archivebox/plugins/merkletree/tests/test_merkletree.py
Normal file
157
archivebox/plugins/merkletree/tests/test_merkletree.py
Normal file
@@ -0,0 +1,157 @@
|
||||
"""
|
||||
Tests for the merkletree plugin.
|
||||
|
||||
Tests the real merkle tree generation with actual files.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
# Get the path to the merkletree hook
|
||||
PLUGIN_DIR = Path(__file__).parent.parent
|
||||
MERKLETREE_HOOK = PLUGIN_DIR / 'on_Snapshot__93_merkletree.py'
|
||||
|
||||
|
||||
class TestMerkletreePlugin(TestCase):
    """Run the merkletree hook as a subprocess against temporary snapshot directories."""

    @staticmethod
    def _make_snapshot_layout(root):
        """Create ``<root>/snapshot/merkletree`` and return (snapshot_dir, output_dir)."""
        snapshot_dir = Path(root) / 'snapshot'
        snapshot_dir.mkdir()
        output_dir = snapshot_dir / 'merkletree'
        output_dir.mkdir()
        return snapshot_dir, output_dir

    @staticmethod
    def _run_hook(output_dir, enabled):
        """Run the hook with cwd=*output_dir* and MERKLETREE_ENABLED=*enabled*.

        The environment is a copy of the current one with only that variable
        overridden. Returns the completed process.
        """
        hook_env = os.environ.copy()
        hook_env['MERKLETREE_ENABLED'] = enabled
        command = [
            sys.executable, str(MERKLETREE_HOOK),
            '--url=https://example.com',
            '--snapshot-id=test-snapshot',
        ]
        return subprocess.run(
            command,
            capture_output=True,
            text=True,
            cwd=str(output_dir),  # Hook expects to run from output dir
            env=hook_env,
            timeout=30,
        )

    def test_merkletree_hook_exists(self):
        """Merkletree hook script should exist."""
        self.assertTrue(MERKLETREE_HOOK.exists(), f"Hook not found: {MERKLETREE_HOOK}")

    def test_merkletree_generates_tree_for_files(self):
        """Merkletree hook should generate merkle tree for files in snapshot directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Mock snapshot directory with an output directory for the hook.
            snapshot_dir, output_dir = self._make_snapshot_layout(temp_dir)

            # Populate the snapshot with a few files, one in a subdirectory.
            (snapshot_dir / 'index.html').write_text('<html><body>Test</body></html>')
            (snapshot_dir / 'screenshot.png').write_bytes(b'\x89PNG\r\n\x1a\n' + b'\x00' * 100)
            media_dir = snapshot_dir / 'media'
            media_dir.mkdir()
            (media_dir / 'video.mp4').write_bytes(b'\x00\x00\x00\x18ftypmp42')

            completed = self._run_hook(output_dir, 'true')

            # Should succeed
            self.assertEqual(completed.returncode, 0, f"Hook failed: {completed.stderr}")

            # Check output file exists
            output_file = output_dir / 'merkletree.json'
            self.assertTrue(output_file.exists(), "merkletree.json not created")

            # Parse and verify output
            tree = json.loads(output_file.read_text())
            for required_key in ('root_hash', 'files', 'metadata'):
                self.assertIn(required_key, tree)

            # Should have indexed our test files
            indexed_paths = [entry['path'] for entry in tree['files']]
            self.assertIn('index.html', indexed_paths)
            self.assertIn('screenshot.png', indexed_paths)

            # Verify metadata
            metadata = tree['metadata']
            self.assertGreater(metadata['file_count'], 0)
            self.assertGreater(metadata['total_size'], 0)

    def test_merkletree_skips_when_disabled(self):
        """Merkletree hook should skip when MERKLETREE_ENABLED=false."""
        with tempfile.TemporaryDirectory() as temp_dir:
            _, output_dir = self._make_snapshot_layout(temp_dir)

            completed = self._run_hook(output_dir, 'false')

            # Should succeed (exit 0) but skip
            self.assertEqual(completed.returncode, 0)
            self.assertIn('skipped', completed.stdout)

    def test_merkletree_handles_empty_directory(self):
        """Merkletree hook should handle empty snapshot directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            _, output_dir = self._make_snapshot_layout(temp_dir)

            completed = self._run_hook(output_dir, 'true')

            # Should succeed even with empty directory
            self.assertEqual(completed.returncode, 0, f"Hook failed: {completed.stderr}")

            # Check output file exists
            output_file = output_dir / 'merkletree.json'
            self.assertTrue(output_file.exists())

            tree = json.loads(output_file.read_text())

            # Should have empty file list
            self.assertEqual(tree['metadata']['file_count'], 0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
1
archivebox/plugins/parse_dom_outlinks/tests/__init__.py
Normal file
1
archivebox/plugins/parse_dom_outlinks/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the parse_dom_outlinks plugin."""
|
||||
@@ -0,0 +1,123 @@
|
||||
"""
|
||||
Tests for the parse_dom_outlinks plugin.
|
||||
|
||||
Tests the real DOM outlinks hook with an actual URL to verify
|
||||
link extraction and categorization.
|
||||
"""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
# Import chrome test helpers
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||
from chrome_test_helpers import (
|
||||
chrome_session,
|
||||
get_test_env,
|
||||
get_plugin_dir,
|
||||
get_hook_script,
|
||||
)
|
||||
|
||||
|
||||
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Each known binary name is probed with ``shutil.which``; the result is
    True as soon as one of them resolves and False when none do.
    """
    candidates = ['chromium', 'chromium-browser', 'google-chrome', 'chrome']
    return any(shutil.which(binary) for binary in candidates)
|
||||
|
||||
|
||||
# Get the path to the parse_dom_outlinks hook
|
||||
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||
OUTLINKS_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_parse_dom_outlinks.*')
|
||||
|
||||
|
||||
class TestParseDomOutlinksPlugin(TestCase):
    """Basic checks on the parse_dom_outlinks plugin's hook script."""

    def test_outlinks_hook_exists(self):
        """The hook lookup must have returned a path, and that path must exist."""
        hook = OUTLINKS_HOOK
        self.assertIsNotNone(hook, "DOM outlinks hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestParseDomOutlinksWithChrome(TestCase):
    """Run the parse_dom_outlinks hook against a live Chrome session."""

    def setUp(self):
        """Allocate the temporary directory handed to ``chrome_session``."""
        scratch = tempfile.mkdtemp()
        self.temp_dir = Path(scratch)

    def tearDown(self):
        """Delete the temporary directory; cleanup failures are ignored."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_outlinks_extracts_links_from_page(self):
        """Hook must exit 0 and write outlinks.json with 'url' and a list under 'hrefs'."""
        test_url = 'https://example.com'
        snapshot_id = 'test-outlinks-snapshot'
        session_options = {
            'crawl_id': 'test-outlinks-crawl',
            'snapshot_id': snapshot_id,
            'test_url': test_url,
            'navigate': True,
            'timeout': 30,
        }

        try:
            with chrome_session(self.temp_dir, **session_options) as session:
                chrome_process, chrome_pid, snapshot_chrome_dir = session

                # Environment for the hook, forced to headless mode.
                hook_env = get_test_env()
                hook_env['CHROME_HEADLESS'] = 'true'

                # Run the hook with the active Chrome session.
                command = [
                    'node',
                    str(OUTLINKS_HOOK),
                    f'--url={test_url}',
                    f'--snapshot-id={snapshot_id}',
                ]
                completed = subprocess.run(
                    command,
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=hook_env,
                )

                # Load the output file if present, remembering any decode
                # error so it can be reported in the assertion message.
                outlinks_output = snapshot_chrome_dir / 'outlinks.json'
                outlinks_data = None
                json_error = None
                if outlinks_output.exists():
                    try:
                        outlinks_data = json.loads(outlinks_output.read_text())
                    except json.JSONDecodeError as decode_err:
                        json_error = str(decode_err)

                # Verify hook ran successfully
                self.assertEqual(completed.returncode, 0, f"Hook failed: {completed.stderr}")
                self.assertNotIn('Traceback', completed.stderr)

                # Verify we got outlinks data with the expected keys
                self.assertIsNotNone(
                    outlinks_data,
                    f"No outlinks data found - file missing or invalid JSON: {json_error}",
                )
                self.assertIn('url', outlinks_data, f"Missing url: {outlinks_data}")
                self.assertIn('hrefs', outlinks_data, f"Missing hrefs: {outlinks_data}")
                # example.com has at least one link (to iana.org); only the
                # container type is asserted here.
                self.assertIsInstance(outlinks_data['hrefs'], list)

        except RuntimeError as err:
            reason = str(err)
            if 'Chrome' in reason or 'CDP' in reason:
                self.skipTest(f"Chrome session setup failed: {err}")
            raise
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
1
archivebox/plugins/responses/tests/__init__.py
Normal file
1
archivebox/plugins/responses/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the responses plugin."""
|
||||
118
archivebox/plugins/responses/tests/test_responses.py
Normal file
118
archivebox/plugins/responses/tests/test_responses.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
Tests for the responses plugin.
|
||||
|
||||
Tests the real responses hook with an actual URL to verify
|
||||
network response capture.
|
||||
"""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
# Import chrome test helpers
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||
from chrome_test_helpers import (
|
||||
chrome_session,
|
||||
get_test_env,
|
||||
get_plugin_dir,
|
||||
get_hook_script,
|
||||
)
|
||||
|
||||
|
||||
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Each known binary name is probed with ``shutil.which``; the result is
    True as soon as one of them resolves and False when none do.
    """
    candidates = ['chromium', 'chromium-browser', 'google-chrome', 'chrome']
    return any(shutil.which(binary) for binary in candidates)
|
||||
|
||||
|
||||
# Get the path to the responses hook
|
||||
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||
RESPONSES_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_responses.*')
|
||||
|
||||
|
||||
class TestResponsesPlugin(TestCase):
    """Basic checks on the responses plugin's hook script."""

    def test_responses_hook_exists(self):
        """The hook lookup must have returned a path, and that path must exist."""
        hook = RESPONSES_HOOK
        self.assertIsNotNone(hook, "Responses hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestResponsesWithChrome(TestCase):
    """Run the responses hook against a live Chrome session."""

    def setUp(self):
        """Allocate the temporary directory handed to ``chrome_session``."""
        scratch = tempfile.mkdtemp()
        self.temp_dir = Path(scratch)

    def tearDown(self):
        """Delete the temporary directory; cleanup failures are ignored."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_responses_captures_network_responses(self):
        """Hook must not crash; any index.jsonl it writes must have the expected keys."""
        test_url = 'https://example.com'
        snapshot_id = 'test-responses-snapshot'
        session_options = {
            'crawl_id': 'test-responses-crawl',
            'snapshot_id': snapshot_id,
            'test_url': test_url,
            'navigate': True,
            'timeout': 30,
        }

        try:
            with chrome_session(self.temp_dir, **session_options) as session:
                chrome_process, chrome_pid, snapshot_chrome_dir = session

                # Environment for the hook, forced to headless mode.
                hook_env = get_test_env()
                hook_env['CHROME_HEADLESS'] = 'true'

                # Run the hook with the active Chrome session.
                command = [
                    'node',
                    str(RESPONSES_HOOK),
                    f'--url={test_url}',
                    f'--snapshot-id={snapshot_id}',
                ]
                completed = subprocess.run(
                    command,
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=120,  # Longer timeout as it waits for navigation
                    env=hook_env,
                )

                index_output = snapshot_chrome_dir / 'index.jsonl'

                # The hook may time out waiting for page_loaded.txt in test
                # mode, so only the absence of a crash is asserted.
                self.assertNotIn('Traceback', completed.stderr)

                # When an index file exists, check every parseable line.
                if index_output.exists():
                    stripped_content = index_output.read_text().strip()
                    for entry in stripped_content.split('\n'):
                        if not entry.strip():
                            continue
                        try:
                            record = json.loads(entry)
                        except json.JSONDecodeError:
                            continue  # Some lines may be incomplete
                        # Verify structure
                        self.assertIn('url', record)
                        self.assertIn('resourceType', record)

        except RuntimeError as err:
            reason = str(err)
            if 'Chrome' in reason or 'CDP' in reason:
                self.skipTest(f"Chrome session setup failed: {err}")
            raise
|
||||
|
||||
|
||||
# Allow running this test module directly as a script, in verbose mode.
if __name__ == '__main__':
    pytest_args = [__file__, '-v']
    pytest.main(pytest_args)
|
||||
1
archivebox/plugins/staticfile/tests/__init__.py
Normal file
1
archivebox/plugins/staticfile/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for the staticfile plugin."""
|
||||
114
archivebox/plugins/staticfile/tests/test_staticfile.py
Normal file
114
archivebox/plugins/staticfile/tests/test_staticfile.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""
|
||||
Tests for the staticfile plugin.
|
||||
|
||||
Tests the real staticfile hook with actual URLs to verify
|
||||
static file detection and download.
|
||||
"""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
|
||||
# Import chrome test helpers
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||
from chrome_test_helpers import (
|
||||
chrome_session,
|
||||
get_test_env,
|
||||
get_plugin_dir,
|
||||
get_hook_script,
|
||||
)
|
||||
|
||||
|
||||
def chrome_available() -> bool:
    """Return True if any known Chrome/Chromium executable name is found by ``shutil.which``."""
    known_binaries = ('chromium', 'chromium-browser', 'google-chrome', 'chrome')
    # Stops at the first name that resolves, matching the original early return.
    return any(shutil.which(executable) for executable in known_binaries)
|
||||
|
||||
|
||||
# Locate the staticfile hook script for the plugin that owns this test file.
# NOTE(review): get_hook_script presumably returns None when nothing matches the
# glob pattern (the existence test below asserts it is not None) - confirm in
# chrome_test_helpers.
PLUGIN_DIR = get_plugin_dir(__file__)
STATICFILE_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_staticfile.*')
|
||||
|
||||
|
||||
class TestStaticfilePlugin(TestCase):
    """Checks on the staticfile plugin that do not start a Chrome session."""

    def test_staticfile_hook_exists(self):
        """The staticfile hook must have been located and must exist on disk."""
        hook = STATICFILE_HOOK
        # Assert non-None first so a missing hook fails with a clear message
        # instead of an AttributeError on the .exists() call below.
        self.assertIsNotNone(hook, "Staticfile hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||
|
||||
|
||||
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestStaticfileWithChrome(TestCase):
    """Integration tests that run the staticfile hook against a Chrome session.

    The whole class is skipped when ``chrome_available()`` reports no browser.
    """

    def setUp(self):
        """Create a fresh temporary directory for each test."""
        self.temp_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Delete the temporary directory; removal errors are ignored."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_staticfile_skips_html_pages(self):
        """Staticfile hook should skip HTML pages (not static files).

        Runs the hook with the snapshot's Chrome directory as its working
        directory, checks that stderr contains no 'Traceback', then scans
        stdout for the first JSON line of type ``ArchiveResult``. If that
        record's status is ``skipped``, its ``output_str`` must contain
        'Not a static file'. A ``RuntimeError`` mentioning Chrome or CDP
        skips the test instead of failing it.
        """
        test_url = 'https://example.com'  # HTML page, not a static file
        snapshot_id = 'test-staticfile-snapshot'

        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-staticfile-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (_chrome_process, _chrome_pid, snapshot_chrome_dir):
                # Environment for the hook subprocess, forced to headless mode.
                hook_env = get_test_env()
                hook_env['CHROME_HEADLESS'] = 'true'

                # Run the staticfile hook while the Chrome session is still active.
                hook_cmd = [
                    'node',
                    str(STATICFILE_HOOK),
                    f'--url={test_url}',
                    f'--snapshot-id={snapshot_id}',
                ]
                result = subprocess.run(
                    hook_cmd,
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=120,  # Longer timeout as it waits for navigation
                    env=hook_env,
                )

                # Verify hook ran without crash
                self.assertNotIn('Traceback', result.stderr)

                # Parse JSONL output to verify it recognized HTML as non-static
                for raw_line in result.stdout.split('\n'):
                    candidate = raw_line.strip()
                    if not candidate.startswith('{'):
                        continue
                    try:
                        record = json.loads(candidate)
                    except json.JSONDecodeError:
                        continue
                    if record.get('type') != 'ArchiveResult':
                        continue
                    # HTML pages should be skipped
                    if record.get('status') == 'skipped':
                        self.assertIn('Not a static file', record.get('output_str', ''))
                    # Only the first ArchiveResult record is inspected.
                    break

        except RuntimeError as exc:
            message = str(exc)
            if any(marker in message for marker in ('Chrome', 'CDP')):
                self.skipTest(f"Chrome session setup failed: {exc}")
            raise
|
||||
|
||||
|
||||
# Allow running this test module directly as a script, in verbose mode.
if __name__ == '__main__':
    pytest_args = [__file__, '-v']
    pytest.main(pytest_args)
|
||||
Reference in New Issue
Block a user