tons of fixes with codex

This commit is contained in:
Nick Sweeting
2026-01-19 01:00:53 -08:00
parent eaf7256345
commit c7b2217cd6
184 changed files with 3943 additions and 2420 deletions

View File

@@ -2,7 +2,6 @@
import os
import sys
import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
@@ -110,16 +109,9 @@ def initialized_archive(isolated_data_dir):
# =============================================================================
def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
    """Parse JSONL output into a list of dicts via the shared Process parser.

    Delegates to ``Process.parse_records_from_text`` so tests interpret
    hook/worker stdout exactly the way the application itself does
    (instead of maintaining a duplicate line-by-line JSON parser here).

    Args:
        stdout: Raw captured stdout, possibly empty or None-ish.

    Returns:
        A list of record dicts; empty list for empty input.
    """
    # Local import: keeps Django model loading out of module import time.
    from archivebox.machine.models import Process
    # `or ''` guards against callers passing None for missing output.
    return Process.parse_records_from_text(stdout or '')
def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):

View File

@@ -0,0 +1,133 @@
import os
import signal
import sqlite3
import subprocess
import sys
import time
from pathlib import Path
def _run(cmd, data_dir: Path, env: dict, timeout: int = 120):
return subprocess.run(
cmd,
cwd=data_dir,
env=env,
capture_output=True,
text=True,
timeout=timeout,
)
def _make_env(data_dir: Path) -> dict:
env = os.environ.copy()
env["DATA_DIR"] = str(data_dir)
env["USE_COLOR"] = "False"
env["SHOW_PROGRESS"] = "False"
env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
env["PLUGINS"] = "title,favicon"
# Keep it fast but still real hooks
env["SAVE_TITLE"] = "True"
env["SAVE_FAVICON"] = "True"
env["SAVE_WGET"] = "False"
env["SAVE_WARC"] = "False"
env["SAVE_PDF"] = "False"
env["SAVE_SCREENSHOT"] = "False"
env["SAVE_DOM"] = "False"
env["SAVE_SINGLEFILE"] = "False"
env["SAVE_READABILITY"] = "False"
env["SAVE_MERCURY"] = "False"
env["SAVE_GIT"] = "False"
env["SAVE_YTDLP"] = "False"
env["SAVE_HEADERS"] = "False"
env["SAVE_HTMLTOTEXT"] = "False"
return env
def _count_running_processes(db_path: Path, where: str) -> int:
for _ in range(50):
try:
conn = sqlite3.connect(db_path, timeout=1)
cur = conn.cursor()
count = cur.execute(
f"SELECT COUNT(*) FROM machine_process WHERE status = 'running' AND {where}"
).fetchone()[0]
conn.close()
return count
except sqlite3.OperationalError:
time.sleep(0.1)
return 0
def _wait_for_count(db_path: Path, where: str, target: int, timeout: int = 20) -> bool:
    """Poll until at least *target* matching rows are 'running', or time out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if _count_running_processes(db_path, where) >= target:
            return True
        time.sleep(0.1)
    return False
def test_add_parents_workers_to_orchestrator(tmp_path):
    """After `archivebox add`, crawl workers should be parented to the
    most recent orchestrator process row in machine_process."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    env = _make_env(data_dir)

    init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
    assert init.returncode == 0, init.stderr

    add = _run([sys.executable, "-m", "archivebox", "add", "https://example.com"], data_dir, env, timeout=120)
    assert add.returncode == 0, add.stderr

    conn = sqlite3.connect(data_dir / "index.sqlite3")
    try:
        orchestrator_row = conn.execute(
            "SELECT id FROM machine_process WHERE process_type = 'orchestrator' ORDER BY created_at DESC LIMIT 1"
        ).fetchone()
        assert orchestrator_row is not None
        worker_count = conn.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'crawl' "
            "AND parent_id = ?",
            (orchestrator_row[0],),
        ).fetchone()[0]
    finally:
        conn.close()
    assert worker_count >= 1, "Expected crawl worker to be parented to orchestrator"
def test_add_interrupt_cleans_orphaned_processes(tmp_path):
    """SIGINT during `archivebox add` must not leave worker/hook rows 'running'.

    Starts a real `add` subprocess, waits for at least one worker row to
    appear in machine_process, interrupts the parent with SIGINT, then polls
    until every worker/hook row has left the 'running' state.
    """
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    env = _make_env(data_dir)
    init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
    assert init.returncode == 0, init.stderr
    # Popen (not _run) so we can signal the process mid-archive.
    proc = subprocess.Popen(
        [sys.executable, "-m", "archivebox", "add", "https://example.com"],
        cwd=data_dir,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    db_path = data_dir / "index.sqlite3"
    # Don't interrupt until real work has started, or there is nothing to clean up.
    saw_worker = _wait_for_count(db_path, "process_type = 'worker'", 1, timeout=20)
    assert saw_worker, "Expected at least one worker to start before interrupt"
    proc.send_signal(signal.SIGINT)
    proc.wait(timeout=30)
    # Wait for workers/hooks to be cleaned up (cleanup is asynchronous).
    start = time.time()
    while time.time() - start < 30:
        running = _count_running_processes(db_path, "process_type IN ('worker','hook')")
        if running == 0:
            break
        time.sleep(0.2)
    assert _count_running_processes(db_path, "process_type IN ('worker','hook')") == 0, (
        "Expected no running worker/hook processes after interrupt"
    )

View File

@@ -68,17 +68,8 @@ class TestJSONLParsing(unittest.TestCase):
def test_parse_clean_jsonl(self):
"""Clean JSONL format should be parsed correctly."""
stdout = '{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}'
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
from archivebox.machine.models import Process
records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
@@ -89,17 +80,8 @@ class TestJSONLParsing(unittest.TestCase):
"""Multiple JSONL records should all be parsed."""
stdout = '''{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}
{"type": "Binary", "name": "wget", "abspath": "/usr/bin/wget"}'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
from archivebox.machine.models import Process
records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 2)
self.assertEqual(records[0]['type'], 'ArchiveResult')
@@ -111,59 +93,20 @@ class TestJSONLParsing(unittest.TestCase):
Processing URL: https://example.com
{"type": "ArchiveResult", "status": "succeeded", "output_str": "Downloaded"}
Hook completed successfully'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
from archivebox.machine.models import Process
records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['status'], 'succeeded')
def test_parse_legacy_result_json_format(self):
"""Legacy RESULT_JSON= format should be parsed for backwards compat."""
stdout = 'RESULT_JSON={"status": "succeeded", "output": "Done"}'
output_json = None
records = []
for line in stdout.splitlines():
line = line.strip()
if line.startswith('RESULT_JSON='):
try:
data = json.loads(line[len('RESULT_JSON='):])
if output_json is None:
output_json = data
data['type'] = 'ArchiveResult'
records.append(data)
except json.JSONDecodeError:
pass
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
self.assertEqual(records[0]['status'], 'succeeded')
def test_ignore_invalid_json(self):
"""Invalid JSON should be silently ignored."""
stdout = '''{"type": "ArchiveResult", "status": "succeeded"}
{invalid json here}
not json at all
{"type": "Binary", "name": "wget"}'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
from archivebox.machine.models import Process
records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 2)
@@ -171,17 +114,8 @@ not json at all
"""JSON objects without 'type' field should be ignored."""
stdout = '''{"status": "succeeded", "output_str": "Done"}
{"type": "ArchiveResult", "status": "succeeded"}'''
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
pass
from archivebox.machine.models import Process
records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
@@ -250,9 +184,9 @@ class TestHookDiscovery(unittest.TestCase):
(wget_dir / 'on_Snapshot__50_wget.py').write_text('# test hook')
(wget_dir / 'on_Crawl__00_install_wget.py').write_text('# install hook')
chrome_dir = self.plugins_dir / 'chrome_session'
chrome_dir = self.plugins_dir / 'chrome'
chrome_dir.mkdir()
(chrome_dir / 'on_Snapshot__20_chrome_session.bg.js').write_text('// background hook')
(chrome_dir / 'on_Snapshot__20_chrome_tab.bg.js').write_text('// background hook')
consolelog_dir = self.plugins_dir / 'consolelog'
consolelog_dir.mkdir()
@@ -274,7 +208,7 @@ class TestHookDiscovery(unittest.TestCase):
self.assertEqual(len(hooks), 3)
hook_names = [h.name for h in hooks]
self.assertIn('on_Snapshot__20_chrome_session.bg.js', hook_names)
self.assertIn('on_Snapshot__20_chrome_tab.bg.js', hook_names)
self.assertIn('on_Snapshot__21_consolelog.bg.js', hook_names)
self.assertIn('on_Snapshot__50_wget.py', hook_names)
@@ -288,7 +222,7 @@ class TestHookDiscovery(unittest.TestCase):
hooks = sorted(set(hooks), key=lambda p: p.name)
# Check numeric ordering
self.assertEqual(hooks[0].name, 'on_Snapshot__20_chrome_session.js')
self.assertEqual(hooks[0].name, 'on_Snapshot__20_chrome_tab.bg.js')
self.assertEqual(hooks[1].name, 'on_Snapshot__21_consolelog.bg.js')
self.assertEqual(hooks[2].name, 'on_Snapshot__50_wget.py')
@@ -348,9 +282,11 @@ print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "output_str":
)
self.assertEqual(result.returncode, 0)
output = json.loads(result.stdout.strip())
self.assertEqual(output['type'], 'ArchiveResult')
self.assertEqual(output['status'], 'succeeded')
from archivebox.machine.models import Process
records = Process.parse_records_from_text(result.stdout)
self.assertTrue(records)
self.assertEqual(records[0]['type'], 'ArchiveResult')
self.assertEqual(records[0]['status'], 'succeeded')
def test_js_hook_execution(self):
"""JavaScript hook should execute and output JSONL."""
@@ -371,9 +307,11 @@ console.log(JSON.stringify({type: 'ArchiveResult', status: 'succeeded', output_s
)
self.assertEqual(result.returncode, 0)
output = json.loads(result.stdout.strip())
self.assertEqual(output['type'], 'ArchiveResult')
self.assertEqual(output['status'], 'succeeded')
from archivebox.machine.models import Process
records = Process.parse_records_from_text(result.stdout)
self.assertTrue(records)
self.assertEqual(records[0]['type'], 'ArchiveResult')
self.assertEqual(records[0]['status'], 'succeeded')
def test_hook_receives_cli_args(self):
"""Hook should receive CLI arguments."""
@@ -398,8 +336,10 @@ print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "url": args.ge
)
self.assertEqual(result.returncode, 0)
output = json.loads(result.stdout.strip())
self.assertEqual(output['url'], 'https://example.com')
from archivebox.machine.models import Process
records = Process.parse_records_from_text(result.stdout)
self.assertTrue(records)
self.assertEqual(records[0]['url'], 'https://example.com')
class TestInstallHookOutput(unittest.TestCase):
@@ -424,7 +364,8 @@ class TestInstallHookOutput(unittest.TestCase):
'binprovider': 'apt',
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'Binary')
self.assertEqual(data['name'], 'wget')
self.assertTrue(data['abspath'].startswith('/'))
@@ -433,15 +374,16 @@ class TestInstallHookOutput(unittest.TestCase):
"""Install hook should output Machine config update JSONL."""
hook_output = json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/WGET_BINARY',
'value': '/usr/bin/wget',
'config': {
'WGET_BINARY': '/usr/bin/wget',
},
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'Machine')
self.assertEqual(data['_method'], 'update')
self.assertEqual(data['key'], 'config/WGET_BINARY')
self.assertIn('config', data)
self.assertEqual(data['config']['WGET_BINARY'], '/usr/bin/wget')
class TestSnapshotHookOutput(unittest.TestCase):
@@ -455,7 +397,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'output_str': 'Downloaded 5 files',
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'ArchiveResult')
self.assertEqual(data['status'], 'succeeded')
self.assertIn('output_str', data)
@@ -469,7 +412,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'cmd': ['/usr/bin/wget', '-p', '-k', 'https://example.com'],
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'ArchiveResult')
self.assertIsInstance(data['cmd'], list)
self.assertEqual(data['cmd'][0], '/usr/bin/wget')
@@ -487,7 +431,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
},
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'ArchiveResult')
self.assertIsInstance(data['output_json'], dict)
self.assertEqual(data['output_json']['status-code'], 200)
@@ -500,7 +445,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'output_str': 'SAVE_WGET=False',
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['status'], 'skipped')
def test_snapshot_hook_failed_status(self):
@@ -511,7 +457,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'output_str': '404 Not Found',
})
data = json.loads(hook_output)
from archivebox.machine.models import Process
data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['status'], 'failed')

View File

@@ -18,11 +18,10 @@ def test_search_json(process, disable_extractors_dict):
clean_str = re.sub(r'\x1b\[[0-9;]*m', '', output_str)
clean_str = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', clean_str)
output_json = json.loads(clean_str)
# With --index-only, only source file snapshots are created (file:// URLs)
# Verify we get at least one snapshot back
assert len(output_json) >= 1
# The snapshot should be a file:// URL pointing to sources
assert any("sources" in entry.get("url", "") for entry in output_json)
# Should include the requested URL
assert any("example.com" in entry.get("url", "") for entry in output_json)
def test_search_json_headers(process, disable_extractors_dict):
@@ -65,16 +64,17 @@ def test_search_csv(process, disable_extractors_dict):
capture_output=True, env=disable_extractors_dict)
search_process = subprocess.run(["archivebox", "search", "--csv", "url"], capture_output=True)
output_csv = search_process.stdout.decode("utf-8")
# Should contain the source file URL
assert "file://" in output_csv or "sources" in output_csv
# Should contain the requested URL
assert "example.com" in output_csv
def test_search_csv_headers(process, disable_extractors_dict):
subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
capture_output=True, env=disable_extractors_dict)
search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--with-headers"], capture_output=True)
output_csv = search_process.stdout.decode("utf-8")
# Should have url header and source file content
# Should have url header and requested URL
assert "url" in output_csv
assert "example.com" in output_csv
def test_search_with_headers_requires_format(process):
search_process = subprocess.run(["archivebox", "search", "--with-headers"], capture_output=True)

View File

@@ -0,0 +1,133 @@
import os
import sqlite3
import subprocess
from pathlib import Path
def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
candidates = {snapshot_id}
if len(snapshot_id) == 32:
hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
candidates.add(hyphenated)
elif len(snapshot_id) == 36 and '-' in snapshot_id:
candidates.add(snapshot_id.replace('-', ''))
for needle in candidates:
for path in data_dir.rglob(needle):
if path.is_dir():
return path
return None
def _find_html_with_text(root: Path, needle: str) -> list[Path]:
hits: list[Path] = []
for path in root.rglob("*.htm*"):
if not path.is_file():
continue
try:
if needle in path.read_text(errors="ignore"):
hits.append(path)
except Exception:
continue
return hits
def test_add_real_world_example_domain(tmp_path):
    """End-to-end smoke test: archive https://example.com with real extractors.

    Verifies the snapshot DB row and title, that binary workers installed
    their binaries cleanly, and that multiple extractors produced on-disk
    outputs containing 'Example Domain'. Requires network access.
    """
    os.chdir(tmp_path)
    # Short /tmp path — presumably to keep unix-socket paths under OS length
    # limits; TODO confirm against archivebox TMP_DIR handling.
    tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
    tmp_short.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["TMP_DIR"] = str(tmp_short)
    env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
    init = subprocess.run(
        ["archivebox", "init"],
        capture_output=True,
        text=True,
        timeout=120,
        env=env,
    )
    assert init.returncode == 0, f"archivebox init failed: {init.stderr}"
    result = subprocess.run(
        ["archivebox", "add", "https://example.com"],
        capture_output=True,
        text=True,
        timeout=900,  # real network fetch + extractor installs can be slow
        env=env,
    )
    assert result.returncode == 0, (
        "archivebox add failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )
    conn = sqlite3.connect(tmp_path / "index.sqlite3")
    c = conn.cursor()
    snapshot_row = c.execute(
        "SELECT id, url, title FROM core_snapshot WHERE url = ?",
        ("https://example.com",),
    ).fetchone()
    assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
    snapshot_id, snapshot_url, snapshot_title = snapshot_row
    assert snapshot_title and "Example Domain" in snapshot_title, (
        f"Expected title to contain Example Domain, got: {snapshot_title}"
    )
    failed_results = c.execute(
        "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
        (snapshot_id,),
    ).fetchone()[0]
    assert failed_results == 0, "Some archive results failed for example.com snapshot"
    binary_workers = c.execute(
        "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
    ).fetchone()[0]
    assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"
    failed_binary_workers = c.execute(
        "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
        "AND exit_code IS NOT NULL AND exit_code != 0"
    ).fetchone()[0]
    assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"
    queued_binaries = c.execute(
        "SELECT name FROM machine_binary WHERE status != 'installed'"
    ).fetchall()
    assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"
    conn.close()
    snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id))
    assert snapshot_dir is not None, "Snapshot output directory not found"
    title_path = snapshot_dir / "title" / "title.txt"
    assert title_path.exists(), f"Missing title output: {title_path}"
    assert "Example Domain" in title_path.read_text(errors="ignore")
    # HTML-producing extractors: accept either flat ('wget') or prefixed
    # ('*_wget') output directory layouts.
    html_sources = []
    for candidate in ("wget", "singlefile", "dom"):
        for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
            if candidate_dir.exists():
                html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
    assert len(html_sources) >= 2, (
        "Expected HTML outputs from multiple extractors to contain Example Domain "
        f"(found {len(html_sources)})."
    )
    # Text extractors: count how many produced content mentioning the page.
    text_hits = 0
    for path in (
        *snapshot_dir.glob("*_readability/content.txt"),
        snapshot_dir / "readability" / "content.txt",
    ):
        if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
            text_hits += 1
    for path in (
        *snapshot_dir.glob("*_htmltotext/htmltotext.txt"),
        snapshot_dir / "htmltotext" / "htmltotext.txt",
    ):
        if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
            text_hits += 1
    assert text_hits >= 2, (
        "Expected multiple text extractors to contain Example Domain "
        f"(readability/htmltotext hits={text_hits})."
    )

View File

@@ -0,0 +1,8 @@
from django.test import TestCase
class TestSignalWebhooksSettings(TestCase):
    """Sanity-check signal_webhooks configuration under the test runner."""

    def test_task_handler_is_sync_in_tests(self):
        # Webhook delivery must run synchronously in tests so assertions can
        # observe results immediately (no background task queue).
        from signal_webhooks.settings import webhook_settings
        assert webhook_settings.TASK_HANDLER.__name__ == "sync_task_handler"

View File

@@ -4,7 +4,11 @@
import os
import subprocess
import sqlite3
import json
from archivebox.machine.models import Process
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
import uuid
import pytest
@@ -16,19 +20,51 @@ def test_snapshot_creates_snapshot_with_correct_url(tmp_path, process, disable_e
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'snapshot', 'https://example.com'],
['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
result = c.execute("SELECT url FROM core_snapshot WHERE url = ?",
('https://example.com',)).fetchone()
snapshot_row = c.execute(
"SELECT id, created_at, url, crawl_id FROM core_snapshot WHERE url = ?",
('https://example.com',)
).fetchone()
assert snapshot_row is not None
crawl_row = c.execute(
"SELECT id, created_at, urls, created_by_id FROM crawls_crawl WHERE id = ?",
(snapshot_row[3],)
).fetchone()
assert crawl_row is not None
user_row = c.execute(
"SELECT username FROM auth_user WHERE id = ?",
(crawl_row[3],)
).fetchone()
assert user_row is not None
conn.close()
assert result is not None
assert result[0] == 'https://example.com'
snapshot_id_raw, snapshot_created_at, snapshot_url, crawl_id = snapshot_row
snapshot_id = str(uuid.UUID(snapshot_id_raw))
crawl_id, crawl_created_at, crawl_urls, crawl_created_by_id = crawl_row
username = user_row[0]
crawl_date_str = datetime.fromisoformat(crawl_created_at).strftime('%Y%m%d')
snapshot_date_str = datetime.fromisoformat(snapshot_created_at).strftime('%Y%m%d')
domain = urlparse(snapshot_url).hostname or 'unknown'
# Verify crawl symlink exists and is relative
target_path = tmp_path / 'users' / username / 'snapshots' / snapshot_date_str / domain / snapshot_id
symlinks = [
p for p in tmp_path.rglob(str(snapshot_id))
if p.is_symlink()
]
assert symlinks, "Snapshot symlink should exist under crawl dir"
link_path = symlinks[0]
assert link_path.is_symlink(), "Snapshot symlink should exist under crawl dir"
link_target = os.readlink(link_path)
assert not os.path.isabs(link_target), "Symlink should be relative"
assert link_path.resolve() == target_path.resolve()
def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disable_extractors_dict):
@@ -36,11 +72,11 @@ def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disa
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'snapshot',
['archivebox', 'snapshot', 'create',
'https://example.com',
'https://iana.org'],
capture_output=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -59,10 +95,10 @@ def test_snapshot_tag_creates_tag_and_links_to_snapshot(tmp_path, process, disab
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'snapshot', '--tag=mytesttag',
['archivebox', 'snapshot', 'create', '--tag=mytesttag',
'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -95,22 +131,15 @@ def test_snapshot_jsonl_output_has_correct_structure(tmp_path, process, disable_
# Pass URL as argument instead of stdin for more reliable behavior
result = subprocess.run(
['archivebox', 'snapshot', 'https://example.com'],
['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
text=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
# Parse JSONL output lines
snapshot_records = []
for line in result.stdout.strip().split('\n'):
if line:
try:
record = json.loads(line)
if record.get('type') == 'Snapshot':
snapshot_records.append(record)
except json.JSONDecodeError:
continue
records = Process.parse_records_from_text(result.stdout)
snapshot_records = [r for r in records if r.get('type') == 'Snapshot']
assert len(snapshot_records) >= 1, "Should output at least one Snapshot JSONL record"
@@ -127,10 +156,10 @@ def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors
# Use command line args instead of stdin
subprocess.run(
['archivebox', 'snapshot', '--tag=customtag', 'https://example.com'],
['archivebox', 'snapshot', 'create', '--tag=customtag', 'https://example.com'],
capture_output=True,
text=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -145,40 +174,40 @@ def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors
assert tag[0] == 'customtag'
def test_snapshot_with_depth_creates_crawl_object(tmp_path, process, disable_extractors_dict):
"""Test that --depth > 0 creates a Crawl object with correct max_depth."""
def test_snapshot_with_depth_sets_snapshot_depth(tmp_path, process, disable_extractors_dict):
"""Test that --depth sets snapshot depth when creating snapshots."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'snapshot', '--depth=1',
['archivebox', 'snapshot', 'create', '--depth=1',
'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
crawl = c.execute("SELECT max_depth FROM crawls_crawl ORDER BY created_at DESC LIMIT 1").fetchone()
snapshot = c.execute("SELECT depth FROM core_snapshot ORDER BY created_at DESC LIMIT 1").fetchone()
conn.close()
assert crawl is not None, "Crawl object should be created when depth > 0"
assert crawl[0] == 1, "Crawl max_depth should match --depth value"
assert snapshot is not None, "Snapshot should be created when depth is provided"
assert snapshot[0] == 1, "Snapshot depth should match --depth value"
def test_snapshot_deduplicates_urls(tmp_path, process, disable_extractors_dict):
"""Test that adding the same URL twice doesn't create duplicate snapshots."""
def test_snapshot_allows_duplicate_urls_across_crawls(tmp_path, process, disable_extractors_dict):
"""Snapshot create auto-creates a crawl per run; same URL can appear multiple times."""
os.chdir(tmp_path)
# Add same URL twice
subprocess.run(
['archivebox', 'snapshot', 'https://example.com'],
['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
subprocess.run(
['archivebox', 'snapshot', 'https://example.com'],
['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -187,7 +216,7 @@ def test_snapshot_deduplicates_urls(tmp_path, process, disable_extractors_dict):
('https://example.com',)).fetchone()[0]
conn.close()
assert count == 1, "Same URL should not create duplicate snapshots"
assert count == 2, "Same URL should create separate snapshots across different crawls"
if __name__ == '__main__':