Add configurable server security modes (#1773)

Fixes https://github.com/ArchiveBox/ArchiveBox/issues/239

## Summary
- add `SERVER_SECURITY_MODE` presets for safe subdomain replay, safe
one-domain no-JS replay, unsafe one-domain no-admin, and dangerous
one-domain full replay
- make host routing, replay URLs, static serving, and control-plane
access mode-aware
- add strict routing/header coverage plus a browser-backed
Chrome/Puppeteer test that verifies real same-origin behavior in all
four modes

## Testing
- `uv run pytest archivebox/tests/test_urls.py -v`
- `uv run pytest archivebox/tests/test_admin_views.py -v`
- `uv run pytest archivebox/tests/test_server_security_browser.py -v`

<!-- devin-review-badge-begin -->

---

<a href="https://app.devin.ai/review/archivebox/archivebox/pull/1773"
target="_blank">
  <picture>
<source media="(prefers-color-scheme: dark)"
srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1">
<img
src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1"
alt="Open with Devin">
  </picture>
</a>
<!-- devin-review-badge-end -->


<!-- This is an auto-generated description by cubic. -->
---
## Summary by cubic
Adds configurable server security modes to isolate admin/API from
archived content, with a safe subdomain default and single-domain
fallbacks. Routing, replay endpoints, headers, and middleware are
mode-aware, with browser tests validating same-origin behavior.

- New Features
- Introduced SERVER_SECURITY_MODE with presets:
safe-subdomains-fullreplay (default), safe-onedomain-nojsreplay,
unsafe-onedomain-noadmin, danger-onedomain-fullreplay.
- Mode-aware routing and base URLs; one-domain modes use path-based
replay: /snapshot/<id>/... and /original/<domain>/....
- Control plane gate: block admin/API and non-GET methods in
unsafe-onedomain-noadmin; allow full access in
danger-onedomain-fullreplay.
- Safer replay: detect risky HTML/SVG and apply CSP sandbox (no scripts)
in safe-onedomain-nojsreplay; add X-ArchiveBox-Security-Mode and
X-Content-Type-Options: nosniff on replay responses.
- Middleware and serving: added ServerSecurityModeMiddleware, improved
HostRouting, and static server byte-range/CSP handling.
- Tests: added Chrome/Puppeteer browser tests and stricter URL routing
tests covering all modes.

- Migration
- Default requires wildcard subdomains for full isolation (admin., web.,
api., and snapshot-id.<base>).
- To run on one domain, set SERVER_SECURITY_MODE to a one-domain preset;
URLs switch to /snapshot/<id>/ and /original/<domain>/ paths.
- For production, prefer safe-subdomains-fullreplay; lower-security
modes print a startup warning.

<sup>Written for commit ad41b15581.
Summary will update on new commits.</sup>

<!-- End of auto-generated description by cubic. -->
This commit is contained in:
Nick Sweeting
2026-03-22 20:17:21 -07:00
committed by GitHub
9 changed files with 1139 additions and 82 deletions

View File

@@ -0,0 +1,578 @@
#!/usr/bin/env python3
"""Browser-level security mode tests using the existing Node/Puppeteer runtime."""
from __future__ import annotations
import json
import os
import shutil
import signal
import socket
import subprocess
import sys
import textwrap
import time
from pathlib import Path
from urllib.parse import urlencode
import pytest
import requests
from .conftest import _ensure_puppeteer, _find_cached_chromium, _find_system_browser, run_python_cwd
# Node.js probe executed via `node` with this script on disk and its JSON
# config piped to stdin (fs.readFileSync(0)).  It logs in to the admin UI in
# one tab, then loads the "dangerous" snapshot page in a second tab and
# records: response status/headers, whether the inline script ran, the
# fetch-probe results the page collected, console messages (for CORS/CSP
# diagnostics), and failed requests.  Output is a single JSON object on stdout.
PUPPETEER_PROBE_SCRIPT = """\
const fs = require("node:fs");
const puppeteer = require("puppeteer");
async function login(page, config) {
const result = {
reachable: false,
succeeded: false,
finalUrl: null,
status: null,
error: null,
};
try {
const response = await page.goto(config.adminLoginUrl, {
waitUntil: "networkidle2",
timeout: 15000,
});
result.reachable = true;
result.status = response ? response.status() : null;
const usernameInput = await page.$('input[name="username"]');
const passwordInput = await page.$('input[name="password"]');
if (!usernameInput || !passwordInput) {
result.finalUrl = page.url();
return result;
}
await usernameInput.type(config.username);
await passwordInput.type(config.password);
await Promise.all([
page.waitForNavigation({waitUntil: "networkidle2", timeout: 15000}),
page.click('button[type="submit"], input[type="submit"]'),
]);
result.finalUrl = page.url();
result.succeeded = !page.url().includes("/admin/login/");
return result;
} catch (error) {
result.error = String(error);
result.finalUrl = page.url();
return result;
}
}
async function main() {
const config = JSON.parse(fs.readFileSync(0, "utf8"));
const browser = await puppeteer.launch({
executablePath: config.chromePath,
headless: true,
args: [
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-background-networking",
],
});
const loginPage = await browser.newPage();
const loginResult = await login(loginPage, config);
await loginPage.close();
const page = await browser.newPage();
const consoleMessages = [];
const requestFailures = [];
page.on("console", (message) => {
consoleMessages.push({type: message.type(), text: message.text()});
});
page.on("pageerror", (error) => {
consoleMessages.push({type: "pageerror", text: String(error)});
});
page.on("requestfailed", (request) => {
requestFailures.push({
url: request.url(),
error: request.failure() ? request.failure().errorText : "unknown",
});
});
const response = await page.goto(config.dangerousUrl, {
waitUntil: "networkidle2",
timeout: 15000,
});
await new Promise((resolve) => setTimeout(resolve, 1500));
const pageState = await page.evaluate(() => ({
href: location.href,
scriptRan: window.__dangerousScriptRan === true,
probeResults: window.__probeResults || null,
bodyText: document.body ? document.body.innerText.slice(0, 600) : "",
}));
const output = {
mode: config.mode,
login: loginResult,
dangerousPage: {
status: response ? response.status() : null,
finalUrl: page.url(),
contentSecurityPolicy: response ? response.headers()["content-security-policy"] || null : null,
archiveboxSecurityMode: response ? response.headers()["x-archivebox-security-mode"] || null : null,
},
pageState,
consoleMessages,
requestFailures,
};
console.log(JSON.stringify(output));
await browser.close();
}
main().catch((error) => {
console.error(String(error));
process.exit(1);
});
"""
def _resolve_browser(shared_lib: Path) -> Path | None:
env_browser = os.environ.get("CHROME_BINARY") or os.environ.get("CHROME_BIN")
if env_browser:
candidate = Path(env_browser).expanduser()
if candidate.exists():
return candidate
cached = _find_cached_chromium(shared_lib)
if cached and cached.exists():
return cached
system = _find_system_browser()
if system and system.exists():
return system
which_candidates = ("chromium", "chromium-browser", "google-chrome", "google-chrome-stable", "chrome")
for binary in which_candidates:
resolved = shutil.which(binary)
if resolved:
return Path(resolved)
mac_candidates = (
Path("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"),
Path("/Applications/Chromium.app/Contents/MacOS/Chromium"),
)
for candidate in mac_candidates:
if candidate.exists():
return candidate
return None
@pytest.fixture(scope="session")
def browser_runtime(tmp_path_factory):
    """Provision the Node/Puppeteer runtime and a Chrome binary once per session.

    Skips (rather than fails) when Node.js/npm or a usable browser binary is
    unavailable, since these are optional external dependencies for the suite.
    Returns the paths the probe subprocess needs (NODE_PATH dir + browser).
    """
    if shutil.which("node") is None or shutil.which("npm") is None:
        pytest.skip("Node.js and npm are required for browser security tests")
    # Session-scoped temp dir: installing puppeteer via npm is slow, do it once.
    shared_lib = tmp_path_factory.mktemp("archivebox_browser_lib")
    _ensure_puppeteer(shared_lib)
    browser = _resolve_browser(shared_lib)
    if not browser:
        pytest.skip("No Chrome/Chromium binary available for browser security tests")
    return {
        "node_modules_dir": shared_lib / "npm" / "node_modules",
        "chrome_binary": browser,
    }
def _seed_archive(data_dir: Path) -> dict[str, object]:
    """Create an admin user plus attacker/victim snapshot fixtures in ``data_dir``.

    The seeding runs in a subprocess (via run_python_cwd) so Django settings
    load against the archive under test rather than this pytest process.
    The "attacker" snapshot additionally gets dangerous.html, whose inline
    script fetches its own safe.json, the victim snapshot, /admin/, and the
    API, recording the outcomes for the browser probe to read back.

    Returns the dict printed by the script: credentials + snapshot ids/domains.
    """
    script = textwrap.dedent(
        """
        import json
        import os
        from pathlib import Path
        from django.utils import timezone
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "archivebox.core.settings")
        import django
        django.setup()
        from django.contrib.auth import get_user_model
        from archivebox.core.models import Snapshot
        from archivebox.crawls.models import Crawl
        User = get_user_model()
        admin, _ = User.objects.get_or_create(
            username="testadmin",
            defaults={"email": "admin@example.com", "is_staff": True, "is_superuser": True},
        )
        admin.set_password("testpassword")
        admin.save()
        snapshots = {}
        fixture_specs = (
            ("attacker", "https://attacker.example/entry", "Attacker Snapshot", "ATTACKER_SECRET"),
            ("victim", "https://victim.example/private", "Victim Snapshot", "VICTIM_SECRET"),
        )
        for slug, url, title, secret in fixture_specs:
            crawl = Crawl.objects.create(
                urls=url,
                created_by=admin,
                status=Crawl.StatusChoices.SEALED,
                retry_at=timezone.now(),
            )
            snapshot = Snapshot.objects.create(
                url=url,
                title=title,
                crawl=crawl,
                status=Snapshot.StatusChoices.SEALED,
                downloaded_at=timezone.now(),
            )
            output_dir = Path(snapshot.output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)
            (output_dir / "safe.json").write_text(
                json.dumps({"slug": slug, "secret": secret}),
                encoding="utf-8",
            )
            if slug == "attacker":
                (output_dir / "dangerous.html").write_text(
                    '''
                    <!doctype html>
                    <html>
                    <body>
                    <h1>Dangerous Replay Fixture</h1>
                    <script>
                    window.__dangerousScriptRan = true;
                    (async () => {
                      const params = new URLSearchParams(location.search);
                      const targets = {
                        own: params.get("own") || "safe.json",
                        victim: params.get("victim"),
                        admin: params.get("admin"),
                        api: params.get("api"),
                      };
                      const results = {};
                      for (const [label, url] of Object.entries(targets)) {
                        if (!url) continue;
                        try {
                          const response = await fetch(url, {credentials: "include"});
                          const text = await response.text();
                          results[label] = {
                            ok: true,
                            status: response.status,
                            url: response.url,
                            sample: text.slice(0, 120),
                          };
                        } catch (error) {
                          results[label] = {
                            ok: false,
                            error: String(error),
                          };
                        }
                      }
                      window.__probeResults = results;
                      const pre = document.createElement("pre");
                      pre.id = "probe-results";
                      pre.textContent = JSON.stringify(results);
                      document.body.appendChild(pre);
                    })().catch((error) => {
                      window.__probeResults = {fatal: String(error)};
                    });
                    </script>
                    </body>
                    </html>
                    ''',
                    encoding="utf-8",
                )
            snapshots[slug] = {
                "id": str(snapshot.id),
                "domain": snapshot.domain,
            }
        print(json.dumps({
            "username": "testadmin",
            "password": "testpassword",
            "snapshots": snapshots,
        }))
        """
    )
    stdout, stderr, returncode = run_python_cwd(script, cwd=data_dir, timeout=120)
    assert returncode == 0, stderr
    # The script's only stdout is the JSON summary printed at the end.
    return json.loads(stdout.strip())
def _get_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]
def _wait_for_http(port: int, host: str, timeout: float = 30.0) -> None:
    """Block until the server on 127.0.0.1:``port`` answers for ``host``.

    Polls every 0.5s with a Host header override until any non-5xx response
    arrives (redirects and 4xx count as "up") or ``timeout`` seconds elapse.

    Raises:
        AssertionError: if the deadline passes, including the last error seen.
    """
    # Use the monotonic clock for the deadline: unlike time.time(), it cannot
    # jump backwards/forwards under NTP corrections, which would otherwise
    # shorten or extend the wait arbitrarily.
    deadline = time.monotonic() + timeout
    last_error = "server did not answer"
    while time.monotonic() < deadline:
        try:
            response = requests.get(
                f"http://127.0.0.1:{port}/",
                headers={"Host": host},
                timeout=2,
                allow_redirects=False,
            )
            if response.status_code < 500:
                return
            last_error = f"HTTP {response.status_code}"
        except requests.RequestException as exc:
            last_error = str(exc)
        time.sleep(0.5)
    raise AssertionError(f"Timed out waiting for {host}: {last_error}")
def _start_server(data_dir: Path, *, mode: str, port: int) -> subprocess.Popen[str]:
    """Launch `archivebox server` in ``data_dir`` under security mode ``mode``.

    Blocks until the server answers HTTP, then returns the Popen handle;
    the caller is responsible for tearing it down via _stop_server().
    """
    env = os.environ.copy()
    env.pop("DATA_DIR", None)  # force the server to discover the data dir from cwd
    env.update(
        {
            "PYTHONPATH": str(Path(__file__).resolve().parents[2]),
            "LISTEN_HOST": f"archivebox.localhost:{port}",
            "ALLOWED_HOSTS": "*",
            "CSRF_TRUSTED_ORIGINS": f"http://archivebox.localhost:{port},http://admin.archivebox.localhost:{port}",
            "SERVER_SECURITY_MODE": mode,
            "USE_COLOR": "False",
            "SHOW_PROGRESS": "False",
            # Disable every extractor so the seeded snapshots never trigger
            # real network fetches or Chrome launches during the test.
            "SAVE_ARCHIVEDOTORG": "False",
            "SAVE_TITLE": "False",
            "SAVE_FAVICON": "False",
            "SAVE_WGET": "False",
            "SAVE_WARC": "False",
            "SAVE_PDF": "False",
            "SAVE_SCREENSHOT": "False",
            "SAVE_DOM": "False",
            "SAVE_SINGLEFILE": "False",
            "SAVE_READABILITY": "False",
            "SAVE_MERCURY": "False",
            "SAVE_GIT": "False",
            "SAVE_YTDLP": "False",
            "SAVE_HEADERS": "False",
            "SAVE_HTMLTOTEXT": "False",
            "USE_CHROME": "False",
        }
    )
    process = subprocess.Popen(
        [sys.executable, "-m", "archivebox", "server", "--debug", "--nothreading", f"127.0.0.1:{port}"],
        cwd=data_dir,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge streams so one pipe captures the full log
        text=True,
        start_new_session=True,  # own process group so _stop_server can signal the whole tree
    )
    _wait_for_http(port, f"archivebox.localhost:{port}")
    return process
def _stop_server(process: subprocess.Popen[str]) -> str:
    """Terminate the server's process group and return its captured stdout.

    Escalates SIGTERM -> SIGKILL if the group does not exit within 3 seconds.
    Always drains the stdout pipe so the server log is available to callers
    for failure diagnostics.
    """
    try:
        if process.poll() is None:
            # start_new_session=True in _start_server makes process.pid the
            # process-group id, so killpg reaches any child workers too.
            os.killpg(process.pid, signal.SIGTERM)
            try:
                stdout, _ = process.communicate(timeout=3)
            except subprocess.TimeoutExpired:
                # Graceful shutdown stalled; force-kill and reap the output.
                os.killpg(process.pid, signal.SIGKILL)
                stdout, _ = process.communicate(timeout=5)
        else:
            # Server already exited on its own; just collect remaining output.
            stdout, _ = process.communicate(timeout=5)
    except ProcessLookupError:
        # The group vanished between poll() and killpg(); output is still readable.
        stdout, _ = process.communicate(timeout=5)
    return stdout
def _build_probe_config(mode: str, port: int, fixture: dict[str, object], runtime: dict[str, Path]) -> dict[str, str]:
snapshots = fixture["snapshots"]
attacker = snapshots["attacker"]
victim = snapshots["victim"]
base_origin = f"http://archivebox.localhost:{port}"
attacker_id = attacker["id"]
victim_id = victim["id"]
if mode == "safe-subdomains-fullreplay":
attacker_origin = f"http://{attacker_id}.archivebox.localhost:{port}"
victim_url = f"http://{victim_id}.archivebox.localhost:{port}/safe.json"
dangerous_base = f"{attacker_origin}/dangerous.html"
admin_origin = f"http://admin.archivebox.localhost:{port}"
else:
attacker_origin = base_origin
victim_url = f"{base_origin}/snapshot/{victim_id}/safe.json"
dangerous_base = f"{base_origin}/snapshot/{attacker_id}/dangerous.html"
admin_origin = base_origin
query = urlencode(
{
"own": "safe.json",
"victim": victim_url,
"admin": f"{admin_origin}/admin/",
"api": f"{admin_origin}/api/v1/docs",
}
)
return {
"mode": mode,
"chromePath": str(runtime["chrome_binary"]),
"adminLoginUrl": f"{admin_origin}/admin/login/",
"dangerousUrl": f"{dangerous_base}?{query}",
"username": fixture["username"],
"password": fixture["password"],
}
def _run_browser_probe(data_dir: Path, runtime: dict[str, Path], mode: str, fixture: dict[str, object], tmp_path: Path) -> dict[str, object]:
    """Boot a server in ``mode``, run the Puppeteer probe against it, and
    return the probe's decoded JSON output.

    The server is always torn down (even on probe timeout) and its log is
    attached to any assertion failure for diagnosis.
    """
    port = _get_free_port()
    process = _start_server(data_dir, mode=mode, port=port)
    probe_path = tmp_path / "server_security_probe.js"
    probe_path.write_text(PUPPETEER_PROBE_SCRIPT, encoding="utf-8")
    probe_config = _build_probe_config(mode, port, fixture, runtime)
    env = os.environ.copy()
    env["NODE_PATH"] = str(runtime["node_modules_dir"])
    env["NODE_MODULES_DIR"] = str(runtime["node_modules_dir"])
    env["CHROME_BINARY"] = str(runtime["chrome_binary"])
    env["USE_COLOR"] = "False"
    try:
        result = subprocess.run(
            ["node", str(probe_path)],
            cwd=data_dir,
            env=env,
            input=json.dumps(probe_config),  # probe reads its config from stdin
            capture_output=True,
            text=True,
            timeout=120,
        )
    finally:
        server_log = _stop_server(process)
    # Include stdout as well as stderr in the failure message: Node reports
    # some launch/probe failures on stdout only, and the original message
    # dropped them, making failures hard to diagnose.
    assert result.returncode == 0, (
        f"PROBE STDOUT:\n{result.stdout}\n\nPROBE STDERR:\n{result.stderr}\n\nSERVER LOG:\n{server_log}"
    )
    return json.loads(result.stdout.strip())
@pytest.mark.parametrize(
    ("mode", "expected"),
    [
        # Default mode: snapshot JS runs, but each snapshot is its own
        # origin, so cross-snapshot/admin/API fetches fail in the browser
        # (ok=False, no status visible to the page).
        (
            "safe-subdomains-fullreplay",
            {
                "login_succeeds": True,
                "script_ran": True,
                "victim_ok": False,
                "admin_ok": False,
                "admin_status": None,
                "api_ok": False,
                "api_status": None,
                "csp_contains": None,
            },
        ),
        # Single domain, but risky documents are served with a CSP sandbox,
        # so the inline script never runs and no probes are recorded.
        (
            "safe-onedomain-nojsreplay",
            {
                "login_succeeds": True,
                "script_ran": False,
                "victim_ok": None,
                "admin_ok": None,
                "admin_status": None,
                "api_ok": None,
                "api_status": None,
                "csp_contains": "sandbox",
            },
        ),
        # Scripts run same-origin (victim readable!), but the control plane
        # answers 403 everywhere, including the admin login page itself.
        (
            "unsafe-onedomain-noadmin",
            {
                "login_succeeds": False,
                "login_status": 403,
                "script_ran": True,
                "victim_ok": True,
                "victim_status": 200,
                "admin_ok": True,
                "admin_status": 403,
                "api_ok": True,
                "api_status": 403,
                "csp_contains": None,
            },
        ),
        # Everything shares one origin with no restrictions at all.
        (
            "danger-onedomain-fullreplay",
            {
                "login_succeeds": True,
                "script_ran": True,
                "victim_ok": True,
                "victim_status": 200,
                "admin_ok": True,
                "admin_status": 200,
                "api_ok": True,
                "api_status": 200,
                "csp_contains": None,
            },
        ),
    ],
)
def test_server_security_modes_in_chrome(initialized_archive: Path, browser_runtime, tmp_path: Path, mode: str, expected: dict[str, object]) -> None:
    """End-to-end check, in a real Chrome, that each security mode enforces
    exactly the same-origin behavior it promises."""
    fixture = _seed_archive(initialized_archive)
    result = _run_browser_probe(initialized_archive, browser_runtime, mode, fixture, tmp_path)
    login = result["login"]
    dangerous_page = result["dangerousPage"]
    page_state = result["pageState"]
    probe_results = page_state["probeResults"] or {}
    console_texts = [entry["text"] for entry in result["consoleMessages"]]
    # The dangerous fixture must be served (200) and tagged with its mode in every case.
    assert dangerous_page["status"] == 200
    assert dangerous_page["archiveboxSecurityMode"] == mode
    assert page_state["scriptRan"] is expected["script_ran"]
    assert login["succeeded"] is expected["login_succeeds"]
    login_status = expected.get("login_status")
    if login_status is not None:
        assert login["status"] == login_status
    csp_contains = expected.get("csp_contains")
    if csp_contains:
        csp = dangerous_page["contentSecurityPolicy"] or ""
        assert csp_contains in csp
    else:
        # Modes that allow scripts must not attach any CSP header at all.
        assert dangerous_page["contentSecurityPolicy"] is None
    if mode == "safe-subdomains-fullreplay":
        # Same-origin fetch of the snapshot's own file works...
        assert probe_results["own"]["ok"] is True
        assert probe_results["own"]["status"] == 200
        assert "ATTACKER_SECRET" in probe_results["own"]["sample"]
        # ...but cross-origin fetches are rejected by the browser itself.
        assert probe_results["victim"]["ok"] is expected["victim_ok"]
        assert probe_results["admin"]["ok"] is expected["admin_ok"]
        assert probe_results["api"]["ok"] is expected["api_ok"]
        assert any("CORS policy" in text for text in console_texts)
        return
    if mode == "safe-onedomain-nojsreplay":
        # CSP sandbox stopped the inline script: no probes ran, page rendered.
        assert probe_results == {}
        assert "Dangerous Replay Fixture" in page_state["bodyText"]
        assert any("Blocked script execution" in text for text in console_texts)
        return
    # Remaining one-domain modes: scripts ran and same-origin fetches succeeded,
    # so the probe could read the victim snapshot's secret.
    assert probe_results["own"]["ok"] is True
    assert probe_results["own"]["status"] == 200
    assert "ATTACKER_SECRET" in probe_results["own"]["sample"]
    assert probe_results["victim"]["ok"] is expected["victim_ok"]
    assert probe_results["victim"]["status"] == expected["victim_status"]
    assert "VICTIM_SECRET" in probe_results["victim"]["sample"]
    assert probe_results["admin"]["ok"] is expected["admin_ok"]
    assert probe_results["admin"]["status"] == expected["admin_status"]
    assert probe_results["api"]["ok"] is expected["api_ok"]
    assert probe_results["api"]["status"] == expected["api_status"]
    if mode == "unsafe-onedomain-noadmin":
        # 403 body should explain the control plane is off, not a generic error.
        assert "control plane disabled" in probe_results["admin"]["sample"].lower()
        assert "control plane disabled" in probe_results["api"]["sample"].lower()
    elif mode == "danger-onedomain-fullreplay":
        # Full access: admin page and API docs render their real content.
        assert "ArchiveBox" in probe_results["admin"]["sample"]
        assert "swagger" in probe_results["api"]["sample"].lower()

View File

@@ -20,8 +20,10 @@ def _merge_pythonpath(env: dict[str, str]) -> dict[str, str]:
return env
def _run_python(script: str, cwd: Path, timeout: int = 60) -> subprocess.CompletedProcess:
def _run_python(script: str, cwd: Path, timeout: int = 60, env_overrides: dict[str, str] | None = None) -> subprocess.CompletedProcess:
env = _merge_pythonpath(os.environ.copy())
if env_overrides:
env.update(env_overrides)
return subprocess.run(
[sys.executable, "-"],
cwd=cwd,
@@ -47,6 +49,7 @@ def _build_script(body: str) -> str:
from django.contrib.auth import get_user_model
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.crawls.models import Crawl
from archivebox.config.common import SERVER_CONFIG
from archivebox.core.host_utils import (
get_admin_host,
@@ -58,6 +61,7 @@ def _build_script(body: str) -> str:
split_host_port,
host_matches,
is_snapshot_subdomain,
build_snapshot_url,
)
def response_body(resp):
@@ -77,7 +81,41 @@ def _build_script(body: str) -> str:
def get_snapshot():
snapshot = Snapshot.objects.order_by("-created_at").first()
assert snapshot is not None
if snapshot is None:
admin = ensure_admin_user()
crawl = Crawl.objects.create(
urls="https://example.com",
created_by=admin,
)
snapshot = Snapshot.objects.create(
url="https://example.com",
title="Example Domain",
crawl=crawl,
status=Snapshot.StatusChoices.SEALED,
)
snapshot_dir = Path(snapshot.output_dir)
snapshot_dir.mkdir(parents=True, exist_ok=True)
(snapshot_dir / "index.json").write_text('{"url": "https://example.com"}', encoding="utf-8")
(snapshot_dir / "favicon.ico").write_bytes(b"ico")
screenshot_dir = snapshot_dir / "screenshot"
screenshot_dir.mkdir(parents=True, exist_ok=True)
(screenshot_dir / "screenshot.png").write_bytes(b"png")
responses_root = snapshot_dir / "responses" / snapshot.domain
responses_root.mkdir(parents=True, exist_ok=True)
(responses_root / "index.html").write_text(
"<!doctype html><html><body><h1>Example Domain</h1></body></html>",
encoding="utf-8",
)
ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin="screenshot",
defaults={"status": "succeeded", "output_size": 1, "output_str": "."},
)
ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin="responses",
defaults={"status": "succeeded", "output_size": 1, "output_str": "."},
)
return snapshot
def get_snapshot_files(snapshot):
@@ -114,18 +152,39 @@ def _build_script(body: str) -> str:
response_rel = str(response_file.relative_to(responses_root))
response_output_path = Path(snapshot.output_dir) / response_rel
return output_rel, response_file, response_rel, response_output_path
def write_replay_fixtures(snapshot):
dangerous_html = Path(snapshot.output_dir) / "dangerous.html"
dangerous_html.write_text(
"<!doctype html><html><body><script>window.__archivebox_danger__ = true;</script><h1>Danger</h1></body></html>",
encoding="utf-8",
)
safe_json = Path(snapshot.output_dir) / "safe.json"
safe_json.write_text('{"ok": true}', encoding="utf-8")
responses_root = Path(snapshot.output_dir) / "responses" / snapshot.domain
responses_root.mkdir(parents=True, exist_ok=True)
sniffed_response = responses_root / "dangerous-response"
sniffed_response.write_text(
"<!doctype html><html><body><script>window.__archivebox_response__ = true;</script><p>Response Danger</p></body></html>",
encoding="utf-8",
)
return "dangerous.html", "safe.json", "dangerous-response"
"""
)
return prelude + "\n" + textwrap.dedent(body)
@pytest.mark.usefixtures("real_archive_with_example")
class TestUrlRouting:
data_dir: Path
def _run(self, body: str, timeout: int = 120) -> None:
@pytest.fixture(autouse=True)
def _setup_data_dir(self, initialized_archive: Path) -> None:
self.data_dir = initialized_archive
def _run(self, body: str, timeout: int = 120, mode: str | None = None) -> None:
script = _build_script(body)
result = _run_python(script, cwd=self.data_dir, timeout=timeout)
env_overrides = {"SERVER_SECURITY_MODE": mode} if mode else None
result = _run_python(script, cwd=self.data_dir, timeout=timeout, env_overrides=env_overrides)
assert result.returncode == 0, result.stderr
assert "OK" in result.stdout
@@ -185,9 +244,6 @@ class TestUrlRouting:
web_host = get_web_host()
admin_host = get_admin_host()
resp = client.get("/add/", HTTP_HOST=web_host)
assert resp.status_code == 200
resp = client.get("/admin/login/", HTTP_HOST=web_host)
assert resp.status_code in (301, 302)
assert admin_host in resp["Location"]
@@ -250,6 +306,169 @@ class TestUrlRouting:
"""
)
def test_safe_subdomains_fullreplay_leaves_risky_replay_unrestricted(self) -> None:
self._run(
"""
snapshot = get_snapshot()
dangerous_rel, safe_json_rel, sniffed_rel = write_replay_fixtures(snapshot)
snapshot_host = get_snapshot_host(str(snapshot.id))
client = Client()
resp = client.get(f"/{dangerous_rel}", HTTP_HOST=snapshot_host)
assert resp.status_code == 200
assert resp.headers.get("Content-Security-Policy") is None
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
resp = client.get(f"/{safe_json_rel}", HTTP_HOST=snapshot_host)
assert resp.status_code == 200
assert resp.headers.get("Content-Security-Policy") is None
resp = client.get(f"/{sniffed_rel}", HTTP_HOST=snapshot_host)
assert resp.status_code == 200
assert resp.headers.get("Content-Security-Policy") is None
print("OK")
"""
)
def test_safe_onedomain_nojsreplay_routes_and_neuters_risky_documents(self) -> None:
self._run(
"""
ensure_admin_user()
snapshot = get_snapshot()
dangerous_rel, safe_json_rel, sniffed_rel = write_replay_fixtures(snapshot)
snapshot_id = str(snapshot.id)
client = Client()
base_host = SERVER_CONFIG.LISTEN_HOST
web_host = get_web_host()
admin_host = get_admin_host()
api_host = get_api_host()
assert SERVER_CONFIG.SERVER_SECURITY_MODE == "safe-onedomain-nojsreplay"
assert web_host == base_host
assert admin_host == base_host
assert api_host == base_host
assert get_snapshot_host(snapshot_id) == base_host
assert get_original_host(snapshot.domain) == base_host
assert get_listen_subdomain(base_host) == ""
replay_url = build_snapshot_url(snapshot_id, dangerous_rel)
assert replay_url == f"http://{base_host}/snapshot/{snapshot_id}/{dangerous_rel}"
resp = client.get(f"/{snapshot.url_path}/{dangerous_rel}", HTTP_HOST=base_host)
assert resp.status_code in (301, 302)
assert resp["Location"] == replay_url
resp = client.get("/admin/login/", HTTP_HOST=base_host)
assert resp.status_code == 200
resp = client.get("/api/v1/docs", HTTP_HOST=base_host)
assert resp.status_code == 200
resp = client.get(f"/snapshot/{snapshot_id}/{dangerous_rel}", HTTP_HOST=base_host)
assert resp.status_code == 200
csp = resp.headers.get("Content-Security-Policy") or ""
assert "sandbox" in csp
assert "script-src 'none'" in csp
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
resp = client.get(f"/snapshot/{snapshot_id}/{safe_json_rel}", HTTP_HOST=base_host)
assert resp.status_code == 200
assert resp.headers.get("Content-Security-Policy") is None
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
resp = client.get(f"/snapshot/{snapshot_id}/{sniffed_rel}", HTTP_HOST=base_host)
assert resp.status_code == 200
csp = resp.headers.get("Content-Security-Policy") or ""
assert "sandbox" in csp
assert "script-src 'none'" in csp
print("OK")
""",
mode="safe-onedomain-nojsreplay",
)
def test_unsafe_onedomain_noadmin_blocks_control_plane_and_unsafe_methods(self) -> None:
self._run(
"""
ensure_admin_user()
snapshot = get_snapshot()
dangerous_rel, _, _ = write_replay_fixtures(snapshot)
snapshot_id = str(snapshot.id)
client = Client()
base_host = SERVER_CONFIG.LISTEN_HOST
assert SERVER_CONFIG.SERVER_SECURITY_MODE == "unsafe-onedomain-noadmin"
assert SERVER_CONFIG.CONTROL_PLANE_ENABLED is False
assert SERVER_CONFIG.BLOCK_UNSAFE_METHODS is True
assert get_web_host() == base_host
assert get_admin_host() == base_host
assert get_api_host() == base_host
for blocked_path in ("/admin/login/", "/api/v1/docs", "/add/", f"/web/{snapshot.domain}"):
resp = client.get(blocked_path, HTTP_HOST=base_host)
assert resp.status_code == 403, (blocked_path, resp.status_code)
resp = client.post("/public/", data="x=1", content_type="application/x-www-form-urlencoded", HTTP_HOST=base_host)
assert resp.status_code == 403
resp = client.get(f"/snapshot/{snapshot_id}/{dangerous_rel}", HTTP_HOST=base_host)
assert resp.status_code == 200
assert resp.headers.get("Content-Security-Policy") is None
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
print("OK")
""",
mode="unsafe-onedomain-noadmin",
)
def test_danger_onedomain_fullreplay_keeps_control_plane_and_raw_replay(self) -> None:
self._run(
"""
ensure_admin_user()
snapshot = get_snapshot()
dangerous_rel, _, _ = write_replay_fixtures(snapshot)
snapshot_id = str(snapshot.id)
client = Client()
base_host = SERVER_CONFIG.LISTEN_HOST
assert SERVER_CONFIG.SERVER_SECURITY_MODE == "danger-onedomain-fullreplay"
assert SERVER_CONFIG.CONTROL_PLANE_ENABLED is True
assert get_web_host() == base_host
assert get_admin_host() == base_host
assert get_api_host() == base_host
assert build_snapshot_url(snapshot_id, dangerous_rel) == f"http://{base_host}/snapshot/{snapshot_id}/{dangerous_rel}"
resp = client.get("/admin/login/", HTTP_HOST=base_host)
assert resp.status_code == 200
resp = client.get("/api/v1/docs", HTTP_HOST=base_host)
assert resp.status_code == 200
payload = '{"username": "testadmin", "password": "testpassword"}'
resp = client.post(
"/api/v1/auth/get_api_token",
data=payload,
content_type="application/json",
HTTP_HOST=base_host,
)
assert resp.status_code == 200
assert resp.json().get("token")
resp = client.get(f"/snapshot/{snapshot_id}/{dangerous_rel}", HTTP_HOST=base_host)
assert resp.status_code == 200
assert resp.headers.get("Content-Security-Policy") is None
assert resp.headers.get("X-Content-Type-Options") == "nosniff"
print("OK")
""",
mode="danger-onedomain-fullreplay",
)
def test_template_and_admin_links(self) -> None:
self._run(
"""