#!/usr/bin/env python3
"""
Unit tests for the ArchiveBox hook architecture.

Tests hook discovery, execution, JSONL parsing, background hook detection,
binary lookup, and install hook XYZ_BINARY env var handling.

Run with:
    sudo -u testuser bash -c 'source .venv/bin/activate && python -m pytest archivebox/tests/test_hooks.py -v'
"""

import json
import os
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

# Set up Django before importing any Django-dependent modules
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.settings')


class TestBackgroundHookDetection(unittest.TestCase):
    """Test that background hooks are detected by .bg. suffix."""

    def test_bg_js_suffix_detected(self):
        """Hooks with .bg.js suffix should be detected as background."""
        script = Path('/path/to/on_Snapshot__21_consolelog.bg.js')
        is_background = '.bg.' in script.name or '__background' in script.stem
        self.assertTrue(is_background)

    def test_bg_py_suffix_detected(self):
        """Hooks with .bg.py suffix should be detected as background."""
        script = Path('/path/to/on_Snapshot__24_responses.bg.py')
        is_background = '.bg.' in script.name or '__background' in script.stem
        self.assertTrue(is_background)

    def test_bg_sh_suffix_detected(self):
        """Hooks with .bg.sh suffix should be detected as background."""
        script = Path('/path/to/on_Snapshot__23_ssl.bg.sh')
        is_background = '.bg.' in script.name or '__background' in script.stem
        self.assertTrue(is_background)

    def test_legacy_background_suffix_detected(self):
        """Hooks with __background in stem should be detected (backwards compat)."""
        script = Path('/path/to/on_Snapshot__21_consolelog__background.js')
        is_background = '.bg.' in script.name or '__background' in script.stem
        self.assertTrue(is_background)

    def test_foreground_hook_not_detected(self):
        """Hooks without .bg. or __background should NOT be detected as background."""
        script = Path('/path/to/on_Snapshot__11_favicon.js')
        is_background = '.bg.' in script.name or '__background' in script.stem
        self.assertFalse(is_background)

    def test_foreground_py_hook_not_detected(self):
        """Python hooks without .bg. should NOT be detected as background."""
        script = Path('/path/to/on_Snapshot__50_wget.py')
        is_background = '.bg.' in script.name or '__background' in script.stem
        self.assertFalse(is_background)


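# NOTE: illustrative sketch only, not the actual run_hook() implementation.
# It consolidates the background-detection rule the tests above exercise inline:
# a hook runs in the background if its filename contains '.bg.' or its stem
# ends with '__background' (legacy form).
def _is_background_hook_sketch(script: Path) -> bool:
    """Return True if a hook script would be treated as a background hook."""
    return '.bg.' in script.name or '__background' in script.stem

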
class TestJSONLParsing(unittest.TestCase):
    """Test JSONL parsing in run_hook() output processing."""

    def test_parse_clean_jsonl(self):
        """Clean JSONL format should be parsed correctly."""
        stdout = '{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}'
        records = []
        for line in stdout.splitlines():
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                data = json.loads(line)
                if 'type' in data:
                    records.append(data)
            except json.JSONDecodeError:
                pass

        self.assertEqual(len(records), 1)
        self.assertEqual(records[0]['type'], 'ArchiveResult')
        self.assertEqual(records[0]['status'], 'succeeded')
        self.assertEqual(records[0]['output_str'], 'Done')

    def test_parse_multiple_jsonl_records(self):
        """Multiple JSONL records should all be parsed."""
        stdout = '''{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}
{"type": "Binary", "name": "wget", "abspath": "/usr/bin/wget"}'''
        records = []
        for line in stdout.splitlines():
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                data = json.loads(line)
                if 'type' in data:
                    records.append(data)
            except json.JSONDecodeError:
                pass

        self.assertEqual(len(records), 2)
        self.assertEqual(records[0]['type'], 'ArchiveResult')
        self.assertEqual(records[1]['type'], 'Binary')

    def test_parse_jsonl_with_log_output(self):
        """JSONL should be extracted from mixed stdout with log lines."""
        stdout = '''Starting hook execution...
Processing URL: https://example.com
{"type": "ArchiveResult", "status": "succeeded", "output_str": "Downloaded"}
Hook completed successfully'''
        records = []
        for line in stdout.splitlines():
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                data = json.loads(line)
                if 'type' in data:
                    records.append(data)
            except json.JSONDecodeError:
                pass

        self.assertEqual(len(records), 1)
        self.assertEqual(records[0]['status'], 'succeeded')

    def test_parse_legacy_result_json_format(self):
        """Legacy RESULT_JSON= format should be parsed for backwards compat."""
        stdout = 'RESULT_JSON={"status": "succeeded", "output": "Done"}'
        output_json = None
        records = []
        for line in stdout.splitlines():
            line = line.strip()
            if line.startswith('RESULT_JSON='):
                try:
                    data = json.loads(line[len('RESULT_JSON='):])
                    if output_json is None:
                        output_json = data
                    data['type'] = 'ArchiveResult'
                    records.append(data)
                except json.JSONDecodeError:
                    pass

        self.assertEqual(len(records), 1)
        self.assertEqual(records[0]['type'], 'ArchiveResult')
        self.assertEqual(records[0]['status'], 'succeeded')

    def test_ignore_invalid_json(self):
        """Invalid JSON should be silently ignored."""
        stdout = '''{"type": "ArchiveResult", "status": "succeeded"}
{invalid json here}
not json at all
{"type": "Binary", "name": "wget"}'''
        records = []
        for line in stdout.splitlines():
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                data = json.loads(line)
                if 'type' in data:
                    records.append(data)
            except json.JSONDecodeError:
                pass

        self.assertEqual(len(records), 2)

    def test_json_without_type_ignored(self):
        """JSON objects without 'type' field should be ignored."""
        stdout = '''{"status": "succeeded", "output_str": "Done"}
{"type": "ArchiveResult", "status": "succeeded"}'''
        records = []
        for line in stdout.splitlines():
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                data = json.loads(line)
                if 'type' in data:
                    records.append(data)
            except json.JSONDecodeError:
                pass

        self.assertEqual(len(records), 1)
        self.assertEqual(records[0]['type'], 'ArchiveResult')


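# NOTE: illustrative sketch only, consolidating the JSONL-parsing loop the tests
# above repeat inline; the real run_hook() output processing may differ in detail
# (e.g. legacy RESULT_JSON= handling is omitted here).
def _parse_jsonl_records_sketch(stdout: str) -> list[dict]:
    """Extract typed JSONL records from mixed hook stdout, ignoring log lines and bad JSON."""
    records = []
    for line in stdout.splitlines():
        line = line.strip()
        if not line or not line.startswith('{'):
            continue  # plain log output, not a JSONL record
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            continue  # malformed JSON is silently ignored
        if 'type' in data:
            records.append(data)
    return records

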
class TestInstallHookEnvVarHandling(unittest.TestCase):
    """Test that install hooks respect XYZ_BINARY env vars."""

    def setUp(self):
        """Set up test environment."""
        self.work_dir = Path(tempfile.mkdtemp())
        self.test_hook = self.work_dir / 'test_hook.py'

    def tearDown(self):
        """Clean up test environment."""
        shutil.rmtree(self.work_dir, ignore_errors=True)

    def test_binary_env_var_absolute_path_handling(self):
        """Install hooks should handle absolute paths in XYZ_BINARY."""
        # Test the logic that install hooks use
        configured_binary = '/custom/path/to/wget2'
        if '/' in configured_binary:
            bin_name = Path(configured_binary).name
        else:
            bin_name = configured_binary

        self.assertEqual(bin_name, 'wget2')

    def test_binary_env_var_name_only_handling(self):
        """Install hooks should handle binary names in XYZ_BINARY."""
        # Test the logic that install hooks use
        configured_binary = 'wget2'
        if '/' in configured_binary:
            bin_name = Path(configured_binary).name
        else:
            bin_name = configured_binary

        self.assertEqual(bin_name, 'wget2')

    def test_binary_env_var_empty_default(self):
        """Install hooks should use default when XYZ_BINARY is empty."""
        configured_binary = ''
        if configured_binary:
            if '/' in configured_binary:
                bin_name = Path(configured_binary).name
            else:
                bin_name = configured_binary
        else:
            bin_name = 'wget'  # default

        self.assertEqual(bin_name, 'wget')


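# NOTE: illustrative sketch only of the XYZ_BINARY handling the tests above exercise:
# an absolute path contributes only its basename, a bare name is used as-is, and an
# empty value falls back to the plugin's default binary name. Not the actual install
# hook implementation.
def _resolve_binary_name_sketch(configured_binary: str, default: str) -> str:
    """Resolve a configured XYZ_BINARY env var value to a binary name for lookup."""
    if not configured_binary:
        return default
    if '/' in configured_binary:
        return Path(configured_binary).name
    return configured_binary

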
class TestHookDiscovery(unittest.TestCase):
    """Test hook discovery functions."""

    def setUp(self):
        """Set up test plugin directory."""
        self.test_dir = Path(tempfile.mkdtemp())
        self.plugins_dir = self.test_dir / 'plugins'
        self.plugins_dir.mkdir()

        # Create test plugin structure
        wget_dir = self.plugins_dir / 'wget'
        wget_dir.mkdir()
        (wget_dir / 'on_Snapshot__50_wget.py').write_text('# test hook')
        (wget_dir / 'on_Crawl__00_install_wget.py').write_text('# install hook')

        chrome_dir = self.plugins_dir / 'chrome_session'
        chrome_dir.mkdir()
        (chrome_dir / 'on_Snapshot__20_chrome_session.bg.js').write_text('// background hook')

        consolelog_dir = self.plugins_dir / 'consolelog'
        consolelog_dir.mkdir()
        (consolelog_dir / 'on_Snapshot__21_consolelog.bg.js').write_text('// background hook')

    def tearDown(self):
        """Clean up test directory."""
        shutil.rmtree(self.test_dir, ignore_errors=True)

    def test_discover_hooks_by_event(self):
        """discover_hooks() should find all hooks for an event."""
        # Use the local implementation since we can't easily mock BUILTIN_PLUGINS_DIR
        hooks = []
        for ext in ('sh', 'py', 'js'):
            pattern = f'*/on_Snapshot__*.{ext}'
            hooks.extend(self.plugins_dir.glob(pattern))

        hooks = sorted(set(hooks), key=lambda p: p.name)

        self.assertEqual(len(hooks), 3)
        hook_names = [h.name for h in hooks]
        self.assertIn('on_Snapshot__20_chrome_session.bg.js', hook_names)
        self.assertIn('on_Snapshot__21_consolelog.bg.js', hook_names)
        self.assertIn('on_Snapshot__50_wget.py', hook_names)

    def test_discover_hooks_sorted_by_name(self):
        """Hooks should be sorted by filename (numeric prefix ordering)."""
        hooks = []
        for ext in ('sh', 'py', 'js'):
            pattern = f'*/on_Snapshot__*.{ext}'
            hooks.extend(self.plugins_dir.glob(pattern))

        hooks = sorted(set(hooks), key=lambda p: p.name)

        # Check numeric ordering (setUp creates on_Snapshot__20_chrome_session.bg.js)
        self.assertEqual(hooks[0].name, 'on_Snapshot__20_chrome_session.bg.js')
        self.assertEqual(hooks[1].name, 'on_Snapshot__21_consolelog.bg.js')
        self.assertEqual(hooks[2].name, 'on_Snapshot__50_wget.py')


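# NOTE: illustrative sketch only of the discovery logic the tests above reimplement
# locally; the real discover_hooks() searches the installed plugins dir
# (BUILTIN_PLUGINS_DIR), which these tests deliberately avoid mocking.
def _discover_hooks_sketch(plugins_dir: Path, event: str) -> list[Path]:
    """Find all */on_<Event>__*.{sh,py,js} hooks under plugins_dir, sorted by filename."""
    hooks = []
    for ext in ('sh', 'py', 'js'):
        hooks.extend(plugins_dir.glob(f'*/on_{event}__*.{ext}'))
    # Filenames start with a numeric prefix, so name sort gives execution order
    return sorted(set(hooks), key=lambda p: p.name)

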
class TestGetExtractorName(unittest.TestCase):
    """Test get_extractor_name() function."""

    def test_strip_numeric_prefix(self):
        """Numeric prefix should be stripped from extractor name."""
        # Inline implementation of get_extractor_name
        def get_extractor_name(extractor: str) -> str:
            parts = extractor.split('_', 1)
            if len(parts) == 2 and parts[0].isdigit():
                return parts[1]
            return extractor

        self.assertEqual(get_extractor_name('10_title'), 'title')
        self.assertEqual(get_extractor_name('26_readability'), 'readability')
        self.assertEqual(get_extractor_name('50_parse_html_urls'), 'parse_html_urls')

    def test_no_prefix_unchanged(self):
        """Extractor without numeric prefix should be unchanged."""
        def get_extractor_name(extractor: str) -> str:
            parts = extractor.split('_', 1)
            if len(parts) == 2 and parts[0].isdigit():
                return parts[1]
            return extractor

        self.assertEqual(get_extractor_name('title'), 'title')
        self.assertEqual(get_extractor_name('readability'), 'readability')


class TestHookExecution(unittest.TestCase):
    """Test hook execution with real subprocesses."""

    def setUp(self):
        """Set up test environment."""
        self.work_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Clean up test environment."""
        shutil.rmtree(self.work_dir, ignore_errors=True)

    def test_python_hook_execution(self):
        """Python hook should execute and output JSONL."""
        hook_path = self.work_dir / 'test_hook.py'
        hook_path.write_text('''#!/usr/bin/env python3
import json
print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "output_str": "Test passed"}))
''')

        result = subprocess.run(
            ['python3', str(hook_path)],
            cwd=str(self.work_dir),
            capture_output=True,
            text=True,
        )

        self.assertEqual(result.returncode, 0)
        output = json.loads(result.stdout.strip())
        self.assertEqual(output['type'], 'ArchiveResult')
        self.assertEqual(output['status'], 'succeeded')

    def test_js_hook_execution(self):
        """JavaScript hook should execute and output JSONL."""
        # Skip if node not available
        if shutil.which('node') is None:
            self.skipTest('Node.js not available')

        hook_path = self.work_dir / 'test_hook.js'
        hook_path.write_text('''#!/usr/bin/env node
console.log(JSON.stringify({type: 'ArchiveResult', status: 'succeeded', output_str: 'JS test'}));
''')

        result = subprocess.run(
            ['node', str(hook_path)],
            cwd=str(self.work_dir),
            capture_output=True,
            text=True,
        )

        self.assertEqual(result.returncode, 0)
        output = json.loads(result.stdout.strip())
        self.assertEqual(output['type'], 'ArchiveResult')
        self.assertEqual(output['status'], 'succeeded')

    def test_hook_receives_cli_args(self):
        """Hook should receive CLI arguments."""
        hook_path = self.work_dir / 'test_hook.py'
        hook_path.write_text('''#!/usr/bin/env python3
import sys
import json
# Simple arg parsing
args = {}
for arg in sys.argv[1:]:
    if arg.startswith('--') and '=' in arg:
        key, val = arg[2:].split('=', 1)
        args[key.replace('-', '_')] = val
print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "url": args.get("url", "")}))
''')

        result = subprocess.run(
            ['python3', str(hook_path), '--url=https://example.com'],
            cwd=str(self.work_dir),
            capture_output=True,
            text=True,
        )

        self.assertEqual(result.returncode, 0)
        output = json.loads(result.stdout.strip())
        self.assertEqual(output['url'], 'https://example.com')


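# NOTE: illustrative sketch only of how a hook script might be invoked as a subprocess
# with CLI args and env vars, in the same style the execution tests above use.
# The extension-to-interpreter mapping is an assumption for this sketch, not the real
# run_hook() behavior.
def _run_hook_subprocess_sketch(script: Path, args: list, env: dict, cwd: Path) -> subprocess.CompletedProcess:
    """Run a .py/.js/.sh hook script and capture its stdout for JSONL parsing."""
    interpreter = {'.py': 'python3', '.js': 'node', '.sh': 'bash'}[script.suffix]
    return subprocess.run(
        [interpreter, str(script), *args],
        cwd=str(cwd),
        env={**os.environ, **env},  # hook inherits the environment plus per-run overrides
        capture_output=True,
        text=True,
    )

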
class TestInstallHookOutput(unittest.TestCase):
    """Test install hook output format compliance."""

    def setUp(self):
        """Set up test environment."""
        self.work_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Clean up test environment."""
        shutil.rmtree(self.work_dir, ignore_errors=True)

    def test_install_hook_outputs_binary(self):
        """Install hook should output Binary JSONL when binary found."""
        hook_output = json.dumps({
            'type': 'Binary',
            'name': 'wget',
            'abspath': '/usr/bin/wget',
            'version': '1.21.3',
            'sha256': None,
            'binprovider': 'apt',
        })

        data = json.loads(hook_output)
        self.assertEqual(data['type'], 'Binary')
        self.assertEqual(data['name'], 'wget')
        self.assertTrue(data['abspath'].startswith('/'))

    def test_install_hook_outputs_machine_config(self):
        """Install hook should output Machine config update JSONL."""
        hook_output = json.dumps({
            'type': 'Machine',
            '_method': 'update',
            'key': 'config/WGET_BINARY',
            'value': '/usr/bin/wget',
        })

        data = json.loads(hook_output)
        self.assertEqual(data['type'], 'Machine')
        self.assertEqual(data['_method'], 'update')
        self.assertEqual(data['key'], 'config/WGET_BINARY')


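# NOTE: illustrative sketch only of what an install hook's stdout might look like,
# matching the Binary and Machine record shapes asserted above; the 'wget' name,
# 'apt' binprovider, and fallback path are example values, not output from a real hook.
def _example_install_hook_output_sketch() -> str:
    """Return example install-hook stdout: one Binary record and one Machine config update."""
    abspath = shutil.which('wget') or '/usr/bin/wget'
    lines = [
        json.dumps({'type': 'Binary', 'name': 'wget', 'abspath': abspath,
                    'version': None, 'sha256': None, 'binprovider': 'apt'}),
        json.dumps({'type': 'Machine', '_method': 'update',
                    'key': 'config/WGET_BINARY', 'value': abspath}),
    ]
    return '\n'.join(lines)

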
class TestSnapshotHookOutput(unittest.TestCase):
    """Test snapshot hook output format compliance."""

    def test_snapshot_hook_basic_output(self):
        """Snapshot hook should output clean ArchiveResult JSONL."""
        hook_output = json.dumps({
            'type': 'ArchiveResult',
            'status': 'succeeded',
            'output_str': 'Downloaded 5 files',
        })

        data = json.loads(hook_output)
        self.assertEqual(data['type'], 'ArchiveResult')
        self.assertEqual(data['status'], 'succeeded')
        self.assertIn('output_str', data)

    def test_snapshot_hook_with_cmd(self):
        """Snapshot hook should include cmd for binary FK lookup."""
        hook_output = json.dumps({
            'type': 'ArchiveResult',
            'status': 'succeeded',
            'output_str': 'Archived with wget',
            'cmd': ['/usr/bin/wget', '-p', '-k', 'https://example.com'],
        })

        data = json.loads(hook_output)
        self.assertEqual(data['type'], 'ArchiveResult')
        self.assertIsInstance(data['cmd'], list)
        self.assertEqual(data['cmd'][0], '/usr/bin/wget')

    def test_snapshot_hook_with_output_json(self):
        """Snapshot hook can include structured metadata in output_json."""
        hook_output = json.dumps({
            'type': 'ArchiveResult',
            'status': 'succeeded',
            'output_str': 'Got headers',
            'output_json': {
                'content-type': 'text/html',
                'server': 'nginx',
                'status-code': 200,
            },
        })

        data = json.loads(hook_output)
        self.assertEqual(data['type'], 'ArchiveResult')
        self.assertIsInstance(data['output_json'], dict)
        self.assertEqual(data['output_json']['status-code'], 200)

    def test_snapshot_hook_skipped_status(self):
        """Snapshot hook should support skipped status."""
        hook_output = json.dumps({
            'type': 'ArchiveResult',
            'status': 'skipped',
            'output_str': 'SAVE_WGET=False',
        })

        data = json.loads(hook_output)
        self.assertEqual(data['status'], 'skipped')

    def test_snapshot_hook_failed_status(self):
        """Snapshot hook should support failed status."""
        hook_output = json.dumps({
            'type': 'ArchiveResult',
            'status': 'failed',
            'output_str': '404 Not Found',
        })

        data = json.loads(hook_output)
        self.assertEqual(data['status'], 'failed')


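# NOTE: illustrative sketch only of a complete snapshot hook script emitting
# ArchiveResult JSONL on stdout, combining the record shapes asserted above.
# The URL, wget command, and output_json values are placeholder examples.
EXAMPLE_SNAPSHOT_HOOK_SCRIPT = '''#!/usr/bin/env python3
import json

url = 'https://example.com'               # would come from --url=... in a real hook
cmd = ['/usr/bin/wget', '-p', '-k', url]  # recorded so the binary FK can be looked up
print(json.dumps({
    "type": "ArchiveResult",
    "status": "succeeded",
    "output_str": "Archived with wget",
    "cmd": cmd,
    "output_json": {"status-code": 200},
}))
'''

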
class TestPluginMetadata(unittest.TestCase):
    """Test that plugin metadata is added to JSONL records."""

    def test_plugin_name_added(self):
        """run_hook() should add plugin name to records."""
        # Simulate what run_hook() does
        script = Path('/archivebox/plugins/wget/on_Snapshot__50_wget.py')
        plugin_name = script.parent.name

        record = {'type': 'ArchiveResult', 'status': 'succeeded'}
        record['plugin'] = plugin_name
        record['plugin_hook'] = str(script)

        self.assertEqual(record['plugin'], 'wget')
        self.assertIn('on_Snapshot__50_wget.py', record['plugin_hook'])


if __name__ == '__main__':
    unittest.main()