mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-03 01:15:57 +10:00
Add SSL, redirects, SEO plugin tests and fix fake test issues
- Add real integration tests for SSL, redirects, and SEO plugins using Chrome session helpers for live URL testing
- Remove fake "format" tests that just created dicts and asserted on them (apt, pip, npm provider output format tests)
- Remove npm integration test that created dirs then checked they existed
- Fix SQLite search test to use SQLITEFTS_DB constant instead of hardcoded value
This commit is contained in:
@@ -111,29 +111,6 @@ class TestAptProviderHook(TestCase):
|
|||||||
self.assertNotIn('Traceback', result.stderr)
|
self.assertNotIn('Traceback', result.stderr)
|
||||||
|
|
||||||
|
|
||||||
class TestAptProviderOutput(TestCase):
    """Shape checks for a Binary JSONL record attributed to the apt provider.

    NOTE(review): the record is a literal built inside the test, so these
    assertions do not exercise any provider code.
    """

    def test_binary_record_format(self):
        """A Binary record carries the expected type, provider, and core fields."""
        record = dict(
            type='Binary',
            name='wget',
            abspath='/usr/bin/wget',
            version='1.21',
            binprovider='apt',
            sha256='',
            machine_id='machine-uuid',
            binary_id='binary-uuid',
        )

        # Exact-value checks first, then presence checks.
        for field, expected in (('type', 'Binary'), ('binprovider', 'apt')):
            self.assertEqual(record[field], expected)
        for field in ('name', 'abspath', 'version'):
            self.assertIn(field, record)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not is_linux(), reason="apt only available on Linux")
|
@pytest.mark.skipif(not is_linux(), reason="apt only available on Linux")
|
||||||
@pytest.mark.skipif(not apt_available(), reason="apt not installed")
|
@pytest.mark.skipif(not apt_available(), reason="apt not installed")
|
||||||
class TestAptProviderSystemBinaries(TestCase):
|
class TestAptProviderSystemBinaries(TestCase):
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ import subprocess
|
|||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
@@ -141,83 +140,5 @@ class TestNpmProviderHook(TestCase):
|
|||||||
self.assertNotIn('Failed to parse overrides JSON', result.stderr)
|
self.assertNotIn('Failed to parse overrides JSON', result.stderr)
|
||||||
|
|
||||||
|
|
||||||
class TestNpmProviderOutput(TestCase):
    """Shape checks for JSONL records attributed to the npm provider.

    NOTE(review): every record is a literal built inside the test, so these
    assertions do not exercise any provider code.
    """

    def test_binary_record_format(self):
        """A Binary record carries the expected type, provider, and abspath."""
        record = dict(
            type='Binary',
            name='prettier',
            abspath='/path/to/node_modules/.bin/prettier',
            version='3.0.0',
            binprovider='npm',
            sha256='',
            machine_id='machine-uuid',
            binary_id='binary-uuid',
        )

        for field, expected in (('type', 'Binary'), ('binprovider', 'npm')):
            self.assertEqual(record[field], expected)
        self.assertIn('abspath', record)

    def test_machine_update_record_format(self):
        """A Machine update record has type, _method, key, and value."""
        record = dict(
            type='Machine',
            _method='update',
            key='config/PATH',
            value='/path/to/npm/bin:/existing/path',
        )

        for field, expected in (('type', 'Machine'), ('_method', 'update')):
            self.assertEqual(record[field], expected)
        for field in ('key', 'value'):
            self.assertIn(field, record)

    def test_node_modules_dir_record_format(self):
        """A NODE_MODULES_DIR update record uses the config/NODE_MODULES_DIR key."""
        record = dict(
            type='Machine',
            _method='update',
            key='config/NODE_MODULES_DIR',
            value='/path/to/npm/node_modules',
        )

        self.assertEqual(record['key'], 'config/NODE_MODULES_DIR')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(not npm_available(), reason="npm not installed")
class TestNpmProviderIntegration(TestCase):
    """Directory-layout checks run inside an isolated temporary lib dir.

    NOTE(review): the test creates the directories itself before checking
    them, so no npm command is actually invoked here.
    """

    def setUp(self):
        """Create a temp dir containing ``lib/x86_64-linux``."""
        self.temp_dir = tempfile.mkdtemp()
        self.lib_dir = Path(self.temp_dir, 'lib', 'x86_64-linux')
        self.lib_dir.mkdir(parents=True)

    def tearDown(self):
        """Delete the temp dir, ignoring removal errors."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_npm_prefix_structure(self):
        """The npm prefix contains ``bin`` and ``node_modules`` directories."""
        npm_prefix = self.lib_dir.joinpath('npm')
        npm_prefix.mkdir(parents=True)

        # Layout being modelled:
        #   npm/
        #     bin/            (symlinks to binaries)
        #     node_modules/   (packages)
        expected_dirs = ('bin', 'node_modules')

        # Create every directory first, then verify each one.
        for dir_name in expected_dirs:
            npm_prefix.joinpath(dir_name).mkdir(exist_ok=True)

        for dir_name in expected_dirs:
            self.assertTrue(npm_prefix.joinpath(dir_name).exists())
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
pytest.main([__file__, '-v'])
|
pytest.main([__file__, '-v'])
|
||||||
|
|||||||
@@ -171,28 +171,5 @@ class TestPipProviderIntegration(TestCase):
|
|||||||
self.assertNotIn('Traceback', result.stderr)
|
self.assertNotIn('Traceback', result.stderr)
|
||||||
|
|
||||||
|
|
||||||
class TestPipProviderOutput(TestCase):
    """Shape checks for a Binary JSONL record attributed to the pip provider.

    NOTE(review): the record is a literal built inside the test, so these
    assertions do not exercise any provider code.
    """

    def test_binary_record_format(self):
        """A Binary record has the Binary type and the core descriptive fields."""
        # Sample record in the expected format.
        record = dict(
            type='Binary',
            name='wget',
            abspath='/usr/bin/wget',
            version='1.21',
            binprovider='pip',
            sha256='abc123...',
        )

        # Structure validation: one exact value, then required keys.
        self.assertEqual(record['type'], 'Binary')
        for field in ('name', 'abspath', 'version', 'binprovider'):
            self.assertIn(field, record)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
pytest.main([__file__, '-v'])
|
pytest.main([__file__, '-v'])
|
||||||
|
|||||||
1
archivebox/plugins/redirects/tests/__init__.py
Normal file
1
archivebox/plugins/redirects/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""Tests for the redirects plugin."""
|
||||||
134
archivebox/plugins/redirects/tests/test_redirects.py
Normal file
134
archivebox/plugins/redirects/tests/test_redirects.py
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
"""
|
||||||
|
Tests for the redirects plugin.
|
||||||
|
|
||||||
|
Tests the real redirects hook with actual URLs to verify
|
||||||
|
redirect chain capture.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
# Import chrome test helpers
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||||
|
from chrome_test_helpers import (
|
||||||
|
chrome_session,
|
||||||
|
get_test_env,
|
||||||
|
get_plugin_dir,
|
||||||
|
get_hook_script,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Returns:
        True if ``shutil.which`` resolves any of the candidate binary names
        (``chromium``, ``chromium-browser``, ``google-chrome``, ``chrome``),
        otherwise False.
    """
    candidates = ('chromium', 'chromium-browser', 'google-chrome', 'chrome')
    # any() stops at the first name that resolves.
    return any(shutil.which(name) for name in candidates)
|
||||||
|
|
||||||
|
|
||||||
|
# Get the path to the redirects hook
|
||||||
|
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||||
|
REDIRECTS_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_redirects.*')
|
||||||
|
|
||||||
|
|
||||||
|
class TestRedirectsPlugin(TestCase):
    """Checks on the redirects plugin that need no browser."""

    def test_redirects_hook_exists(self):
        """The redirects hook was located and the file is present on disk."""
        hook = REDIRECTS_HOOK
        # A None hook means no matching script was found; check that before
        # calling .exists() on it.
        self.assertIsNotNone(hook, "Redirects hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestRedirectsWithChrome(TestCase):
    """Integration tests that run the redirects hook against a live Chrome session."""

    def setUp(self):
        """Create a scratch directory handed to the Chrome session."""
        self.temp_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Remove the scratch directory, ignoring removal errors."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    def test_redirects_captures_navigation(self):
        """Redirects hook should capture URL navigation without errors.

        Opens a Chrome session navigated to ``https://example.com``, runs the
        hook with ``node`` from the session's snapshot directory, and asserts
        that it exits 0 with neither ``Traceback`` nor ``Error:`` on stderr.
        example.com typically does not redirect, so only a clean run is
        asserted; the content of any emitted redirect records is not inspected.

        The test is skipped when session setup raises a ``RuntimeError`` whose
        message mentions ``Chrome`` or ``CDP``; any other ``RuntimeError``
        propagates.
        """
        test_url = 'https://example.com'
        snapshot_id = 'test-redirects-snapshot'

        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-redirects-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (_chrome_process, _chrome_pid, snapshot_chrome_dir):
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'

                # Run the hook while the Chrome session is still open.
                result = subprocess.run(
                    ['node', str(REDIRECTS_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env,
                )

                self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
                self.assertNotIn('Traceback', result.stderr)
                self.assertNotIn('Error:', result.stderr)

        except RuntimeError as e:
            # Browser setup problems are environmental, not plugin failures.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
@@ -33,7 +33,7 @@ class TestSqliteSearchBackend(TestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
"""Create a temporary data directory with search index."""
|
"""Create a temporary data directory with search index."""
|
||||||
self.temp_dir = tempfile.mkdtemp()
|
self.temp_dir = tempfile.mkdtemp()
|
||||||
self.db_path = Path(self.temp_dir) / 'search.sqlite3'
|
self.db_path = Path(self.temp_dir) / SQLITEFTS_DB
|
||||||
|
|
||||||
# Patch DATA_DIR
|
# Patch DATA_DIR
|
||||||
self.settings_patch = patch(
|
self.settings_patch = patch(
|
||||||
@@ -252,7 +252,7 @@ class TestSqliteSearchWithRealData(TestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
"""Create index with realistic test data."""
|
"""Create index with realistic test data."""
|
||||||
self.temp_dir = tempfile.mkdtemp()
|
self.temp_dir = tempfile.mkdtemp()
|
||||||
self.db_path = Path(self.temp_dir) / 'search.sqlite3'
|
self.db_path = Path(self.temp_dir) / SQLITEFTS_DB
|
||||||
|
|
||||||
self.settings_patch = patch(
|
self.settings_patch = patch(
|
||||||
'archivebox.plugins.search_backend_sqlite.search.settings'
|
'archivebox.plugins.search_backend_sqlite.search.settings'
|
||||||
|
|||||||
1
archivebox/plugins/seo/tests/__init__.py
Normal file
1
archivebox/plugins/seo/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""Tests for the SEO plugin."""
|
||||||
135
archivebox/plugins/seo/tests/test_seo.py
Normal file
135
archivebox/plugins/seo/tests/test_seo.py
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
"""
|
||||||
|
Tests for the SEO plugin.
|
||||||
|
|
||||||
|
Tests the real SEO hook with an actual URL to verify
|
||||||
|
meta tag extraction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
# Import chrome test helpers
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||||
|
from chrome_test_helpers import (
|
||||||
|
chrome_session,
|
||||||
|
get_test_env,
|
||||||
|
get_plugin_dir,
|
||||||
|
get_hook_script,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Returns:
        True if ``shutil.which`` resolves any of the candidate binary names
        (``chromium``, ``chromium-browser``, ``google-chrome``, ``chrome``),
        otherwise False.
    """
    candidates = ('chromium', 'chromium-browser', 'google-chrome', 'chrome')
    # any() stops at the first name that resolves.
    return any(shutil.which(name) for name in candidates)
|
||||||
|
|
||||||
|
|
||||||
|
# Get the path to the SEO hook
|
||||||
|
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||||
|
SEO_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_seo.*')
|
||||||
|
|
||||||
|
|
||||||
|
class TestSEOPlugin(TestCase):
    """Checks on the SEO plugin that need no browser."""

    def test_seo_hook_exists(self):
        """The SEO hook was located and the file is present on disk."""
        hook = SEO_HOOK
        # A None hook means no matching script was found; check that before
        # calling .exists() on it.
        self.assertIsNotNone(hook, "SEO hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestSEOWithChrome(TestCase):
    """Integration tests that run the SEO hook against a live Chrome session."""

    def setUp(self):
        """Create a scratch directory handed to the Chrome session."""
        self.temp_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Remove the scratch directory, ignoring removal errors."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    @staticmethod
    def _find_seo_record(stdout):
        """Return the first JSON line in *stdout* that has an SEO key, else None.

        Lines that do not start with ``{`` or that fail to parse are skipped.
        """
        seo_keys = ('title', 'description', 'og:title', 'canonical')
        for line in stdout.split('\n'):
            line = line.strip()
            if not line.startswith('{'):
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if any(key in record for key in seo_keys):
                return record
        return None

    def test_seo_extracts_meta_tags(self):
        """SEO hook should extract meta tags from a real URL.

        Opens a Chrome session navigated to ``https://example.com``, runs the
        hook with ``node`` from the session's snapshot directory, and asserts
        that it exits 0 with neither ``Traceback`` nor ``Error:`` on stderr.
        SEO data is read from ``seo.json`` in the snapshot directory, falling
        back to JSON lines on stdout. When data is found it must contain at
        least one recognised SEO key; when none is found the test still
        passes on a clean run.

        The test is skipped when session setup raises a ``RuntimeError`` whose
        message mentions ``Chrome`` or ``CDP``; any other ``RuntimeError``
        propagates.
        """
        test_url = 'https://example.com'
        snapshot_id = 'test-seo-snapshot'

        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-seo-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (_chrome_process, _chrome_pid, snapshot_chrome_dir):
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'

                # Run the hook while the Chrome session is still open.
                result = subprocess.run(
                    ['node', str(SEO_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env,
                )

                seo_data = None

                # Prefer the output file when it holds valid JSON.
                seo_output = snapshot_chrome_dir / 'seo.json'
                if seo_output.exists():
                    with open(seo_output) as f:
                        try:
                            seo_data = json.load(f)
                        except json.JSONDecodeError:
                            # Unparseable file: fall through to stdout below.
                            pass

                # Otherwise look for a record printed on stdout.
                if not seo_data:
                    seo_data = self._find_seo_record(result.stdout)

                # The hook must have run cleanly.
                self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")
                self.assertNotIn('Traceback', result.stderr)
                self.assertNotIn('Error:', result.stderr)

                # Absence of data is tolerated, but data that was found must
                # actually contain an SEO field. Asserting on
                # `has_seo_data or seo_data` here could never fail.
                if seo_data:
                    has_seo_data = any(
                        key in seo_data
                        for key in ['title', 'description', 'og:title', 'canonical', 'meta']
                    )
                    self.assertTrue(has_seo_data, f"No SEO data extracted: {seo_data}")

        except RuntimeError as e:
            # Browser setup problems are environmental, not plugin failures.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
1
archivebox/plugins/ssl/tests/__init__.py
Normal file
1
archivebox/plugins/ssl/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""Tests for the SSL plugin."""
|
||||||
139
archivebox/plugins/ssl/tests/test_ssl.py
Normal file
139
archivebox/plugins/ssl/tests/test_ssl.py
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
"""
|
||||||
|
Tests for the SSL plugin.
|
||||||
|
|
||||||
|
Tests the real SSL hook with an actual HTTPS URL to verify
|
||||||
|
certificate information extraction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
# Import chrome test helpers
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'chrome' / 'tests'))
|
||||||
|
from chrome_test_helpers import (
|
||||||
|
chrome_session,
|
||||||
|
get_test_env,
|
||||||
|
get_plugin_dir,
|
||||||
|
get_hook_script,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def chrome_available() -> bool:
    """Report whether a Chrome/Chromium executable can be found on PATH.

    Returns:
        True if ``shutil.which`` resolves any of the candidate binary names
        (``chromium``, ``chromium-browser``, ``google-chrome``, ``chrome``),
        otherwise False.
    """
    candidates = ('chromium', 'chromium-browser', 'google-chrome', 'chrome')
    # any() stops at the first name that resolves.
    return any(shutil.which(name) for name in candidates)
|
||||||
|
|
||||||
|
|
||||||
|
# Get the path to the SSL hook
|
||||||
|
PLUGIN_DIR = get_plugin_dir(__file__)
|
||||||
|
SSL_HOOK = get_hook_script(PLUGIN_DIR, 'on_Snapshot__*_ssl.*')
|
||||||
|
|
||||||
|
|
||||||
|
class TestSSLPlugin(TestCase):
    """Checks on the SSL plugin that need no browser."""

    def test_ssl_hook_exists(self):
        """The SSL hook was located and the file is present on disk."""
        hook = SSL_HOOK
        # A None hook means no matching script was found; check that before
        # calling .exists() on it.
        self.assertIsNotNone(hook, "SSL hook not found in plugin directory")
        self.assertTrue(hook.exists(), f"Hook not found: {hook}")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not chrome_available(), reason="Chrome not installed")
class TestSSLWithChrome(TestCase):
    """Integration tests that run the SSL hook against a live Chrome session."""

    def setUp(self):
        """Create a scratch directory handed to the Chrome session."""
        self.temp_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        """Remove the scratch directory, ignoring removal errors."""
        shutil.rmtree(self.temp_dir, ignore_errors=True)

    @staticmethod
    def _first_json_record(lines, accept=None):
        """Return the first parseable JSON-object line from *lines*, else None.

        Args:
            lines: Iterable of text lines (an open file or a split string).
            accept: Optional predicate; when given, records for which it
                returns a falsy value are skipped.

        Lines that do not start with ``{`` or that fail to parse are skipped.
        """
        for line in lines:
            line = line.strip()
            if not line.startswith('{'):
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if accept is None or accept(record):
                return record
        return None

    def test_ssl_extracts_certificate_from_https_url(self):
        """SSL hook should extract certificate info from a real HTTPS URL.

        Opens a Chrome session navigated to ``https://example.com``, runs the
        hook with ``node`` from the session's snapshot directory, and asserts
        that it exits 0. SSL data is read from ``ssl.jsonl`` in the snapshot
        directory, falling back to JSON lines on stdout. When data is found
        it must have a ``protocol`` beginning with ``TLS`` or ``SSL``; when
        none is found the test still passes on a clean exit.

        The test is skipped when session setup raises a ``RuntimeError`` whose
        message mentions ``Chrome`` or ``CDP``; any other ``RuntimeError``
        propagates.
        """
        test_url = 'https://example.com'
        snapshot_id = 'test-ssl-snapshot'

        try:
            with chrome_session(
                self.temp_dir,
                crawl_id='test-ssl-crawl',
                snapshot_id=snapshot_id,
                test_url=test_url,
                navigate=True,
                timeout=30,
            ) as (_chrome_process, _chrome_pid, snapshot_chrome_dir):
                env = get_test_env()
                env['CHROME_HEADLESS'] = 'true'

                # Run the hook while the Chrome session is still open.
                result = subprocess.run(
                    ['node', str(SSL_HOOK), f'--url={test_url}', f'--snapshot-id={snapshot_id}'],
                    cwd=str(snapshot_chrome_dir),
                    capture_output=True,
                    text=True,
                    timeout=60,
                    env=env,
                )

                ssl_data = None

                # Prefer the first record in the output file.
                ssl_output = snapshot_chrome_dir / 'ssl.jsonl'
                if ssl_output.exists():
                    with open(ssl_output) as f:
                        ssl_data = self._first_json_record(f)

                # Otherwise look for an SSL-looking record on stdout.
                if not ssl_data:
                    ssl_data = self._first_json_record(
                        result.stdout.split('\n'),
                        accept=lambda record: (
                            'protocol' in record
                            or 'issuer' in record
                            or record.get('type') == 'SSL'
                        ),
                    )

                # Check the exit status whether or not data was found, so a
                # hook that crashes after writing partial output still fails.
                self.assertEqual(result.returncode, 0, f"Hook failed: {result.stderr}")

                if ssl_data:
                    self.assertIn('protocol', ssl_data, f"SSL data missing protocol: {ssl_data}")
                    self.assertTrue(
                        ssl_data['protocol'].startswith(('TLS', 'SSL')),
                        f"Unexpected protocol: {ssl_data['protocol']}"
                    )

        except RuntimeError as e:
            # Browser setup problems are environmental, not plugin failures.
            if 'Chrome' in str(e) or 'CDP' in str(e):
                self.skipTest(f"Chrome session setup failed: {e}")
            raise
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
Reference in New Issue
Block a user