Files
ArchiveBox/archivebox/tests/test_hooks.py

537 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Unit tests for the ArchiveBox hook architecture.
Tests hook discovery, execution, JSONL parsing, background hook detection,
binary lookup, and install hook XYZ_BINARY env var handling.
Run with:
sudo -u testuser bash -c 'source .venv/bin/activate && python -m pytest archivebox/tests/test_hooks.py -v'
"""
import json
import os
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
# Set up Django before importing any Django-dependent modules
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.settings')
class TestBackgroundHookDetection(unittest.TestCase):
"""Test that background hooks are detected by .bg. suffix."""
def test_bg_js_suffix_detected(self):
"""Hooks with .bg.js suffix should be detected as background."""
script = Path('/path/to/on_Snapshot__21_consolelog.bg.js')
is_background = '.bg.' in script.name or '__background' in script.stem
self.assertTrue(is_background)
def test_bg_py_suffix_detected(self):
"""Hooks with .bg.py suffix should be detected as background."""
script = Path('/path/to/on_Snapshot__24_responses.bg.py')
is_background = '.bg.' in script.name or '__background' in script.stem
self.assertTrue(is_background)
def test_bg_sh_suffix_detected(self):
"""Hooks with .bg.sh suffix should be detected as background."""
script = Path('/path/to/on_Snapshot__23_ssl.bg.sh')
is_background = '.bg.' in script.name or '__background' in script.stem
self.assertTrue(is_background)
def test_legacy_background_suffix_detected(self):
"""Hooks with __background in stem should be detected (backwards compat)."""
script = Path('/path/to/on_Snapshot__21_consolelog__background.js')
is_background = '.bg.' in script.name or '__background' in script.stem
self.assertTrue(is_background)
def test_foreground_hook_not_detected(self):
"""Hooks without .bg. or __background should NOT be detected as background."""
script = Path('/path/to/on_Snapshot__11_favicon.js')
is_background = '.bg.' in script.name or '__background' in script.stem
self.assertFalse(is_background)
def test_foreground_py_hook_not_detected(self):
"""Python hooks without .bg. should NOT be detected as background."""
script = Path('/path/to/on_Snapshot__50_wget.py')
is_background = '.bg.' in script.name or '__background' in script.stem
self.assertFalse(is_background)
class TestJSONLParsing(unittest.TestCase):
"""Test JSONL parsing in run_hook() output processing."""
def test_parse_clean_jsonl(self):
"""Clean JSONL format should be parsed correctly."""
stdout = '{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}'
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
self.assertEqual(records[0]['status'], 'succeeded')
self.assertEqual(records[0]['output_str'], 'Done')
def test_parse_multiple_jsonl_records(self):
"""Multiple JSONL records should all be parsed."""
stdout = '''{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}
{"type": "Binary", "name": "wget", "abspath": "/usr/bin/wget"}'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 2)
self.assertEqual(records[0]['type'], 'ArchiveResult')
self.assertEqual(records[1]['type'], 'Binary')
def test_parse_jsonl_with_log_output(self):
"""JSONL should be extracted from mixed stdout with log lines."""
stdout = '''Starting hook execution...
Processing URL: https://example.com
{"type": "ArchiveResult", "status": "succeeded", "output_str": "Downloaded"}
Hook completed successfully'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['status'], 'succeeded')
def test_parse_legacy_result_json_format(self):
"""Legacy RESULT_JSON= format should be parsed for backwards compat."""
stdout = 'RESULT_JSON={"status": "succeeded", "output": "Done"}'
output_json = None
records = []
for line in stdout.splitlines():
line = line.strip()
if line.startswith('RESULT_JSON='):
try:
data = json.loads(line[len('RESULT_JSON='):])
if output_json is None:
output_json = data
data['type'] = 'ArchiveResult'
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
self.assertEqual(records[0]['status'], 'succeeded')
def test_ignore_invalid_json(self):
"""Invalid JSON should be silently ignored."""
stdout = '''{"type": "ArchiveResult", "status": "succeeded"}
{invalid json here}
not json at all
{"type": "Binary", "name": "wget"}'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 2)
def test_json_without_type_ignored(self):
"""JSON objects without 'type' field should be ignored."""
stdout = '''{"status": "succeeded", "output_str": "Done"}
{"type": "ArchiveResult", "status": "succeeded"}'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
class TestInstallHookEnvVarHandling(unittest.TestCase):
"""Test that install hooks respect XYZ_BINARY env vars."""
def setUp(self):
"""Set up test environment."""
self.work_dir = Path(tempfile.mkdtemp())
self.test_hook = self.work_dir / 'test_hook.py'
def tearDown(self):
"""Clean up test environment."""
shutil.rmtree(self.work_dir, ignore_errors=True)
def test_binary_env_var_absolute_path_handling(self):
"""Install hooks should handle absolute paths in XYZ_BINARY."""
# Test the logic that install hooks use
configured_binary = '/custom/path/to/wget2'
if '/' in configured_binary:
bin_name = Path(configured_binary).name
else:
bin_name = configured_binary
self.assertEqual(bin_name, 'wget2')
def test_binary_env_var_name_only_handling(self):
"""Install hooks should handle binary names in XYZ_BINARY."""
# Test the logic that install hooks use
configured_binary = 'wget2'
if '/' in configured_binary:
bin_name = Path(configured_binary).name
else:
bin_name = configured_binary
self.assertEqual(bin_name, 'wget2')
def test_binary_env_var_empty_default(self):
"""Install hooks should use default when XYZ_BINARY is empty."""
configured_binary = ''
if configured_binary:
if '/' in configured_binary:
bin_name = Path(configured_binary).name
else:
bin_name = configured_binary
else:
bin_name = 'wget' # default
self.assertEqual(bin_name, 'wget')
class TestHookDiscovery(unittest.TestCase):
"""Test hook discovery functions."""
def setUp(self):
"""Set up test plugin directory."""
self.test_dir = Path(tempfile.mkdtemp())
self.plugins_dir = self.test_dir / 'plugins'
self.plugins_dir.mkdir()
# Create test plugin structure
wget_dir = self.plugins_dir / 'wget'
wget_dir.mkdir()
(wget_dir / 'on_Snapshot__50_wget.py').write_text('# test hook')
(wget_dir / 'on_Crawl__00_install_wget.py').write_text('# install hook')
chrome_dir = self.plugins_dir / 'chrome_session'
chrome_dir.mkdir()
(chrome_dir / 'on_Snapshot__20_chrome_session.bg.js').write_text('// background hook')
consolelog_dir = self.plugins_dir / 'consolelog'
consolelog_dir.mkdir()
(consolelog_dir / 'on_Snapshot__21_consolelog.bg.js').write_text('// background hook')
def tearDown(self):
"""Clean up test directory."""
shutil.rmtree(self.test_dir, ignore_errors=True)
def test_discover_hooks_by_event(self):
"""discover_hooks() should find all hooks for an event."""
# Use the local implementation since we can't easily mock BUILTIN_PLUGINS_DIR
hooks = []
for ext in ('sh', 'py', 'js'):
pattern = f'*/on_Snapshot__*.{ext}'
hooks.extend(self.plugins_dir.glob(pattern))
hooks = sorted(set(hooks), key=lambda p: p.name)
self.assertEqual(len(hooks), 3)
hook_names = [h.name for h in hooks]
self.assertIn('on_Snapshot__20_chrome_session.bg.js', hook_names)
self.assertIn('on_Snapshot__21_consolelog.bg.js', hook_names)
self.assertIn('on_Snapshot__50_wget.py', hook_names)
def test_discover_hooks_sorted_by_name(self):
"""Hooks should be sorted by filename (numeric prefix ordering)."""
hooks = []
for ext in ('sh', 'py', 'js'):
pattern = f'*/on_Snapshot__*.{ext}'
hooks.extend(self.plugins_dir.glob(pattern))
hooks = sorted(set(hooks), key=lambda p: p.name)
# Check numeric ordering
self.assertEqual(hooks[0].name, 'on_Snapshot__20_chrome_session.js')
self.assertEqual(hooks[1].name, 'on_Snapshot__21_consolelog.bg.js')
self.assertEqual(hooks[2].name, 'on_Snapshot__50_wget.py')
class TestGetExtractorName(unittest.TestCase):
"""Test get_extractor_name() function."""
def test_strip_numeric_prefix(self):
"""Numeric prefix should be stripped from extractor name."""
# Inline implementation of get_extractor_name
def get_extractor_name(extractor: str) -> str:
parts = extractor.split('_', 1)
if len(parts) == 2 and parts[0].isdigit():
return parts[1]
return extractor
self.assertEqual(get_extractor_name('10_title'), 'title')
self.assertEqual(get_extractor_name('26_readability'), 'readability')
self.assertEqual(get_extractor_name('50_parse_html_urls'), 'parse_html_urls')
def test_no_prefix_unchanged(self):
"""Extractor without numeric prefix should be unchanged."""
def get_extractor_name(extractor: str) -> str:
parts = extractor.split('_', 1)
if len(parts) == 2 and parts[0].isdigit():
return parts[1]
return extractor
self.assertEqual(get_extractor_name('title'), 'title')
self.assertEqual(get_extractor_name('readability'), 'readability')
class TestHookExecution(unittest.TestCase):
"""Test hook execution with real subprocesses."""
def setUp(self):
"""Set up test environment."""
self.work_dir = Path(tempfile.mkdtemp())
def tearDown(self):
"""Clean up test environment."""
shutil.rmtree(self.work_dir, ignore_errors=True)
def test_python_hook_execution(self):
"""Python hook should execute and output JSONL."""
hook_path = self.work_dir / 'test_hook.py'
hook_path.write_text('''#!/usr/bin/env python3
import json
print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "output_str": "Test passed"}))
''')
result = subprocess.run(
['python3', str(hook_path)],
cwd=str(self.work_dir),
capture_output=True,
text=True,
)
self.assertEqual(result.returncode, 0)
output = json.loads(result.stdout.strip())
self.assertEqual(output['type'], 'ArchiveResult')
self.assertEqual(output['status'], 'succeeded')
def test_js_hook_execution(self):
"""JavaScript hook should execute and output JSONL."""
# Skip if node not available
if shutil.which('node') is None:
self.skipTest('Node.js not available')
hook_path = self.work_dir / 'test_hook.js'
hook_path.write_text('''#!/usr/bin/env node
console.log(JSON.stringify({type: 'ArchiveResult', status: 'succeeded', output_str: 'JS test'}));
''')
result = subprocess.run(
['node', str(hook_path)],
cwd=str(self.work_dir),
capture_output=True,
text=True,
)
self.assertEqual(result.returncode, 0)
output = json.loads(result.stdout.strip())
self.assertEqual(output['type'], 'ArchiveResult')
self.assertEqual(output['status'], 'succeeded')
def test_hook_receives_cli_args(self):
"""Hook should receive CLI arguments."""
hook_path = self.work_dir / 'test_hook.py'
hook_path.write_text('''#!/usr/bin/env python3
import sys
import json
# Simple arg parsing
args = {}
for arg in sys.argv[1:]:
if arg.startswith('--') and '=' in arg:
key, val = arg[2:].split('=', 1)
args[key.replace('-', '_')] = val
print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "url": args.get("url", "")}))
''')
result = subprocess.run(
['python3', str(hook_path), '--url=https://example.com'],
cwd=str(self.work_dir),
capture_output=True,
text=True,
)
self.assertEqual(result.returncode, 0)
output = json.loads(result.stdout.strip())
self.assertEqual(output['url'], 'https://example.com')
class TestInstallHookOutput(unittest.TestCase):
"""Test install hook output format compliance."""
def setUp(self):
"""Set up test environment."""
self.work_dir = Path(tempfile.mkdtemp())
def tearDown(self):
"""Clean up test environment."""
shutil.rmtree(self.work_dir, ignore_errors=True)
def test_install_hook_outputs_binary(self):
"""Install hook should output Binary JSONL when binary found."""
hook_output = json.dumps({
'type': 'Binary',
'name': 'wget',
'abspath': '/usr/bin/wget',
'version': '1.21.3',
'sha256': None,
'binprovider': 'apt',
})
data = json.loads(hook_output)
self.assertEqual(data['type'], 'Binary')
self.assertEqual(data['name'], 'wget')
self.assertTrue(data['abspath'].startswith('/'))
def test_install_hook_outputs_machine_config(self):
"""Install hook should output Machine config update JSONL."""
hook_output = json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/WGET_BINARY',
'value': '/usr/bin/wget',
})
data = json.loads(hook_output)
self.assertEqual(data['type'], 'Machine')
self.assertEqual(data['_method'], 'update')
self.assertEqual(data['key'], 'config/WGET_BINARY')
class TestSnapshotHookOutput(unittest.TestCase):
"""Test snapshot hook output format compliance."""
def test_snapshot_hook_basic_output(self):
"""Snapshot hook should output clean ArchiveResult JSONL."""
hook_output = json.dumps({
'type': 'ArchiveResult',
'status': 'succeeded',
'output_str': 'Downloaded 5 files',
})
data = json.loads(hook_output)
self.assertEqual(data['type'], 'ArchiveResult')
self.assertEqual(data['status'], 'succeeded')
self.assertIn('output_str', data)
def test_snapshot_hook_with_cmd(self):
"""Snapshot hook should include cmd for binary FK lookup."""
hook_output = json.dumps({
'type': 'ArchiveResult',
'status': 'succeeded',
'output_str': 'Archived with wget',
'cmd': ['/usr/bin/wget', '-p', '-k', 'https://example.com'],
})
data = json.loads(hook_output)
self.assertEqual(data['type'], 'ArchiveResult')
self.assertIsInstance(data['cmd'], list)
self.assertEqual(data['cmd'][0], '/usr/bin/wget')
def test_snapshot_hook_with_output_json(self):
"""Snapshot hook can include structured metadata in output_json."""
hook_output = json.dumps({
'type': 'ArchiveResult',
'status': 'succeeded',
'output_str': 'Got headers',
'output_json': {
'content-type': 'text/html',
'server': 'nginx',
'status-code': 200,
},
})
data = json.loads(hook_output)
self.assertEqual(data['type'], 'ArchiveResult')
self.assertIsInstance(data['output_json'], dict)
self.assertEqual(data['output_json']['status-code'], 200)
def test_snapshot_hook_skipped_status(self):
"""Snapshot hook should support skipped status."""
hook_output = json.dumps({
'type': 'ArchiveResult',
'status': 'skipped',
'output_str': 'SAVE_WGET=False',
})
data = json.loads(hook_output)
self.assertEqual(data['status'], 'skipped')
def test_snapshot_hook_failed_status(self):
"""Snapshot hook should support failed status."""
hook_output = json.dumps({
'type': 'ArchiveResult',
'status': 'failed',
'output_str': '404 Not Found',
})
data = json.loads(hook_output)
self.assertEqual(data['status'], 'failed')
class TestPluginMetadata(unittest.TestCase):
"""Test that plugin metadata is added to JSONL records."""
def test_plugin_name_added(self):
"""run_hook() should add plugin name to records."""
# Simulate what run_hook() does
script = Path('/archivebox/plugins/wget/on_Snapshot__50_wget.py')
plugin_name = script.parent.name
record = {'type': 'ArchiveResult', 'status': 'succeeded'}
record['plugin'] = plugin_name
record['plugin_hook'] = str(script)
self.assertEqual(record['plugin'], 'wget')
self.assertIn('on_Snapshot__50_wget.py', record['plugin_hook'])
if __name__ == '__main__':
unittest.main()