mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
wip
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
"""archivebox/tests/conftest.py - Pytest fixtures for CLI tests."""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import subprocess
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
|
||||
@@ -104,6 +106,234 @@ def initialized_archive(isolated_data_dir):
|
||||
return isolated_data_dir
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# CWD-based CLI Helpers (no DATA_DIR env)
|
||||
# =============================================================================
|
||||
|
||||
def run_archivebox_cmd_cwd(
    args: List[str],
    cwd: Path,
    stdin: Optional[str] = None,
    timeout: int = 60,
    env: Optional[Dict[str, str]] = None,
) -> Tuple[str, str, int]:
    """
    Run an archivebox command via ``python -m archivebox`` as a subprocess,
    relying on ``cwd`` to act as the data dir (DATA_DIR is deliberately
    stripped from the environment).

    Returns a (stdout, stderr, returncode) tuple.
    """
    # Start from the caller's environment, minus DATA_DIR, with output noise off.
    run_env = dict(os.environ)
    run_env.pop('DATA_DIR', None)
    run_env.update({'USE_COLOR': 'False', 'SHOW_PROGRESS': 'False'})
    if env:
        run_env.update(env)

    proc = subprocess.run(
        [sys.executable, '-m', 'archivebox', *args],
        input=stdin,
        capture_output=True,
        text=True,
        cwd=cwd,
        env=run_env,
        timeout=timeout,
    )
    return proc.stdout, proc.stderr, proc.returncode
|
||||
|
||||
|
||||
def run_python_cwd(
    script: str,
    cwd: Path,
    timeout: int = 60,
) -> Tuple[str, str, int]:
    """Run a Python script (fed via stdin) with ``cwd`` as the implicit DATA_DIR.

    DATA_DIR is removed from the environment so the child resolves it from cwd.
    Returns a (stdout, stderr, returncode) tuple.
    """
    child_env = {key: value for key, value in os.environ.items() if key != 'DATA_DIR'}
    completed = subprocess.run(
        [sys.executable, '-'],
        input=script,
        capture_output=True,
        text=True,
        cwd=cwd,
        env=child_env,
        timeout=timeout,
    )
    return completed.stdout, completed.stderr, completed.returncode
|
||||
|
||||
def _get_machine_type() -> str:
|
||||
import platform
|
||||
|
||||
os_name = platform.system().lower()
|
||||
arch = platform.machine().lower()
|
||||
in_docker = os.environ.get('IN_DOCKER', '').lower() in ('1', 'true', 'yes')
|
||||
suffix = '-docker' if in_docker else ''
|
||||
return f'{arch}-{os_name}{suffix}'
|
||||
|
||||
def _find_cached_chromium(lib_dir: Path) -> Optional[Path]:
|
||||
candidates = [
|
||||
lib_dir / 'puppeteer',
|
||||
lib_dir / 'npm' / 'node_modules' / 'puppeteer' / '.local-chromium',
|
||||
]
|
||||
for base in candidates:
|
||||
if not base.exists():
|
||||
continue
|
||||
for path in base.rglob('Chromium.app/Contents/MacOS/Chromium'):
|
||||
return path
|
||||
for path in base.rglob('chrome-linux/chrome'):
|
||||
return path
|
||||
for path in base.rglob('chrome-linux64/chrome'):
|
||||
return path
|
||||
return None
|
||||
|
||||
def _find_system_browser() -> Optional[Path]:
|
||||
candidates = [
|
||||
Path('/Applications/Chromium.app/Contents/MacOS/Chromium'),
|
||||
Path('/usr/bin/chromium'),
|
||||
Path('/usr/bin/chromium-browser'),
|
||||
]
|
||||
for candidate in candidates:
|
||||
if candidate.exists():
|
||||
return candidate
|
||||
return None
|
||||
|
||||
def _ensure_puppeteer(shared_lib: Path) -> None:
|
||||
npm_prefix = shared_lib / 'npm'
|
||||
node_modules = npm_prefix / 'node_modules'
|
||||
puppeteer_dir = node_modules / 'puppeteer'
|
||||
if puppeteer_dir.exists():
|
||||
return
|
||||
npm_prefix.mkdir(parents=True, exist_ok=True)
|
||||
env = os.environ.copy()
|
||||
env['PUPPETEER_SKIP_DOWNLOAD'] = '1'
|
||||
subprocess.run(
|
||||
['npm', 'install', 'puppeteer'],
|
||||
cwd=str(npm_prefix),
|
||||
env=env,
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=600,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="class")
def real_archive_with_example(tmp_path_factory, request):
    """
    Initialize archive and add https://example.com using chrome+responses only.

    Uses cwd for DATA_DIR and symlinks the lib dir to a shared cache so the
    heavyweight npm/puppeteer/chromium downloads are reused across test runs.
    Returns the data dir path; also stored on the test class as ``data_dir``.
    """
    tmp_path = tmp_path_factory.mktemp("archivebox_data")
    # Expose the data dir on the requesting test class for class-scoped tests.
    if getattr(request, "cls", None) is not None:
        request.cls.data_dir = tmp_path

    stdout, stderr, returncode = run_archivebox_cmd_cwd(
        ['init', '--quick'],
        cwd=tmp_path,
        timeout=120,
    )
    assert returncode == 0, f"archivebox init failed: {stderr}"

    stdout, stderr, returncode = run_archivebox_cmd_cwd(
        [
            'config',
            '--set',
            'LISTEN_HOST=archivebox.localhost:8000',
            'PUBLIC_INDEX=True',
            'PUBLIC_SNAPSHOTS=True',
            'PUBLIC_ADD_VIEW=True',
        ],
        cwd=tmp_path,
    )
    assert returncode == 0, f"archivebox config failed: {stderr}"

    # Shared, machine-specific cache kept outside the repo checkout
    # (parents[3]) so it survives clean checkouts between runs.
    machine_type = _get_machine_type()
    shared_root = Path(__file__).resolve().parents[3] / 'tmp' / 'test_lib_cache'
    shared_lib = shared_root / machine_type
    shared_lib.mkdir(parents=True, exist_ok=True)

    # Replace any real lib dir created by init with a symlink into the cache.
    lib_target = tmp_path / 'lib' / machine_type
    if lib_target.exists() and not lib_target.is_symlink():
        shutil.rmtree(lib_target)
    if not lib_target.exists():
        lib_target.parent.mkdir(parents=True, exist_ok=True)
        lib_target.symlink_to(shared_lib, target_is_directory=True)

    _ensure_puppeteer(shared_lib)
    cached_chromium = _find_cached_chromium(shared_lib)
    if cached_chromium:
        browser_binary = cached_chromium
    else:
        browser_binary = _find_system_browser()
        if browser_binary:
            # Give the system browser a stable path inside the shared cache.
            chromium_link = shared_lib / 'chromium-bin'
            if not chromium_link.exists():
                chromium_link.symlink_to(browser_binary)
            browser_binary = chromium_link

    if browser_binary:
        # fix: 'config' had a stray f-prefix (f-string with no placeholders)
        stdout, stderr, returncode = run_archivebox_cmd_cwd(
            ['config', '--set', f'CHROME_BINARY={browser_binary}'],
            cwd=tmp_path,
        )
        assert returncode == 0, f"archivebox config CHROME_BINARY failed: {stderr}"
        # Mark the chromium Binary row as installed so extractors don't try to
        # re-resolve/re-download the browser during `add`.
        script = textwrap.dedent(f"""\
            import os
            os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.core.settings')
            import django
            django.setup()
            from django.utils import timezone
            from archivebox.machine.models import Binary, Machine
            machine = Machine.current()
            Binary.objects.filter(machine=machine, name='chromium').update(
                status='installed',
                abspath='{browser_binary}',
                binprovider='env',
                retry_at=timezone.now(),
            )
            Binary.objects.update_or_create(
                machine=machine,
                name='chromium',
                defaults={{
                    'status': 'installed',
                    'abspath': '{browser_binary}',
                    'binprovider': 'env',
                    'retry_at': timezone.now(),
                }},
            )
            print('OK')
            """
        )
        stdout, stderr, returncode = run_python_cwd(script, cwd=tmp_path, timeout=60)
        assert returncode == 0, f"Register chromium binary failed: {stderr}"

    # Only chrome + responses extractors run; everything else stays disabled.
    add_env = {
        'CHROME_ENABLED': 'True',
        'RESPONSES_ENABLED': 'True',
        'DOM_ENABLED': 'False',
        'SHOW_PROGRESS': 'False',
        'USE_COLOR': 'False',
        'CHROME_HEADLESS': 'True',
        'CHROME_PAGELOAD_TIMEOUT': '45',
        'CHROME_TIMEOUT': '60',
        'RESPONSES_TIMEOUT': '30',
    }
    if browser_binary:
        add_env['CHROME_BINARY'] = str(browser_binary)
    if cached_chromium:
        add_env['PUPPETEER_CACHE_DIR'] = str(shared_lib / 'puppeteer')
    stdout, stderr, returncode = run_archivebox_cmd_cwd(
        ['add', '--depth=0', '--plugins=chrome,responses', 'https://example.com'],
        cwd=tmp_path,
        timeout=600,
        env=add_env,
    )
    assert returncode == 0, f"archivebox add failed: {stderr}"

    return tmp_path
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Output Assertions
|
||||
# =============================================================================
|
||||
|
||||
252
archivebox/tests/test_savepagenow.py
Normal file
252
archivebox/tests/test_savepagenow.py
Normal file
@@ -0,0 +1,252 @@
|
||||
"""Integration tests for /web/https://... shortcut (Save Page Now)."""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
|
||||
from archivebox.tests.conftest import create_test_url
|
||||
|
||||
|
||||
def _run_savepagenow_script(initialized_archive: Path, request_url: str, expected_url: str, *, login: bool, public_add_view: bool):
|
||||
project_root = Path(__file__).resolve().parents[2]
|
||||
script = textwrap.dedent(
|
||||
f"""
|
||||
import os
|
||||
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.core.settings')
|
||||
|
||||
from archivebox.config.django import setup_django
|
||||
setup_django()
|
||||
|
||||
from django.test import Client
|
||||
from django.contrib.auth import get_user_model
|
||||
from archivebox.core.models import Snapshot
|
||||
|
||||
client = Client()
|
||||
if {login!r}:
|
||||
user = get_user_model().objects.create_user(username='tester', password='pw')
|
||||
client.force_login(user)
|
||||
|
||||
target_url = {request_url!r}
|
||||
|
||||
resp = client.get('/web/' + target_url, HTTP_HOST='web.archivebox.localhost:8000')
|
||||
assert resp.status_code == 302, resp.status_code
|
||||
|
||||
snapshot = Snapshot.objects.filter(url={expected_url!r}).order_by('-created_at').first()
|
||||
if snapshot is None:
|
||||
raise AssertionError(
|
||||
"snapshot not created; status=%s location=%s count=%s"
|
||||
% (
|
||||
resp.status_code,
|
||||
resp.get('Location'),
|
||||
Snapshot.objects.count(),
|
||||
)
|
||||
)
|
||||
assert resp['Location'] == f"/{{snapshot.url_path}}"
|
||||
|
||||
resp2 = client.get('/web/' + target_url, HTTP_HOST='web.archivebox.localhost:8000')
|
||||
assert resp2.status_code == 302, resp2.status_code
|
||||
assert Snapshot.objects.filter(url={expected_url!r}).count() == 1
|
||||
assert resp2['Location'] == f"/{{snapshot.url_path}}"
|
||||
"""
|
||||
)
|
||||
|
||||
env = {
|
||||
**os.environ,
|
||||
'DATA_DIR': str(initialized_archive),
|
||||
'USE_COLOR': 'False',
|
||||
'SHOW_PROGRESS': 'False',
|
||||
'PUBLIC_ADD_VIEW': 'True' if public_add_view else 'False',
|
||||
'SAVE_ARCHIVEDOTORG': 'False',
|
||||
'SAVE_TITLE': 'False',
|
||||
'SAVE_FAVICON': 'False',
|
||||
'SAVE_WGET': 'False',
|
||||
'SAVE_WARC': 'False',
|
||||
'SAVE_PDF': 'False',
|
||||
'SAVE_SCREENSHOT': 'False',
|
||||
'SAVE_DOM': 'False',
|
||||
'SAVE_SINGLEFILE': 'False',
|
||||
'SAVE_READABILITY': 'False',
|
||||
'SAVE_MERCURY': 'False',
|
||||
'SAVE_GIT': 'False',
|
||||
'SAVE_YTDLP': 'False',
|
||||
'SAVE_HEADERS': 'False',
|
||||
'SAVE_HTMLTOTEXT': 'False',
|
||||
}
|
||||
|
||||
return subprocess.run(
|
||||
[sys.executable, '-c', script],
|
||||
cwd=project_root,
|
||||
env=env,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
|
||||
def _run_savepagenow_not_found_script(initialized_archive: Path, request_url: str):
|
||||
project_root = Path(__file__).resolve().parents[2]
|
||||
script = textwrap.dedent(
|
||||
f"""
|
||||
import os
|
||||
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.core.settings')
|
||||
|
||||
from archivebox.config.django import setup_django
|
||||
setup_django()
|
||||
|
||||
from django.test import Client
|
||||
from archivebox.core.models import Snapshot
|
||||
|
||||
client = Client()
|
||||
target_url = {request_url!r}
|
||||
|
||||
resp = client.get('/web/' + target_url, HTTP_HOST='web.archivebox.localhost:8000')
|
||||
assert resp.status_code == 404, resp.status_code
|
||||
assert Snapshot.objects.count() == 0
|
||||
"""
|
||||
)
|
||||
|
||||
env = {
|
||||
**os.environ,
|
||||
'DATA_DIR': str(initialized_archive),
|
||||
'USE_COLOR': 'False',
|
||||
'SHOW_PROGRESS': 'False',
|
||||
'PUBLIC_ADD_VIEW': 'False',
|
||||
'SAVE_ARCHIVEDOTORG': 'False',
|
||||
'SAVE_TITLE': 'False',
|
||||
'SAVE_FAVICON': 'False',
|
||||
'SAVE_WGET': 'False',
|
||||
'SAVE_WARC': 'False',
|
||||
'SAVE_PDF': 'False',
|
||||
'SAVE_SCREENSHOT': 'False',
|
||||
'SAVE_DOM': 'False',
|
||||
'SAVE_SINGLEFILE': 'False',
|
||||
'SAVE_READABILITY': 'False',
|
||||
'SAVE_MERCURY': 'False',
|
||||
'SAVE_GIT': 'False',
|
||||
'SAVE_YTDLP': 'False',
|
||||
'SAVE_HEADERS': 'False',
|
||||
'SAVE_HTMLTOTEXT': 'False',
|
||||
}
|
||||
|
||||
return subprocess.run(
|
||||
[sys.executable, '-c', script],
|
||||
cwd=project_root,
|
||||
env=env,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
|
||||
def _run_savepagenow_existing_snapshot_script(initialized_archive: Path, request_url: str, stored_url: str):
|
||||
project_root = Path(__file__).resolve().parents[2]
|
||||
script = textwrap.dedent(
|
||||
f"""
|
||||
import os
|
||||
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.core.settings')
|
||||
|
||||
from archivebox.config.django import setup_django
|
||||
setup_django()
|
||||
|
||||
from django.test import Client
|
||||
from archivebox.core.models import Snapshot
|
||||
from archivebox.crawls.models import Crawl
|
||||
from archivebox.base_models.models import get_or_create_system_user_pk
|
||||
|
||||
target_url = {request_url!r}
|
||||
stored_url = {stored_url!r}
|
||||
created_by_id = get_or_create_system_user_pk()
|
||||
crawl = Crawl.objects.create(urls=stored_url, created_by_id=created_by_id)
|
||||
snapshot = Snapshot.objects.create(url=stored_url, crawl=crawl)
|
||||
|
||||
client = Client()
|
||||
resp = client.get('/web/' + target_url, HTTP_HOST='web.archivebox.localhost:8000')
|
||||
assert resp.status_code == 302, resp.status_code
|
||||
assert resp['Location'] == f"/{{snapshot.url_path}}"
|
||||
"""
|
||||
)
|
||||
|
||||
env = {
|
||||
**os.environ,
|
||||
'DATA_DIR': str(initialized_archive),
|
||||
'USE_COLOR': 'False',
|
||||
'SHOW_PROGRESS': 'False',
|
||||
'PUBLIC_ADD_VIEW': 'False',
|
||||
'SAVE_ARCHIVEDOTORG': 'False',
|
||||
'SAVE_TITLE': 'False',
|
||||
'SAVE_FAVICON': 'False',
|
||||
'SAVE_WGET': 'False',
|
||||
'SAVE_WARC': 'False',
|
||||
'SAVE_PDF': 'False',
|
||||
'SAVE_SCREENSHOT': 'False',
|
||||
'SAVE_DOM': 'False',
|
||||
'SAVE_SINGLEFILE': 'False',
|
||||
'SAVE_READABILITY': 'False',
|
||||
'SAVE_MERCURY': 'False',
|
||||
'SAVE_GIT': 'False',
|
||||
'SAVE_YTDLP': 'False',
|
||||
'SAVE_HEADERS': 'False',
|
||||
'SAVE_HTMLTOTEXT': 'False',
|
||||
}
|
||||
|
||||
return subprocess.run(
|
||||
[sys.executable, '-c', script],
|
||||
cwd=project_root,
|
||||
env=env,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
|
||||
def test_web_add_creates_and_reuses_snapshot_logged_in(initialized_archive):
    """/web/https://... should work for authenticated users even when public add is off."""
    url = create_test_url(domain='example.com', path='savepagenow-auth')
    result = _run_savepagenow_script(
        initialized_archive,
        url.replace('https://', ''),
        url,
        login=True,
        public_add_view=False,
    )
    failure_details = (
        "SavePageNow shortcut (logged-in) test failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    assert result.returncode == 0, failure_details
|
||||
|
||||
|
||||
def test_web_add_creates_and_reuses_snapshot_public(initialized_archive):
    """/web/https://... should work when PUBLIC_ADD_VIEW is enabled without login."""
    url = create_test_url(domain='example.com', path='savepagenow-public')
    result = _run_savepagenow_script(
        initialized_archive,
        url.replace('https://', ''),
        url,
        login=False,
        public_add_view=True,
    )
    failure_details = (
        "SavePageNow shortcut (public add) test failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    assert result.returncode == 0, failure_details
|
||||
|
||||
|
||||
def test_web_add_requires_login_when_public_off(initialized_archive):
    """/web/https://... should 404 for new URLs when PUBLIC_ADD_VIEW is false and not logged in."""
    url = create_test_url(domain='example.com', path='savepagenow-404')
    result = _run_savepagenow_not_found_script(initialized_archive, url.replace('https://', ''))
    failure_details = (
        "SavePageNow shortcut (no public add) test failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    assert result.returncode == 0, failure_details
|
||||
|
||||
|
||||
def test_web_add_redirects_existing_snapshot_when_public_off(initialized_archive):
    """/web/https://... should redirect to existing snapshot even when public add is off and not logged in."""
    url = create_test_url(domain='example.com', path='savepagenow-existing')
    result = _run_savepagenow_existing_snapshot_script(initialized_archive, url.replace('https://', ''), url)
    failure_details = (
        "SavePageNow shortcut (existing snapshot) test failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    assert result.returncode == 0, failure_details
|
||||
357
archivebox/tests/test_urls.py
Normal file
357
archivebox/tests/test_urls.py
Normal file
@@ -0,0 +1,357 @@
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# Directory prepended to subprocess PYTHONPATH so embedded scripts can import
# the archivebox package.
# NOTE(review): for a file at archivebox/tests/test_urls.py, parents[3]
# resolves one directory ABOVE the repo checkout, while sibling
# test_savepagenow.py uses parents[2] as its project root — confirm which
# level is actually intended here.
REPO_ROOT = Path(__file__).resolve().parents[3]


def _merge_pythonpath(env: dict[str, str]) -> dict[str, str]:
    """Drop DATA_DIR from *env* and prepend REPO_ROOT to its PYTHONPATH.

    Mutates *env* in place and returns the same dict for chaining.
    """
    env.pop("DATA_DIR", None)
    pythonpath = env.get("PYTHONPATH", "")
    if pythonpath:
        # Keep any existing entries, REPO_ROOT takes precedence.
        env["PYTHONPATH"] = f"{REPO_ROOT}{os.pathsep}{pythonpath}"
    else:
        env["PYTHONPATH"] = str(REPO_ROOT)
    return env
|
||||
|
||||
|
||||
def _run_python(script: str, cwd: Path, timeout: int = 60) -> subprocess.CompletedProcess:
    """Execute *script* via the interpreter's stdin with REPO_ROOT on PYTHONPATH.

    DATA_DIR is stripped (via _merge_pythonpath) so the child resolves it from cwd.
    """
    run_env = _merge_pythonpath(os.environ.copy())
    return subprocess.run(
        [sys.executable, "-"],
        input=script,
        cwd=cwd,
        env=run_env,
        text=True,
        capture_output=True,
        timeout=timeout,
    )
|
||||
|
||||
|
||||
def _build_script(body: str) -> str:
    """Return *body* prefixed with the shared Django-bootstrap prelude.

    The prelude sets up Django against the cwd archive and defines helpers
    (response_body, ensure_admin_user, get_snapshot, get_snapshot_files)
    available to every test body; both parts are dedented before joining.
    """
    prelude = textwrap.dedent(
        """
        import os
        from pathlib import Path

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "archivebox.core.settings")
        import django
        django.setup()

        from django.test import Client
        from django.contrib.auth import get_user_model

        from archivebox.core.models import Snapshot, ArchiveResult
        from archivebox.config.common import SERVER_CONFIG
        from archivebox.core.host_utils import (
            get_admin_host,
            get_api_host,
            get_web_host,
            get_snapshot_host,
            get_original_host,
            get_listen_subdomain,
            split_host_port,
            host_matches,
            is_snapshot_subdomain,
        )

        def response_body(resp):
            if getattr(resp, "streaming", False):
                return b"".join(resp.streaming_content)
            return resp.content

        def ensure_admin_user():
            User = get_user_model()
            admin, _ = User.objects.get_or_create(
                username="testadmin",
                defaults={"email": "admin@example.com", "is_staff": True, "is_superuser": True},
            )
            admin.set_password("testpassword")
            admin.save()
            return admin

        def get_snapshot():
            snapshot = Snapshot.objects.order_by("-created_at").first()
            assert snapshot is not None
            return snapshot

        def get_snapshot_files(snapshot):
            output_rel = None
            for output in snapshot.discover_outputs():
                candidate = output.get("path")
                if not candidate:
                    continue
                if candidate.startswith("responses/"):
                    continue
                if Path(snapshot.output_dir, candidate).is_file():
                    output_rel = candidate
                    break
            if output_rel is None:
                fallback = Path(snapshot.output_dir, "index.jsonl")
                if fallback.exists():
                    output_rel = "index.jsonl"
            assert output_rel is not None

            responses_root = Path(snapshot.output_dir) / "responses" / snapshot.domain
            assert responses_root.exists()
            response_file = None
            response_rel = None
            for candidate in responses_root.rglob("*"):
                if not candidate.is_file():
                    continue
                rel = candidate.relative_to(responses_root)
                if not (Path(snapshot.output_dir) / rel).exists():
                    response_file = candidate
                    response_rel = str(rel)
                    break
            if response_file is None:
                response_file = next(p for p in responses_root.rglob("*") if p.is_file())
                response_rel = str(response_file.relative_to(responses_root))
            response_output_path = Path(snapshot.output_dir) / response_rel
            return output_rel, response_file, response_rel, response_output_path
        """
    )
    return prelude + "\n" + textwrap.dedent(body)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("real_archive_with_example")
class TestUrlRouting:
    """Subdomain/host-based URL routing tests against a real archived snapshot.

    Each test renders a script body via ``_build_script`` and runs it in a
    subprocess (``_run_python``) with cwd set to the fixture's data dir; the
    script prints "OK" on success, which ``_run`` asserts on.
    """

    # Populated on the class by the class-scoped real_archive_with_example fixture.
    data_dir: Path

    def _run(self, body: str, timeout: int = 120) -> None:
        """Run *body* (with the shared prelude) in-process-of-archive and assert OK."""
        script = _build_script(body)
        result = _run_python(script, cwd=self.data_dir, timeout=timeout)
        assert result.returncode == 0, result.stderr
        assert "OK" in result.stdout

    def test_host_utils_and_public_redirect(self) -> None:
        """host_utils helpers agree with LISTEN_HOST; public pages redirect to the web host."""
        self._run(
            """
            snapshot = get_snapshot()
            snapshot_id = str(snapshot.id)
            domain = snapshot.domain

            web_host = get_web_host()
            admin_host = get_admin_host()
            api_host = get_api_host()
            snapshot_host = get_snapshot_host(snapshot_id)
            original_host = get_original_host(domain)
            base_host = SERVER_CONFIG.LISTEN_HOST

            host_only, port = split_host_port(base_host)
            assert host_only == "archivebox.localhost"
            assert port == "8000"
            assert web_host == "web.archivebox.localhost:8000"
            assert admin_host == "admin.archivebox.localhost:8000"
            assert api_host == "api.archivebox.localhost:8000"
            assert snapshot_host == f"{snapshot_id}.archivebox.localhost:8000"
            assert original_host == f"{domain}.archivebox.localhost:8000"
            assert get_listen_subdomain(web_host) == "web"
            assert get_listen_subdomain(admin_host) == "admin"
            assert get_listen_subdomain(api_host) == "api"
            assert get_listen_subdomain(snapshot_host) == snapshot_id
            assert get_listen_subdomain(original_host) == domain
            assert get_listen_subdomain(base_host) == ""
            assert host_matches(web_host, get_web_host())
            assert is_snapshot_subdomain(snapshot_id)

            client = Client()
            resp = client.get("/public.html", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].endswith("/public/")

            resp = client.get("/public/", HTTP_HOST=base_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].startswith(f"http://{web_host}/public/")

            resp = client.get("/", HTTP_HOST=api_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].startswith("/api/")

            print("OK")
            """
        )

    def test_web_admin_routing(self) -> None:
        """Admin pages are served on the admin host and redirected to from the web host."""
        self._run(
            """
            ensure_admin_user()
            client = Client()
            web_host = get_web_host()
            admin_host = get_admin_host()

            resp = client.get("/add/", HTTP_HOST=web_host)
            assert resp.status_code == 200

            resp = client.get("/admin/login/", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert admin_host in resp["Location"]

            resp = client.get("/admin/login/", HTTP_HOST=admin_host)
            assert resp.status_code == 200

            print("OK")
            """
        )

    def test_snapshot_routing_and_hosts(self) -> None:
        """Snapshot pages/outputs route via web, per-snapshot, and original-domain hosts."""
        self._run(
            """
            snapshot = get_snapshot()
            output_rel, response_file, response_rel, response_output_path = get_snapshot_files(snapshot)
            snapshot_id = str(snapshot.id)
            snapshot_host = get_snapshot_host(snapshot_id)
            original_host = get_original_host(snapshot.domain)
            web_host = get_web_host()

            client = Client()

            snapshot_path = f"/{snapshot.url_path}/"
            resp = client.get(snapshot_path, HTTP_HOST=web_host)
            assert resp.status_code == 200

            resp = client.get(f"/web/{snapshot.domain}", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert resp["Location"].endswith(f"/{snapshot.url_path}")

            resp = client.get(f"/{snapshot.url_path}", HTTP_HOST=web_host)
            assert resp.status_code == 200

            date_segment = snapshot.url_path.split("/")[1]
            resp = client.get(f"/web/{date_segment}/{date_segment}/{snapshot_id}/", HTTP_HOST=web_host)
            assert resp.status_code == 404

            resp = client.get(f"/{snapshot.url_path}/{output_rel}", HTTP_HOST=web_host)
            assert resp.status_code in (301, 302)
            assert snapshot_host in resp["Location"]

            resp = client.get(f"/{output_rel}", HTTP_HOST=snapshot_host)
            assert resp.status_code == 200
            assert response_body(resp) == Path(snapshot.output_dir, output_rel).read_bytes()

            resp = client.get(f"/{response_rel}", HTTP_HOST=snapshot_host)
            assert resp.status_code == 200
            snapshot_body = response_body(resp)
            if response_output_path.exists():
                assert snapshot_body == response_output_path.read_bytes()
            else:
                assert snapshot_body == response_file.read_bytes()

            resp = client.get(f"/{response_rel}", HTTP_HOST=original_host)
            assert resp.status_code == 200
            assert response_body(resp) == response_file.read_bytes()

            print("OK")
            """
        )

    def test_template_and_admin_links(self) -> None:
        """Rendered public/snapshot/admin HTML embeds the expected absolute host URLs."""
        self._run(
            """
            ensure_admin_user()
            snapshot = get_snapshot()
            snapshot.write_html_details()
            snapshot_id = str(snapshot.id)
            snapshot_host = get_snapshot_host(snapshot_id)
            admin_host = get_admin_host()
            web_host = get_web_host()

            client = Client()

            resp = client.get("/public/", HTTP_HOST=web_host)
            assert resp.status_code == 200
            public_html = response_body(resp).decode("utf-8", "ignore")
            assert "http://web.archivebox.localhost:8000" in public_html

            resp = client.get(f"/{snapshot.url_path}/index.html", HTTP_HOST=web_host)
            assert resp.status_code == 200
            live_html = response_body(resp).decode("utf-8", "ignore")
            assert f"http://{snapshot_host}/" in live_html
            assert "http://web.archivebox.localhost:8000" in live_html

            static_html = Path(snapshot.output_dir, "index.html").read_text(encoding="utf-8", errors="ignore")
            assert f"http://{snapshot_host}/" in static_html

            client.login(username="testadmin", password="testpassword")
            resp = client.get(f"/admin/core/snapshot/{snapshot_id}/change/", HTTP_HOST=admin_host)
            assert resp.status_code == 200
            admin_html = response_body(resp).decode("utf-8", "ignore")
            assert f"http://web.archivebox.localhost:8000/{snapshot.archive_path}" in admin_html
            assert f"http://{snapshot_host}/" in admin_html

            result = ArchiveResult.objects.filter(snapshot=snapshot).first()
            assert result is not None
            resp = client.get(f"/admin/core/archiveresult/{result.id}/change/", HTTP_HOST=admin_host)
            assert resp.status_code == 200
            ar_html = response_body(resp).decode("utf-8", "ignore")
            assert f"http://{snapshot_host}/" in ar_html

            print("OK")
            """
        )

    def test_api_available_on_admin_and_api_hosts(self) -> None:
        """API docs page is reachable on both the admin host and the api host."""
        self._run(
            """
            client = Client()
            admin_host = get_admin_host()
            api_host = get_api_host()

            resp = client.get("/api/v1/docs", HTTP_HOST=admin_host)
            assert resp.status_code == 200

            resp = client.get("/api/v1/docs", HTTP_HOST=api_host)
            assert resp.status_code == 200

            print("OK")
            """
        )

    def test_api_post_with_token_on_admin_and_api_hosts(self) -> None:
        """Token-authenticated API POSTs succeed on both the admin host and the api host."""
        self._run(
            """
            ensure_admin_user()
            from archivebox.api.auth import get_or_create_api_token

            token = get_or_create_api_token(get_user_model().objects.get(username="testadmin"))
            assert token is not None

            client = Client()
            admin_host = get_admin_host()
            api_host = get_api_host()

            payload = '{"name": "apitest-tag"}'
            headers = {"HTTP_X_ARCHIVEBOX_API_KEY": token.token}

            resp = client.post(
                "/api/v1/core/tags/create/",
                data=payload,
                content_type="application/json",
                HTTP_HOST=admin_host,
                **headers,
            )
            assert resp.status_code == 200
            data = resp.json()
            assert data.get("success") is True
            assert data.get("tag_name") == "apitest-tag"

            resp = client.post(
                "/api/v1/core/tags/create/",
                data=payload,
                content_type="application/json",
                HTTP_HOST=api_host,
                **headers,
            )
            assert resp.status_code == 200
            data = resp.json()
            assert data.get("success") is True
            assert data.get("tag_name") == "apitest-tag"

            print("OK")
            """
        )
||||
Reference in New Issue
Block a user