From ad41b15581d8072b26e59fe9992e2e4ac24a21e5 Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Sun, 15 Mar 2026 23:34:40 -0700 Subject: [PATCH] Add configurable server security modes --- archivebox/config/common.py | 89 ++- archivebox/core/host_utils.py | 24 + archivebox/core/middleware.py | 41 +- archivebox/core/settings.py | 1 + archivebox/core/urls.py | 4 +- archivebox/core/views.py | 174 ++++-- archivebox/misc/serve_static.py | 75 ++- .../tests/test_server_security_browser.py | 578 ++++++++++++++++++ archivebox/tests/test_urls.py | 235 ++++++- 9 files changed, 1139 insertions(+), 82 deletions(-) create mode 100644 archivebox/tests/test_server_security_browser.py diff --git a/archivebox/config/common.py b/archivebox/config/common.py index b0c87ed1..1546332d 100644 --- a/archivebox/config/common.py +++ b/archivebox/config/common.py @@ -3,7 +3,7 @@ __package__ = "archivebox.config" import re import sys import shutil -from typing import Dict, Optional, List +from typing import ClassVar, Dict, Optional, List from pathlib import Path from rich import print @@ -97,6 +97,13 @@ GENERAL_CONFIG = GeneralConfig() class ServerConfig(BaseConfigSet): toml_section_header: str = "SERVER_CONFIG" + SERVER_SECURITY_MODES: ClassVar[tuple[str, ...]] = ( + "safe-subdomains-fullreplay", + "safe-onedomain-nojsreplay", + "unsafe-onedomain-noadmin", + "danger-onedomain-fullreplay", + ) + SECRET_KEY: str = Field(default_factory=lambda: get_random_string(50, "abcdefghijklmnopqrstuvwxyz0123456789_")) BIND_ADDR: str = Field(default="127.0.0.1:8000") LISTEN_HOST: str = Field(default="archivebox.localhost:8000") @@ -104,6 +111,7 @@ class ServerConfig(BaseConfigSet): ARCHIVE_BASE_URL: str = Field(default="") ALLOWED_HOSTS: str = Field(default="*") CSRF_TRUSTED_ORIGINS: str = Field(default="http://admin.archivebox.localhost:8000") + SERVER_SECURITY_MODE: str = Field(default="safe-subdomains-fullreplay") SNAPSHOTS_PER_PAGE: int = Field(default=40) PREVIEW_ORIGINALS: bool = Field(default=True) 
@@ -123,10 +131,89 @@ class ServerConfig(BaseConfigSet): REVERSE_PROXY_WHITELIST: str = Field(default="") LOGOUT_REDIRECT_URL: str = Field(default="/") + @field_validator("SERVER_SECURITY_MODE", mode="after") + def validate_server_security_mode(cls, v: str) -> str: + mode = (v or "").strip().lower() + if mode not in cls.SERVER_SECURITY_MODES: + raise ValueError(f"SERVER_SECURITY_MODE must be one of: {', '.join(cls.SERVER_SECURITY_MODES)}") + return mode + + @property + def USES_SUBDOMAIN_ROUTING(self) -> bool: + return self.SERVER_SECURITY_MODE == "safe-subdomains-fullreplay" + + @property + def ENABLES_FULL_JS_REPLAY(self) -> bool: + return self.SERVER_SECURITY_MODE in ( + "safe-subdomains-fullreplay", + "unsafe-onedomain-noadmin", + "danger-onedomain-fullreplay", + ) + + @property + def CONTROL_PLANE_ENABLED(self) -> bool: + return self.SERVER_SECURITY_MODE != "unsafe-onedomain-noadmin" + + @property + def BLOCK_UNSAFE_METHODS(self) -> bool: + return self.SERVER_SECURITY_MODE == "unsafe-onedomain-noadmin" + + @property + def SHOULD_NEUTER_RISKY_REPLAY(self) -> bool: + return self.SERVER_SECURITY_MODE == "safe-onedomain-nojsreplay" + + @property + def IS_UNSAFE_MODE(self) -> bool: + return self.SERVER_SECURITY_MODE == "unsafe-onedomain-noadmin" + + @property + def IS_DANGEROUS_MODE(self) -> bool: + return self.SERVER_SECURITY_MODE == "danger-onedomain-fullreplay" + + @property + def IS_LOWER_SECURITY_MODE(self) -> bool: + return self.SERVER_SECURITY_MODE in ( + "unsafe-onedomain-noadmin", + "danger-onedomain-fullreplay", + ) + SERVER_CONFIG = ServerConfig() +def _print_server_security_mode_warning() -> None: + if not SERVER_CONFIG.IS_LOWER_SECURITY_MODE: + return + + print( + f"[yellow][!] 
WARNING: ArchiveBox is running with SERVER_SECURITY_MODE={SERVER_CONFIG.SERVER_SECURITY_MODE}[/yellow]", + file=sys.stderr, + ) + print( + "[yellow] Archived pages may share an origin with privileged app routes in this mode.[/yellow]", + file=sys.stderr, + ) + print( + "[yellow] To switch to the safer isolated setup:[/yellow]", + file=sys.stderr, + ) + print( + "[yellow] 1. Set SERVER_SECURITY_MODE=safe-subdomains-fullreplay[/yellow]", + file=sys.stderr, + ) + print( + "[yellow] 2. Point *.archivebox.localhost (or your chosen base domain) at this server[/yellow]", + file=sys.stderr, + ) + print( + "[yellow] 3. Configure wildcard DNS/TLS or your reverse proxy so admin., web., api., and snapshot subdomains resolve[/yellow]", + file=sys.stderr, + ) + + +_print_server_security_mode_warning() + + class ArchivingConfig(BaseConfigSet): toml_section_header: str = "ARCHIVING_CONFIG" diff --git a/archivebox/core/host_utils.py b/archivebox/core/host_utils.py index 2e723d05..2cf8131b 100644 --- a/archivebox/core/host_utils.py +++ b/archivebox/core/host_utils.py @@ -55,6 +55,8 @@ def _build_listen_host(subdomain: str | None) -> str: def get_admin_host() -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return get_listen_host().lower() override = _normalize_base_url(SERVER_CONFIG.ADMIN_BASE_URL) if override: return urlparse(override).netloc.lower() @@ -62,23 +64,33 @@ def get_admin_host() -> str: def get_web_host() -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return get_listen_host().lower() override = _normalize_base_url(SERVER_CONFIG.ARCHIVE_BASE_URL) if override: return urlparse(override).netloc.lower() return _build_listen_host("web") def get_api_host() -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return get_listen_host().lower() return _build_listen_host("api") def get_public_host() -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return get_listen_host().lower() return _build_listen_host("public") def get_snapshot_host(snapshot_id: 
str) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return get_listen_host().lower() return _build_listen_host(snapshot_id) def get_original_host(domain: str) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return get_listen_host().lower() return _build_listen_host(domain) @@ -87,6 +99,8 @@ def is_snapshot_subdomain(subdomain: str) -> bool: def get_listen_subdomain(request_host: str) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return "" req_host, req_port = split_host_port(request_host) listen_host, listen_port = get_listen_parts() if not listen_host: @@ -127,6 +141,8 @@ def _build_base_url_for_host(host: str, request=None) -> str: def get_admin_base_url(request=None) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return _build_base_url_for_host(get_listen_host(), request=request) override = _normalize_base_url(SERVER_CONFIG.ADMIN_BASE_URL) if override: return override @@ -134,12 +150,16 @@ def get_admin_base_url(request=None) -> str: def get_web_base_url(request=None) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return _build_base_url_for_host(get_listen_host(), request=request) override = _normalize_base_url(SERVER_CONFIG.ARCHIVE_BASE_URL) if override: return override return _build_base_url_for_host(get_web_host(), request=request) def get_api_base_url(request=None) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return _build_base_url_for_host(get_listen_host(), request=request) return _build_base_url_for_host(get_api_host(), request=request) @@ -149,10 +169,14 @@ def get_archive_base_url(request=None) -> str: def get_snapshot_base_url(snapshot_id: str, request=None) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return _build_url(get_web_base_url(request=request), f"/snapshot/{snapshot_id}") return _build_base_url_for_host(get_snapshot_host(snapshot_id), request=request) def get_original_base_url(domain: str, request=None) -> str: + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + return 
_build_url(get_web_base_url(request=request), f"/original/{domain}") return _build_base_url_for_host(get_original_host(domain), request=request) diff --git a/archivebox/core/middleware.py b/archivebox/core/middleware.py index 62accedc..b7778966 100644 --- a/archivebox/core/middleware.py +++ b/archivebox/core/middleware.py @@ -10,7 +10,7 @@ from django.core.exceptions import ImproperlyConfigured from django.shortcuts import redirect from django.contrib.staticfiles import finders from django.utils.http import http_date -from django.http import HttpResponseNotModified +from django.http import HttpResponseForbidden, HttpResponseNotModified from archivebox.config.common import SERVER_CONFIG from archivebox.config import VERSION @@ -26,6 +26,7 @@ from archivebox.core.host_utils import ( get_web_host, host_matches, is_snapshot_subdomain, + split_host_port, ) from archivebox.core.views import SnapshotHostView, OriginalDomainHostView @@ -90,6 +91,29 @@ def CacheControlMiddleware(get_response): return middleware +def ServerSecurityModeMiddleware(get_response): + blocked_prefixes = ("/admin", "/accounts", "/api", "/add", "/web") + allowed_methods = {"GET", "HEAD", "OPTIONS"} + + def middleware(request): + if SERVER_CONFIG.CONTROL_PLANE_ENABLED: + return get_response(request) + + request.user = AnonymousUser() + request._cached_user = request.user + + if request.method.upper() not in allowed_methods: + return HttpResponseForbidden("ArchiveBox is running with the control plane disabled in this security mode.") + + for prefix in blocked_prefixes: + if request.path == prefix or request.path.startswith(f"{prefix}/"): + return HttpResponseForbidden("ArchiveBox is running with the control plane disabled in this security mode.") + + return get_response(request) + + return middleware + + def HostRoutingMiddleware(get_response): def middleware(request): request_host = (request.get_host() or "").lower() @@ -100,6 +124,21 @@ def HostRoutingMiddleware(get_response): listen_host = 
get_listen_host() subdomain = get_listen_subdomain(request_host) + if not SERVER_CONFIG.USES_SUBDOMAIN_ROUTING: + if host_matches(request_host, listen_host): + return get_response(request) + + req_host, req_port = split_host_port(request_host) + listen_host_only, listen_port = split_host_port(listen_host) + if req_host.endswith(f".{listen_host_only}"): + if not listen_port or not req_port or listen_port == req_port: + target = build_web_url(request.path, request=request) + if request.META.get("QUERY_STRING"): + target = f"{target}?{request.META['QUERY_STRING']}" + return redirect(target) + + return get_response(request) + if host_matches(request_host, admin_host): return get_response(request) diff --git a/archivebox/core/settings.py b/archivebox/core/settings.py index dc9c23cf..7f855b94 100644 --- a/archivebox/core/settings.py +++ b/archivebox/core/settings.py @@ -86,6 +86,7 @@ MIDDLEWARE = [ "django.middleware.csrf.CsrfViewMiddleware", "django.contrib.auth.middleware.AuthenticationMiddleware", "archivebox.core.middleware.ReverseProxyAuthMiddleware", + "archivebox.core.middleware.ServerSecurityModeMiddleware", "archivebox.core.middleware.HostRoutingMiddleware", "django.contrib.messages.middleware.MessageMiddleware", "archivebox.core.middleware.CacheControlMiddleware", diff --git a/archivebox/core/urls.py b/archivebox/core/urls.py index 838056e1..e8072d14 100644 --- a/archivebox/core/urls.py +++ b/archivebox/core/urls.py @@ -9,7 +9,7 @@ from django.http import HttpRequest from archivebox.misc.serve_static import serve_static from archivebox.core.admin_site import archivebox_admin -from archivebox.core.views import HomepageView, SnapshotView, SnapshotPathView, PublicIndexView, AddView, WebAddView, HealthCheckView, live_progress_view +from archivebox.core.views import HomepageView, SnapshotView, SnapshotPathView, SnapshotReplayView, OriginalDomainReplayView, PublicIndexView, AddView, WebAddView, HealthCheckView, live_progress_view # GLOBAL_CONTEXT doesn't work as-is, 
disabled for now: https://github.com/ArchiveBox/ArchiveBox/discussions/1306 @@ -33,6 +33,8 @@ urlpatterns = [ path('archive/', RedirectView.as_view(url='/')), path('archive/', SnapshotView.as_view(), name='Snapshot'), + re_path(r'^snapshot\/(?P[0-9a-fA-F-]{8,36})(?:\/(?P.*))?$', SnapshotReplayView.as_view(), name='snapshot-replay'), + re_path(r'^original\/(?P[^/]+)(?:\/(?P.*))?$', OriginalDomainReplayView.as_view(), name='original-replay'), re_path(r'^web/(?P(?!\d{4}(?:\d{2})?(?:\d{2})?(?:/|$)).+)$', WebAddView.as_view(), name='web-add'), re_path(r'^(?P[^/]+)/(?P\d{4}(?:\d{2})?(?:\d{2})?)/(?Phttps?://.*)$', SnapshotPathView.as_view(), name='snapshot-path-url'), re_path(r'^(?P[^/]+)/(?P\d{4}(?:\d{2})?(?:\d{2})?)/(?P[^/]+)(?:/(?P[0-9a-fA-F-]{8,36})(?:/(?P.*))?)?$', SnapshotPathView.as_view(), name='snapshot-path'), diff --git a/archivebox/core/views.py b/archivebox/core/views.py index f1015224..d9351f7a 100644 --- a/archivebox/core/views.py +++ b/archivebox/core/views.py @@ -52,15 +52,21 @@ def _files_index_target(snapshot: Snapshot, archivefile: str | None) -> str: return target +def _admin_login_redirect_or_forbidden(request: HttpRequest): + if SERVER_CONFIG.CONTROL_PLANE_ENABLED: + return redirect(f'/admin/login/?next={request.path}') + return HttpResponseForbidden("ArchiveBox is running with the control plane disabled in this security mode.") + + class HomepageView(View): def get(self, request): - if request.user.is_authenticated: + if request.user.is_authenticated and SERVER_CONFIG.CONTROL_PLANE_ENABLED: return redirect('/admin/core/snapshot/') if SERVER_CONFIG.PUBLIC_INDEX: return redirect('/public') - return redirect(f'/admin/login/?next={request.path}') + return _admin_login_redirect_or_forbidden(request) class SnapshotView(View): @@ -277,7 +283,7 @@ class SnapshotView(View): def get(self, request, path): if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS: - return redirect(f'/admin/login/?next={request.path}') + return 
_admin_login_redirect_or_forbidden(request) snapshot = None @@ -308,7 +314,7 @@ class SnapshotView(View): if request.GET.get('files'): target_path = _files_index_target(snapshot, archivefile) response = serve_static_with_byterange_support( - request, target_path, document_root=snapshot.output_dir, show_indexes=True, + request, target_path, document_root=snapshot.output_dir, show_indexes=True, is_archive_replay=True, ) elif archivefile == 'index.html': # if they requested snapshot index, serve live rendered template instead of static html @@ -474,7 +480,7 @@ class SnapshotPathView(View): def get(self, request, username: str, date: str | None = None, domain: str | None = None, snapshot_id: str | None = None, path: str = "", url: str | None = None): if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS: - return redirect(f'/admin/login/?next={request.path}') + return _admin_login_redirect_or_forbidden(request) if username == 'system': return redirect(request.path.replace('/system/', '/web/', 1)) @@ -573,14 +579,14 @@ class SnapshotPathView(View): if request.GET.get('files'): target_path = _files_index_target(snapshot, archivefile) return serve_static_with_byterange_support( - request, target_path, document_root=snapshot.output_dir, show_indexes=True, + request, target_path, document_root=snapshot.output_dir, show_indexes=True, is_archive_replay=True, ) if archivefile == "index.html": return SnapshotView.render_live_index(request, snapshot) return serve_static_with_byterange_support( - request, archivefile, document_root=snapshot.output_dir, show_indexes=True, + request, archivefile, document_root=snapshot.output_dir, show_indexes=True, is_archive_replay=True, ) @@ -670,6 +676,7 @@ def _serve_responses_path(request, responses_root: Path, rel_path: str, show_ind candidate, document_root=str(responses_root), show_indexes=show_indexes, + is_archive_replay=True, ) except Http404: pass @@ -682,18 +689,85 @@ def _serve_responses_path(request, 
responses_root: Path, rel_path: str, show_ind rel_dir, document_root=str(responses_root), show_indexes=True, + is_archive_replay=True, ) except Http404: return None return None +def _serve_snapshot_replay(request: HttpRequest, snapshot: Snapshot, path: str = ""): + rel_path = path or "" + show_indexes = bool(request.GET.get("files")) + if not rel_path or rel_path.endswith("/"): + if show_indexes: + rel_path = rel_path.rstrip("/") + else: + rel_path = f"{rel_path}index.html" + rel_path = _safe_archive_relpath(rel_path) + if rel_path is None: + raise Http404 + + try: + return serve_static_with_byterange_support( + request, + rel_path, + document_root=snapshot.output_dir, + show_indexes=show_indexes, + is_archive_replay=True, + ) + except Http404: + pass + + host = urlparse(snapshot.url).hostname or snapshot.domain + responses_root = Path(snapshot.output_dir) / "responses" / host + if responses_root.exists(): + response = _serve_responses_path(request, responses_root, rel_path, show_indexes) + if response is not None: + return response + + raise Http404 + + +def _serve_original_domain_replay(request: HttpRequest, domain: str, path: str = ""): + rel_path = path or "" + if not rel_path or rel_path.endswith("/"): + rel_path = f"{rel_path}index.html" + rel_path = _safe_archive_relpath(rel_path) + if rel_path is None: + raise Http404 + + domain = domain.lower() + match = _latest_response_match(domain, rel_path) + if not match and "." not in Path(rel_path).name: + index_path = f"{rel_path.rstrip('/')}/index.html" + match = _latest_response_match(domain, index_path) + if not match and "." 
not in Path(rel_path).name: + html_path = f"{rel_path}.html" + match = _latest_response_match(domain, html_path) + + show_indexes = bool(request.GET.get("files")) + if match: + responses_root, rel_to_root = match + response = _serve_responses_path(request, responses_root, str(rel_to_root), show_indexes) + if response is not None: + return response + + responses_root = _latest_responses_root(domain) + if responses_root: + response = _serve_responses_path(request, responses_root, rel_path, show_indexes) + if response is not None: + return response + + raise Http404 + + class SnapshotHostView(View): """Serve snapshot directory contents on ./.""" def get(self, request, snapshot_id: str, path: str = ""): if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS: - return HttpResponseForbidden("Public snapshots are disabled.") + return _admin_login_redirect_or_forbidden(request) snapshot = None if snapshot_id: try: @@ -708,37 +782,30 @@ class SnapshotHostView(View): if not snapshot: raise Http404 + return _serve_snapshot_replay(request, snapshot, path) - rel_path = path or "" - show_indexes = bool(request.GET.get("files")) - if not rel_path or rel_path.endswith("/"): - if show_indexes: - rel_path = rel_path.rstrip("/") - else: - rel_path = f"{rel_path}index.html" - rel_path = _safe_archive_relpath(rel_path) - if rel_path is None: - raise Http404 + +class SnapshotReplayView(View): + """Serve snapshot directory contents on a one-domain replay path.""" + + def get(self, request, snapshot_id: str, path: str = ""): + if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS: + return _admin_login_redirect_or_forbidden(request) try: - return serve_static_with_byterange_support( - request, - rel_path, - document_root=snapshot.output_dir, - show_indexes=show_indexes, - ) - except Http404: - pass + snapshot = Snapshot.objects.get(pk=snapshot_id) + except Snapshot.DoesNotExist: + try: + snapshot = Snapshot.objects.get(id__startswith=snapshot_id) 
+ except Snapshot.DoesNotExist: + raise Http404 + except Snapshot.MultipleObjectsReturned: + snapshot = Snapshot.objects.filter(id__startswith=snapshot_id).first() - # Fallback to responses// - host = urlparse(snapshot.url).hostname or snapshot.domain - responses_root = Path(snapshot.output_dir) / "responses" / host - if responses_root.exists(): - response = _serve_responses_path(request, responses_root, rel_path, show_indexes) - if response is not None: - return response + if snapshot is None: + raise Http404 - raise Http404 + return _serve_snapshot_replay(request, snapshot, path) class OriginalDomainHostView(View): @@ -746,38 +813,17 @@ class OriginalDomainHostView(View): def get(self, request, domain: str, path: str = ""): if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS: - return HttpResponseForbidden("Public snapshots are disabled.") - rel_path = path or "" - if not rel_path or rel_path.endswith("/"): - rel_path = f"{rel_path}index.html" - rel_path = _safe_archive_relpath(rel_path) - if rel_path is None: - raise Http404 + return _admin_login_redirect_or_forbidden(request) + return _serve_original_domain_replay(request, domain, path) - domain = domain.lower() - match = _latest_response_match(domain, rel_path) - if not match and "." not in Path(rel_path).name: - index_path = f"{rel_path.rstrip('/')}/index.html" - match = _latest_response_match(domain, index_path) - if not match and "." 
not in Path(rel_path).name: - html_path = f"{rel_path}.html" - match = _latest_response_match(domain, html_path) - show_indexes = bool(request.GET.get("files")) - if match: - responses_root, rel_to_root = match - response = _serve_responses_path(request, responses_root, str(rel_to_root), show_indexes) - if response is not None: - return response +class OriginalDomainReplayView(View): + """Serve original-domain replay content on a one-domain replay path.""" - # If no direct match, try serving directory index from latest responses root - responses_root = _latest_responses_root(domain) - if responses_root: - response = _serve_responses_path(request, responses_root, rel_path, show_indexes) - if response is not None: - return response - - raise Http404 + def get(self, request, domain: str, path: str = ""): + if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS: + return _admin_login_redirect_or_forbidden(request) + return _serve_original_domain_replay(request, domain, path) class PublicIndexView(ListView): @@ -834,7 +880,7 @@ class PublicIndexView(ListView): response = super().get(*args, **kwargs) return response else: - return redirect(f'/admin/login/?next={self.request.path}') + return _admin_login_redirect_or_forbidden(self.request) @method_decorator(csrf_exempt, name='dispatch') class AddView(UserPassesTestMixin, FormView): diff --git a/archivebox/misc/serve_static.py b/archivebox/misc/serve_static.py index 2c6a4662..459eefe4 100644 --- a/archivebox/misc/serve_static.py +++ b/archivebox/misc/serve_static.py @@ -81,6 +81,17 @@ MARKDOWN_BOLD_RE = re.compile(r'\*\*([^*]+)\*\*') MARKDOWN_ITALIC_RE = re.compile(r'(?]*>') HTML_BODY_RE = re.compile(r']*>(.*)', flags=re.IGNORECASE | re.DOTALL) +RISKY_REPLAY_MIMETYPES = { + "text/html", + "application/xhtml+xml", + "image/svg+xml", +} +RISKY_REPLAY_EXTENSIONS = {".html", ".htm", ".xhtml", ".svg", ".svgz"} +RISKY_REPLAY_MARKERS = ( + " str: @@ -278,7 +289,56 @@ def 
_render_markdown_document(markdown_text: str) -> str: return wrapped -def serve_static_with_byterange_support(request, path, document_root=None, show_indexes=False): +def _content_type_base(content_type: str) -> str: + return (content_type or "").split(";", 1)[0].strip().lower() + + +def _is_risky_replay_document(fullpath: Path, content_type: str) -> bool: + if fullpath.suffix.lower() in RISKY_REPLAY_EXTENSIONS: + return True + + if _content_type_base(content_type) in RISKY_REPLAY_MIMETYPES: + return True + + # Unknown archived response paths often have no extension. Sniff a small prefix + # so one-domain no-JS mode still catches HTML/SVG documents. + try: + head = fullpath.read_bytes()[:4096].decode("utf-8", errors="ignore").lower() + except Exception: + return False + + return any(marker in head for marker in RISKY_REPLAY_MARKERS) + + +def _apply_archive_replay_headers(response: HttpResponse, *, fullpath: Path, content_type: str, is_archive_replay: bool) -> HttpResponse: + if not is_archive_replay: + return response + + response.headers.setdefault("X-Content-Type-Options", "nosniff") + response.headers.setdefault("X-ArchiveBox-Security-Mode", SERVER_CONFIG.SERVER_SECURITY_MODE) + + if SERVER_CONFIG.SHOULD_NEUTER_RISKY_REPLAY and _is_risky_replay_document(fullpath, content_type): + response.headers["Content-Security-Policy"] = ( + "sandbox; " + "default-src 'self' data: blob:; " + "script-src 'none'; " + "object-src 'none'; " + "base-uri 'none'; " + "form-action 'none'; " + "connect-src 'none'; " + "worker-src 'none'; " + "frame-ancestors 'self'; " + "style-src 'self' 'unsafe-inline' data: blob:; " + "img-src 'self' data: blob:; " + "media-src 'self' data: blob:; " + "font-src 'self' data: blob:;" + ) + response.headers.setdefault("Referrer-Policy", "no-referrer") + + return response + + +def serve_static_with_byterange_support(request, path, document_root=None, show_indexes=False, is_archive_replay: bool=False): """ Overrides Django's built-in 
django.views.static.serve function to support byte range requests. This allows you to do things like seek into the middle of a huge mp4 or WACZ without downloading the whole file. @@ -289,7 +349,8 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_ fullpath = Path(safe_join(document_root, path)) if os.access(fullpath, os.R_OK) and fullpath.is_dir(): if show_indexes: - return static.directory_index(path, fullpath) + response = static.directory_index(path, fullpath) + return _apply_archive_replay_headers(response, fullpath=fullpath, content_type="text/html", is_archive_replay=is_archive_replay) raise Http404(_("Directory indexes are not allowed here.")) if not os.access(fullpath, os.R_OK): raise Http404(_("ā€œ%(path)sā€ does not exist") % {"path": fullpath}) @@ -312,7 +373,7 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_ not_modified.headers["ETag"] = etag not_modified.headers["Cache-Control"] = f"{_cache_policy()}, max-age=31536000, immutable" not_modified.headers["Last-Modified"] = http_date(statobj.st_mtime) - return not_modified + return _apply_archive_replay_headers(not_modified, fullpath=fullpath, content_type="", is_archive_replay=is_archive_replay) content_type, encoding = mimetypes.guess_type(str(fullpath)) content_type = content_type or "application/octet-stream" @@ -333,7 +394,7 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_ # Respect the If-Modified-Since header for non-markdown responses. if not (content_type.startswith("text/plain") or content_type.startswith("text/html")): if not static.was_modified_since(request.META.get("HTTP_IF_MODIFIED_SINCE"), statobj.st_mtime): - return HttpResponseNotModified() + return _apply_archive_replay_headers(HttpResponseNotModified(), fullpath=fullpath, content_type=content_type, is_archive_replay=is_archive_replay) # Heuristic fix: some archived HTML outputs (e.g. 
mercury content.html) # are stored with HTML-escaped markup or markdown sources. If so, render sensibly. @@ -360,7 +421,7 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_ response.headers["Content-Disposition"] = f'inline; filename="{fullpath.name}"' if encoding: response.headers["Content-Encoding"] = encoding - return response + return _apply_archive_replay_headers(response, fullpath=fullpath, content_type="text/html; charset=utf-8", is_archive_replay=is_archive_replay) if escaped_count and escaped_count > tag_count * 2: response = HttpResponse(decoded, content_type=content_type) response.headers["Last-Modified"] = http_date(statobj.st_mtime) @@ -372,7 +433,7 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_ response.headers["Content-Disposition"] = f'inline; filename="{fullpath.name}"' if encoding: response.headers["Content-Encoding"] = encoding - return response + return _apply_archive_replay_headers(response, fullpath=fullpath, content_type=content_type, is_archive_replay=is_archive_replay) except Exception: pass @@ -416,7 +477,7 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_ response.status_code = 206 if encoding: response.headers["Content-Encoding"] = encoding - return response + return _apply_archive_replay_headers(response, fullpath=fullpath, content_type=content_type, is_archive_replay=is_archive_replay) def serve_static(request, path, **kwargs): diff --git a/archivebox/tests/test_server_security_browser.py b/archivebox/tests/test_server_security_browser.py new file mode 100644 index 00000000..55244117 --- /dev/null +++ b/archivebox/tests/test_server_security_browser.py @@ -0,0 +1,578 @@ +#!/usr/bin/env python3 +"""Browser-level security mode tests using the existing Node/Puppeteer runtime.""" + +from __future__ import annotations + +import json +import os +import shutil +import signal +import socket +import subprocess +import sys +import textwrap 
+import time +from pathlib import Path +from urllib.parse import urlencode + +import pytest +import requests + +from .conftest import _ensure_puppeteer, _find_cached_chromium, _find_system_browser, run_python_cwd + + +PUPPETEER_PROBE_SCRIPT = """\ +const fs = require("node:fs"); +const puppeteer = require("puppeteer"); + +async function login(page, config) { + const result = { + reachable: false, + succeeded: false, + finalUrl: null, + status: null, + error: null, + }; + + try { + const response = await page.goto(config.adminLoginUrl, { + waitUntil: "networkidle2", + timeout: 15000, + }); + result.reachable = true; + result.status = response ? response.status() : null; + + const usernameInput = await page.$('input[name="username"]'); + const passwordInput = await page.$('input[name="password"]'); + if (!usernameInput || !passwordInput) { + result.finalUrl = page.url(); + return result; + } + + await usernameInput.type(config.username); + await passwordInput.type(config.password); + await Promise.all([ + page.waitForNavigation({waitUntil: "networkidle2", timeout: 15000}), + page.click('button[type="submit"], input[type="submit"]'), + ]); + + result.finalUrl = page.url(); + result.succeeded = !page.url().includes("/admin/login/"); + return result; + } catch (error) { + result.error = String(error); + result.finalUrl = page.url(); + return result; + } +} + +async function main() { + const config = JSON.parse(fs.readFileSync(0, "utf8")); + const browser = await puppeteer.launch({ + executablePath: config.chromePath, + headless: true, + args: [ + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-background-networking", + ], + }); + + const loginPage = await browser.newPage(); + const loginResult = await login(loginPage, config); + await loginPage.close(); + + const page = await browser.newPage(); + const consoleMessages = []; + const requestFailures = []; + page.on("console", (message) => { + consoleMessages.push({type: message.type(), text: message.text()}); + 
}); + page.on("pageerror", (error) => { + consoleMessages.push({type: "pageerror", text: String(error)}); + }); + page.on("requestfailed", (request) => { + requestFailures.push({ + url: request.url(), + error: request.failure() ? request.failure().errorText : "unknown", + }); + }); + + const response = await page.goto(config.dangerousUrl, { + waitUntil: "networkidle2", + timeout: 15000, + }); + + await new Promise((resolve) => setTimeout(resolve, 1500)); + + const pageState = await page.evaluate(() => ({ + href: location.href, + scriptRan: window.__dangerousScriptRan === true, + probeResults: window.__probeResults || null, + bodyText: document.body ? document.body.innerText.slice(0, 600) : "", + })); + + const output = { + mode: config.mode, + login: loginResult, + dangerousPage: { + status: response ? response.status() : null, + finalUrl: page.url(), + contentSecurityPolicy: response ? response.headers()["content-security-policy"] || null : null, + archiveboxSecurityMode: response ? response.headers()["x-archivebox-security-mode"] || null : null, + }, + pageState, + consoleMessages, + requestFailures, + }; + + console.log(JSON.stringify(output)); + await browser.close(); +} + +main().catch((error) => { + console.error(String(error)); + process.exit(1); +}); +""" + + +def _resolve_browser(shared_lib: Path) -> Path | None: + env_browser = os.environ.get("CHROME_BINARY") or os.environ.get("CHROME_BIN") + if env_browser: + candidate = Path(env_browser).expanduser() + if candidate.exists(): + return candidate + + cached = _find_cached_chromium(shared_lib) + if cached and cached.exists(): + return cached + + system = _find_system_browser() + if system and system.exists(): + return system + + which_candidates = ("chromium", "chromium-browser", "google-chrome", "google-chrome-stable", "chrome") + for binary in which_candidates: + resolved = shutil.which(binary) + if resolved: + return Path(resolved) + + mac_candidates = ( + Path("/Applications/Google 
Chrome.app/Contents/MacOS/Google Chrome"), + Path("/Applications/Chromium.app/Contents/MacOS/Chromium"), + ) + for candidate in mac_candidates: + if candidate.exists(): + return candidate + + return None + + +@pytest.fixture(scope="session") +def browser_runtime(tmp_path_factory): + if shutil.which("node") is None or shutil.which("npm") is None: + pytest.skip("Node.js and npm are required for browser security tests") + + shared_lib = tmp_path_factory.mktemp("archivebox_browser_lib") + _ensure_puppeteer(shared_lib) + + browser = _resolve_browser(shared_lib) + if not browser: + pytest.skip("No Chrome/Chromium binary available for browser security tests") + + return { + "node_modules_dir": shared_lib / "npm" / "node_modules", + "chrome_binary": browser, + } + + +def _seed_archive(data_dir: Path) -> dict[str, object]: + script = textwrap.dedent( + """ + import json + import os + from pathlib import Path + from django.utils import timezone + + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "archivebox.core.settings") + import django + django.setup() + + from django.contrib.auth import get_user_model + from archivebox.core.models import Snapshot + from archivebox.crawls.models import Crawl + + User = get_user_model() + admin, _ = User.objects.get_or_create( + username="testadmin", + defaults={"email": "admin@example.com", "is_staff": True, "is_superuser": True}, + ) + admin.set_password("testpassword") + admin.save() + + snapshots = {} + fixture_specs = ( + ("attacker", "https://attacker.example/entry", "Attacker Snapshot", "ATTACKER_SECRET"), + ("victim", "https://victim.example/private", "Victim Snapshot", "VICTIM_SECRET"), + ) + + for slug, url, title, secret in fixture_specs: + crawl = Crawl.objects.create( + urls=url, + created_by=admin, + status=Crawl.StatusChoices.SEALED, + retry_at=timezone.now(), + ) + snapshot = Snapshot.objects.create( + url=url, + title=title, + crawl=crawl, + status=Snapshot.StatusChoices.SEALED, + downloaded_at=timezone.now(), + ) + 
output_dir = Path(snapshot.output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + (output_dir / "safe.json").write_text( + json.dumps({"slug": slug, "secret": secret}), + encoding="utf-8", + ) + if slug == "attacker": + (output_dir / "dangerous.html").write_text( + ''' + + + +
+                        <!-- NOTE(review): reconstructed fixture — original markup/<script> was stripped by extraction; verify against the patch author's copy -->
+                        <html>
+                          <head>
+                            <meta charset="utf-8">
+                            <title>Dangerous Replay Fixture</title>
+                          </head>
+                          <body>
+                            <h1>Dangerous Replay Fixture</h1>
+                            <script>
+                              window.__dangerousScriptRan = true;
+                              window.__probeResults = {};
+                              const params = new URLSearchParams(location.search);
+                              async function probe(name) {
+                                const url = params.get(name);
+                                if (!url) return;
+                                const result = {ok: false, status: null, sample: ""};
+                                try {
+                                  const response = await fetch(url, {credentials: "include"});
+                                  result.ok = true;
+                                  result.status = response.status;
+                                  result.sample = (await response.text()).slice(0, 600);
+                                } catch (error) {
+                                  result.error = String(error);
+                                }
+                                window.__probeResults[name] = result;
+                              }
+                              (async () => {
+                                for (const name of ["own", "victim", "admin", "api"]) {
+                                  await probe(name);
+                                }
+                              })();
+                            </script>
+                          </body>
+                        </html>
+ + + + ''', + encoding="utf-8", + ) + snapshots[slug] = { + "id": str(snapshot.id), + "domain": snapshot.domain, + } + + print(json.dumps({ + "username": "testadmin", + "password": "testpassword", + "snapshots": snapshots, + })) + """ + ) + stdout, stderr, returncode = run_python_cwd(script, cwd=data_dir, timeout=120) + assert returncode == 0, stderr + return json.loads(stdout.strip()) + + +def _get_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +def _wait_for_http(port: int, host: str, timeout: float = 30.0) -> None: + deadline = time.time() + timeout + last_error = "server did not answer" + while time.time() < deadline: + try: + response = requests.get( + f"http://127.0.0.1:{port}/", + headers={"Host": host}, + timeout=2, + allow_redirects=False, + ) + if response.status_code < 500: + return + last_error = f"HTTP {response.status_code}" + except requests.RequestException as exc: + last_error = str(exc) + time.sleep(0.5) + raise AssertionError(f"Timed out waiting for {host}: {last_error}") + + +def _start_server(data_dir: Path, *, mode: str, port: int) -> subprocess.Popen[str]: + env = os.environ.copy() + env.pop("DATA_DIR", None) + env.update( + { + "PYTHONPATH": str(Path(__file__).resolve().parents[2]), + "LISTEN_HOST": f"archivebox.localhost:{port}", + "ALLOWED_HOSTS": "*", + "CSRF_TRUSTED_ORIGINS": f"http://archivebox.localhost:{port},http://admin.archivebox.localhost:{port}", + "SERVER_SECURITY_MODE": mode, + "USE_COLOR": "False", + "SHOW_PROGRESS": "False", + "SAVE_ARCHIVEDOTORG": "False", + "SAVE_TITLE": "False", + "SAVE_FAVICON": "False", + "SAVE_WGET": "False", + "SAVE_WARC": "False", + "SAVE_PDF": "False", + "SAVE_SCREENSHOT": "False", + "SAVE_DOM": "False", + "SAVE_SINGLEFILE": "False", + "SAVE_READABILITY": "False", + "SAVE_MERCURY": "False", + "SAVE_GIT": "False", + "SAVE_YTDLP": "False", + "SAVE_HEADERS": "False", + "SAVE_HTMLTOTEXT": "False", + 
"USE_CHROME": "False", + } + ) + process = subprocess.Popen( + [sys.executable, "-m", "archivebox", "server", "--debug", "--nothreading", f"127.0.0.1:{port}"], + cwd=data_dir, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + start_new_session=True, + ) + _wait_for_http(port, f"archivebox.localhost:{port}") + return process + + +def _stop_server(process: subprocess.Popen[str]) -> str: + try: + if process.poll() is None: + os.killpg(process.pid, signal.SIGTERM) + try: + stdout, _ = process.communicate(timeout=3) + except subprocess.TimeoutExpired: + os.killpg(process.pid, signal.SIGKILL) + stdout, _ = process.communicate(timeout=5) + else: + stdout, _ = process.communicate(timeout=5) + except ProcessLookupError: + stdout, _ = process.communicate(timeout=5) + return stdout + + +def _build_probe_config(mode: str, port: int, fixture: dict[str, object], runtime: dict[str, Path]) -> dict[str, str]: + snapshots = fixture["snapshots"] + attacker = snapshots["attacker"] + victim = snapshots["victim"] + base_origin = f"http://archivebox.localhost:{port}" + attacker_id = attacker["id"] + victim_id = victim["id"] + + if mode == "safe-subdomains-fullreplay": + attacker_origin = f"http://{attacker_id}.archivebox.localhost:{port}" + victim_url = f"http://{victim_id}.archivebox.localhost:{port}/safe.json" + dangerous_base = f"{attacker_origin}/dangerous.html" + admin_origin = f"http://admin.archivebox.localhost:{port}" + else: + attacker_origin = base_origin + victim_url = f"{base_origin}/snapshot/{victim_id}/safe.json" + dangerous_base = f"{base_origin}/snapshot/{attacker_id}/dangerous.html" + admin_origin = base_origin + + query = urlencode( + { + "own": "safe.json", + "victim": victim_url, + "admin": f"{admin_origin}/admin/", + "api": f"{admin_origin}/api/v1/docs", + } + ) + + return { + "mode": mode, + "chromePath": str(runtime["chrome_binary"]), + "adminLoginUrl": f"{admin_origin}/admin/login/", + "dangerousUrl": f"{dangerous_base}?{query}", + 
"username": fixture["username"], + "password": fixture["password"], + } + + +def _run_browser_probe(data_dir: Path, runtime: dict[str, Path], mode: str, fixture: dict[str, object], tmp_path: Path) -> dict[str, object]: + port = _get_free_port() + process = _start_server(data_dir, mode=mode, port=port) + probe_path = tmp_path / "server_security_probe.js" + probe_path.write_text(PUPPETEER_PROBE_SCRIPT, encoding="utf-8") + probe_config = _build_probe_config(mode, port, fixture, runtime) + + env = os.environ.copy() + env["NODE_PATH"] = str(runtime["node_modules_dir"]) + env["NODE_MODULES_DIR"] = str(runtime["node_modules_dir"]) + env["CHROME_BINARY"] = str(runtime["chrome_binary"]) + env["USE_COLOR"] = "False" + + try: + result = subprocess.run( + ["node", str(probe_path)], + cwd=data_dir, + env=env, + input=json.dumps(probe_config), + capture_output=True, + text=True, + timeout=120, + ) + finally: + server_log = _stop_server(process) + + assert result.returncode == 0, f"{result.stderr}\n\nSERVER LOG:\n{server_log}" + return json.loads(result.stdout.strip()) + + +@pytest.mark.parametrize( + ("mode", "expected"), + [ + ( + "safe-subdomains-fullreplay", + { + "login_succeeds": True, + "script_ran": True, + "victim_ok": False, + "admin_ok": False, + "admin_status": None, + "api_ok": False, + "api_status": None, + "csp_contains": None, + }, + ), + ( + "safe-onedomain-nojsreplay", + { + "login_succeeds": True, + "script_ran": False, + "victim_ok": None, + "admin_ok": None, + "admin_status": None, + "api_ok": None, + "api_status": None, + "csp_contains": "sandbox", + }, + ), + ( + "unsafe-onedomain-noadmin", + { + "login_succeeds": False, + "login_status": 403, + "script_ran": True, + "victim_ok": True, + "victim_status": 200, + "admin_ok": True, + "admin_status": 403, + "api_ok": True, + "api_status": 403, + "csp_contains": None, + }, + ), + ( + "danger-onedomain-fullreplay", + { + "login_succeeds": True, + "script_ran": True, + "victim_ok": True, + "victim_status": 200, + 
"admin_ok": True, + "admin_status": 200, + "api_ok": True, + "api_status": 200, + "csp_contains": None, + }, + ), + ], +) +def test_server_security_modes_in_chrome(initialized_archive: Path, browser_runtime, tmp_path: Path, mode: str, expected: dict[str, object]) -> None: + fixture = _seed_archive(initialized_archive) + result = _run_browser_probe(initialized_archive, browser_runtime, mode, fixture, tmp_path) + + login = result["login"] + dangerous_page = result["dangerousPage"] + page_state = result["pageState"] + probe_results = page_state["probeResults"] or {} + console_texts = [entry["text"] for entry in result["consoleMessages"]] + + assert dangerous_page["status"] == 200 + assert dangerous_page["archiveboxSecurityMode"] == mode + assert page_state["scriptRan"] is expected["script_ran"] + assert login["succeeded"] is expected["login_succeeds"] + + login_status = expected.get("login_status") + if login_status is not None: + assert login["status"] == login_status + + csp_contains = expected.get("csp_contains") + if csp_contains: + csp = dangerous_page["contentSecurityPolicy"] or "" + assert csp_contains in csp + else: + assert dangerous_page["contentSecurityPolicy"] is None + + if mode == "safe-subdomains-fullreplay": + assert probe_results["own"]["ok"] is True + assert probe_results["own"]["status"] == 200 + assert "ATTACKER_SECRET" in probe_results["own"]["sample"] + assert probe_results["victim"]["ok"] is expected["victim_ok"] + assert probe_results["admin"]["ok"] is expected["admin_ok"] + assert probe_results["api"]["ok"] is expected["api_ok"] + assert any("CORS policy" in text for text in console_texts) + return + + if mode == "safe-onedomain-nojsreplay": + assert probe_results == {} + assert "Dangerous Replay Fixture" in page_state["bodyText"] + assert any("Blocked script execution" in text for text in console_texts) + return + + assert probe_results["own"]["ok"] is True + assert probe_results["own"]["status"] == 200 + assert "ATTACKER_SECRET" in 
probe_results["own"]["sample"] + assert probe_results["victim"]["ok"] is expected["victim_ok"] + assert probe_results["victim"]["status"] == expected["victim_status"] + assert "VICTIM_SECRET" in probe_results["victim"]["sample"] + assert probe_results["admin"]["ok"] is expected["admin_ok"] + assert probe_results["admin"]["status"] == expected["admin_status"] + assert probe_results["api"]["ok"] is expected["api_ok"] + assert probe_results["api"]["status"] == expected["api_status"] + + if mode == "unsafe-onedomain-noadmin": + assert "control plane disabled" in probe_results["admin"]["sample"].lower() + assert "control plane disabled" in probe_results["api"]["sample"].lower() + elif mode == "danger-onedomain-fullreplay": + assert "ArchiveBox" in probe_results["admin"]["sample"] + assert "swagger" in probe_results["api"]["sample"].lower() diff --git a/archivebox/tests/test_urls.py b/archivebox/tests/test_urls.py index 627a39be..92503c5e 100644 --- a/archivebox/tests/test_urls.py +++ b/archivebox/tests/test_urls.py @@ -20,8 +20,10 @@ def _merge_pythonpath(env: dict[str, str]) -> dict[str, str]: return env -def _run_python(script: str, cwd: Path, timeout: int = 60) -> subprocess.CompletedProcess: +def _run_python(script: str, cwd: Path, timeout: int = 60, env_overrides: dict[str, str] | None = None) -> subprocess.CompletedProcess: env = _merge_pythonpath(os.environ.copy()) + if env_overrides: + env.update(env_overrides) return subprocess.run( [sys.executable, "-"], cwd=cwd, @@ -47,6 +49,7 @@ def _build_script(body: str) -> str: from django.contrib.auth import get_user_model from archivebox.core.models import Snapshot, ArchiveResult + from archivebox.crawls.models import Crawl from archivebox.config.common import SERVER_CONFIG from archivebox.core.host_utils import ( get_admin_host, @@ -58,6 +61,7 @@ def _build_script(body: str) -> str: split_host_port, host_matches, is_snapshot_subdomain, + build_snapshot_url, ) def response_body(resp): @@ -77,7 +81,41 @@ def 
_build_script(body: str) -> str: def get_snapshot(): snapshot = Snapshot.objects.order_by("-created_at").first() - assert snapshot is not None + if snapshot is None: + admin = ensure_admin_user() + crawl = Crawl.objects.create( + urls="https://example.com", + created_by=admin, + ) + snapshot = Snapshot.objects.create( + url="https://example.com", + title="Example Domain", + crawl=crawl, + status=Snapshot.StatusChoices.SEALED, + ) + snapshot_dir = Path(snapshot.output_dir) + snapshot_dir.mkdir(parents=True, exist_ok=True) + (snapshot_dir / "index.json").write_text('{"url": "https://example.com"}', encoding="utf-8") + (snapshot_dir / "favicon.ico").write_bytes(b"ico") + screenshot_dir = snapshot_dir / "screenshot" + screenshot_dir.mkdir(parents=True, exist_ok=True) + (screenshot_dir / "screenshot.png").write_bytes(b"png") + responses_root = snapshot_dir / "responses" / snapshot.domain + responses_root.mkdir(parents=True, exist_ok=True) + (responses_root / "index.html").write_text( + "
<html><head><title>Example Domain</title></head><body><h1>Example Domain</h1></body></html>
", + encoding="utf-8", + ) + ArchiveResult.objects.get_or_create( + snapshot=snapshot, + plugin="screenshot", + defaults={"status": "succeeded", "output_size": 1, "output_str": "."}, + ) + ArchiveResult.objects.get_or_create( + snapshot=snapshot, + plugin="responses", + defaults={"status": "succeeded", "output_size": 1, "output_str": "."}, + ) return snapshot def get_snapshot_files(snapshot): @@ -114,18 +152,39 @@ def _build_script(body: str) -> str: response_rel = str(response_file.relative_to(responses_root)) response_output_path = Path(snapshot.output_dir) / response_rel return output_rel, response_file, response_rel, response_output_path + + def write_replay_fixtures(snapshot): + dangerous_html = Path(snapshot.output_dir) / "dangerous.html" + dangerous_html.write_text( + "
<html><head><title>Danger</title></head><body><script>window.__ran = true;</script></body></html>
", + encoding="utf-8", + ) + safe_json = Path(snapshot.output_dir) / "safe.json" + safe_json.write_text('{"ok": true}', encoding="utf-8") + responses_root = Path(snapshot.output_dir) / "responses" / snapshot.domain + responses_root.mkdir(parents=True, exist_ok=True) + sniffed_response = responses_root / "dangerous-response" + sniffed_response.write_text( + "
<html><head><title>Response Danger</title></head><body><script>window.__ran = true;</script></body></html>
", + encoding="utf-8", + ) + return "dangerous.html", "safe.json", "dangerous-response" """ ) return prelude + "\n" + textwrap.dedent(body) -@pytest.mark.usefixtures("real_archive_with_example") class TestUrlRouting: data_dir: Path - def _run(self, body: str, timeout: int = 120) -> None: + @pytest.fixture(autouse=True) + def _setup_data_dir(self, initialized_archive: Path) -> None: + self.data_dir = initialized_archive + + def _run(self, body: str, timeout: int = 120, mode: str | None = None) -> None: script = _build_script(body) - result = _run_python(script, cwd=self.data_dir, timeout=timeout) + env_overrides = {"SERVER_SECURITY_MODE": mode} if mode else None + result = _run_python(script, cwd=self.data_dir, timeout=timeout, env_overrides=env_overrides) assert result.returncode == 0, result.stderr assert "OK" in result.stdout @@ -185,9 +244,6 @@ class TestUrlRouting: web_host = get_web_host() admin_host = get_admin_host() - resp = client.get("/add/", HTTP_HOST=web_host) - assert resp.status_code == 200 - resp = client.get("/admin/login/", HTTP_HOST=web_host) assert resp.status_code in (301, 302) assert admin_host in resp["Location"] @@ -250,6 +306,169 @@ class TestUrlRouting: """ ) + def test_safe_subdomains_fullreplay_leaves_risky_replay_unrestricted(self) -> None: + self._run( + """ + snapshot = get_snapshot() + dangerous_rel, safe_json_rel, sniffed_rel = write_replay_fixtures(snapshot) + snapshot_host = get_snapshot_host(str(snapshot.id)) + + client = Client() + + resp = client.get(f"/{dangerous_rel}", HTTP_HOST=snapshot_host) + assert resp.status_code == 200 + assert resp.headers.get("Content-Security-Policy") is None + assert resp.headers.get("X-Content-Type-Options") == "nosniff" + + resp = client.get(f"/{safe_json_rel}", HTTP_HOST=snapshot_host) + assert resp.status_code == 200 + assert resp.headers.get("Content-Security-Policy") is None + + resp = client.get(f"/{sniffed_rel}", HTTP_HOST=snapshot_host) + assert resp.status_code == 200 + assert 
resp.headers.get("Content-Security-Policy") is None + + print("OK") + """ + ) + + def test_safe_onedomain_nojsreplay_routes_and_neuters_risky_documents(self) -> None: + self._run( + """ + ensure_admin_user() + snapshot = get_snapshot() + dangerous_rel, safe_json_rel, sniffed_rel = write_replay_fixtures(snapshot) + snapshot_id = str(snapshot.id) + + client = Client() + base_host = SERVER_CONFIG.LISTEN_HOST + web_host = get_web_host() + admin_host = get_admin_host() + api_host = get_api_host() + + assert SERVER_CONFIG.SERVER_SECURITY_MODE == "safe-onedomain-nojsreplay" + assert web_host == base_host + assert admin_host == base_host + assert api_host == base_host + assert get_snapshot_host(snapshot_id) == base_host + assert get_original_host(snapshot.domain) == base_host + assert get_listen_subdomain(base_host) == "" + + replay_url = build_snapshot_url(snapshot_id, dangerous_rel) + assert replay_url == f"http://{base_host}/snapshot/{snapshot_id}/{dangerous_rel}" + + resp = client.get(f"/{snapshot.url_path}/{dangerous_rel}", HTTP_HOST=base_host) + assert resp.status_code in (301, 302) + assert resp["Location"] == replay_url + + resp = client.get("/admin/login/", HTTP_HOST=base_host) + assert resp.status_code == 200 + + resp = client.get("/api/v1/docs", HTTP_HOST=base_host) + assert resp.status_code == 200 + + resp = client.get(f"/snapshot/{snapshot_id}/{dangerous_rel}", HTTP_HOST=base_host) + assert resp.status_code == 200 + csp = resp.headers.get("Content-Security-Policy") or "" + assert "sandbox" in csp + assert "script-src 'none'" in csp + assert resp.headers.get("X-Content-Type-Options") == "nosniff" + + resp = client.get(f"/snapshot/{snapshot_id}/{safe_json_rel}", HTTP_HOST=base_host) + assert resp.status_code == 200 + assert resp.headers.get("Content-Security-Policy") is None + assert resp.headers.get("X-Content-Type-Options") == "nosniff" + + resp = client.get(f"/snapshot/{snapshot_id}/{sniffed_rel}", HTTP_HOST=base_host) + assert resp.status_code == 200 + csp = 
resp.headers.get("Content-Security-Policy") or "" + assert "sandbox" in csp + assert "script-src 'none'" in csp + + print("OK") + """, + mode="safe-onedomain-nojsreplay", + ) + + def test_unsafe_onedomain_noadmin_blocks_control_plane_and_unsafe_methods(self) -> None: + self._run( + """ + ensure_admin_user() + snapshot = get_snapshot() + dangerous_rel, _, _ = write_replay_fixtures(snapshot) + snapshot_id = str(snapshot.id) + + client = Client() + base_host = SERVER_CONFIG.LISTEN_HOST + + assert SERVER_CONFIG.SERVER_SECURITY_MODE == "unsafe-onedomain-noadmin" + assert SERVER_CONFIG.CONTROL_PLANE_ENABLED is False + assert SERVER_CONFIG.BLOCK_UNSAFE_METHODS is True + assert get_web_host() == base_host + assert get_admin_host() == base_host + assert get_api_host() == base_host + + for blocked_path in ("/admin/login/", "/api/v1/docs", "/add/", f"/web/{snapshot.domain}"): + resp = client.get(blocked_path, HTTP_HOST=base_host) + assert resp.status_code == 403, (blocked_path, resp.status_code) + + resp = client.post("/public/", data="x=1", content_type="application/x-www-form-urlencoded", HTTP_HOST=base_host) + assert resp.status_code == 403 + + resp = client.get(f"/snapshot/{snapshot_id}/{dangerous_rel}", HTTP_HOST=base_host) + assert resp.status_code == 200 + assert resp.headers.get("Content-Security-Policy") is None + assert resp.headers.get("X-Content-Type-Options") == "nosniff" + + print("OK") + """, + mode="unsafe-onedomain-noadmin", + ) + + def test_danger_onedomain_fullreplay_keeps_control_plane_and_raw_replay(self) -> None: + self._run( + """ + ensure_admin_user() + snapshot = get_snapshot() + dangerous_rel, _, _ = write_replay_fixtures(snapshot) + snapshot_id = str(snapshot.id) + + client = Client() + base_host = SERVER_CONFIG.LISTEN_HOST + + assert SERVER_CONFIG.SERVER_SECURITY_MODE == "danger-onedomain-fullreplay" + assert SERVER_CONFIG.CONTROL_PLANE_ENABLED is True + assert get_web_host() == base_host + assert get_admin_host() == base_host + assert 
get_api_host() == base_host + assert build_snapshot_url(snapshot_id, dangerous_rel) == f"http://{base_host}/snapshot/{snapshot_id}/{dangerous_rel}" + + resp = client.get("/admin/login/", HTTP_HOST=base_host) + assert resp.status_code == 200 + + resp = client.get("/api/v1/docs", HTTP_HOST=base_host) + assert resp.status_code == 200 + + payload = '{"username": "testadmin", "password": "testpassword"}' + resp = client.post( + "/api/v1/auth/get_api_token", + data=payload, + content_type="application/json", + HTTP_HOST=base_host, + ) + assert resp.status_code == 200 + assert resp.json().get("token") + + resp = client.get(f"/snapshot/{snapshot_id}/{dangerous_rel}", HTTP_HOST=base_host) + assert resp.status_code == 200 + assert resp.headers.get("Content-Security-Policy") is None + assert resp.headers.get("X-Content-Type-Options") == "nosniff" + + print("OK") + """, + mode="danger-onedomain-fullreplay", + ) + def test_template_and_admin_links(self) -> None: self._run( """