{% if result.result %}
{# Use plugin-specific thumbnail template when ArchiveResult is available #}
- {% extractor_thumbnail result.result %}
+ {% plugin_thumbnail result.result %}
{% else %}
{# Fall back to generic iframe for filesystem-discovered files #}
@@ -476,7 +476,7 @@
{% if best_result.result %}
{# Use plugin-specific fullscreen template when ArchiveResult is available #}
- {% extractor_fullscreen best_result.result %}
+ {% plugin_fullscreen best_result.result %}
{% else %}
{# Fall back to generic iframe #}
diff --git a/archivebox/templates/static/admin.css b/archivebox/templates/static/admin.css
index 63bf87b2..0afdfe72 100755
--- a/archivebox/templates/static/admin.css
+++ b/archivebox/templates/static/admin.css
@@ -403,6 +403,38 @@ body.model-snapshot.change-list #content .object-tools {
margin-top: 1px;
}
+.files-icons {
+ display: inline-flex;
+ flex-wrap: wrap;
+ gap: 4px;
+ vertical-align: middle;
+}
+
+.files-icons a {
+ display: inline-flex;
+ align-items: center;
+ justify-content: center;
+ text-decoration: none;
+}
+
+.files-icons .abx-output-icon {
+ width: 18px;
+ height: 18px;
+ display: inline-flex;
+ align-items: center;
+ justify-content: center;
+ border-radius: 4px;
+ color: #1f2937;
+ background: rgba(15, 23, 42, 0.08);
+ box-shadow: inset 0 0 0 1px rgba(15, 23, 42, 0.08);
+}
+
+.files-icons .abx-output-icon svg {
+ width: 14px;
+ height: 14px;
+ display: block;
+}
+
.exists-False {
opacity: 0.1;
filter: grayscale(100%);
diff --git a/archivebox/tests/conftest.py b/archivebox/tests/conftest.py
index ff6f1875..ed2e5316 100644
--- a/archivebox/tests/conftest.py
+++ b/archivebox/tests/conftest.py
@@ -2,7 +2,6 @@
import os
import sys
-import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
@@ -110,16 +109,9 @@ def initialized_archive(isolated_data_dir):
# =============================================================================
def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
- """Parse JSONL output into list of dicts."""
- records = []
- for line in stdout.strip().split('\n'):
- line = line.strip()
- if line and line.startswith('{'):
- try:
- records.append(json.loads(line))
- except json.JSONDecodeError:
- pass
- return records
+ """Parse JSONL output into list of dicts via Process parser."""
+ from archivebox.machine.models import Process
+ return Process.parse_records_from_text(stdout or '')
def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):
diff --git a/archivebox/tests/test_cli_add_interrupt.py b/archivebox/tests/test_cli_add_interrupt.py
new file mode 100644
index 00000000..a9343391
--- /dev/null
+++ b/archivebox/tests/test_cli_add_interrupt.py
@@ -0,0 +1,133 @@
+import os
+import signal
+import sqlite3
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+
+def _run(cmd, data_dir: Path, env: dict, timeout: int = 120):
+ return subprocess.run(
+ cmd,
+ cwd=data_dir,
+ env=env,
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ )
+
+
+def _make_env(data_dir: Path) -> dict:
+ env = os.environ.copy()
+ env["DATA_DIR"] = str(data_dir)
+ env["USE_COLOR"] = "False"
+ env["SHOW_PROGRESS"] = "False"
+ env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
+ env["PLUGINS"] = "title,favicon"
+    # Keep the run fast while still exercising real hooks
+ env["SAVE_TITLE"] = "True"
+ env["SAVE_FAVICON"] = "True"
+ env["SAVE_WGET"] = "False"
+ env["SAVE_WARC"] = "False"
+ env["SAVE_PDF"] = "False"
+ env["SAVE_SCREENSHOT"] = "False"
+ env["SAVE_DOM"] = "False"
+ env["SAVE_SINGLEFILE"] = "False"
+ env["SAVE_READABILITY"] = "False"
+ env["SAVE_MERCURY"] = "False"
+ env["SAVE_GIT"] = "False"
+ env["SAVE_YTDLP"] = "False"
+ env["SAVE_HEADERS"] = "False"
+ env["SAVE_HTMLTOTEXT"] = "False"
+ return env
+
+
+def _count_running_processes(db_path: Path, where: str) -> int:
+ for _ in range(50):
+ try:
+ conn = sqlite3.connect(db_path, timeout=1)
+ cur = conn.cursor()
+ count = cur.execute(
+ f"SELECT COUNT(*) FROM machine_process WHERE status = 'running' AND {where}"
+ ).fetchone()[0]
+ conn.close()
+ return count
+ except sqlite3.OperationalError:
+ time.sleep(0.1)
+ return 0
+
+
+def _wait_for_count(db_path: Path, where: str, target: int, timeout: int = 20) -> bool:
+ start = time.time()
+ while time.time() - start < timeout:
+ if _count_running_processes(db_path, where) >= target:
+ return True
+ time.sleep(0.1)
+ return False
+
+
+def test_add_parents_workers_to_orchestrator(tmp_path):
+ data_dir = tmp_path / "data"
+ data_dir.mkdir()
+ env = _make_env(data_dir)
+
+ init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
+ assert init.returncode == 0, init.stderr
+
+ add = _run([sys.executable, "-m", "archivebox", "add", "https://example.com"], data_dir, env, timeout=120)
+ assert add.returncode == 0, add.stderr
+
+ conn = sqlite3.connect(data_dir / "index.sqlite3")
+ cur = conn.cursor()
+ orchestrator = cur.execute(
+ "SELECT id FROM machine_process WHERE process_type = 'orchestrator' ORDER BY created_at DESC LIMIT 1"
+ ).fetchone()
+ assert orchestrator is not None
+ orchestrator_id = orchestrator[0]
+
+ worker_count = cur.execute(
+ "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'crawl' "
+ "AND parent_id = ?",
+ (orchestrator_id,),
+ ).fetchone()[0]
+ conn.close()
+
+ assert worker_count >= 1, "Expected crawl worker to be parented to orchestrator"
+
+
+def test_add_interrupt_cleans_orphaned_processes(tmp_path):
+ data_dir = tmp_path / "data"
+ data_dir.mkdir()
+ env = _make_env(data_dir)
+
+ init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
+ assert init.returncode == 0, init.stderr
+
+ proc = subprocess.Popen(
+ [sys.executable, "-m", "archivebox", "add", "https://example.com"],
+ cwd=data_dir,
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+
+ db_path = data_dir / "index.sqlite3"
+ saw_worker = _wait_for_count(db_path, "process_type = 'worker'", 1, timeout=20)
+ assert saw_worker, "Expected at least one worker to start before interrupt"
+
+ proc.send_signal(signal.SIGINT)
+ proc.wait(timeout=30)
+
+ # Wait for workers/hooks to be cleaned up
+ start = time.time()
+ while time.time() - start < 30:
+ running = _count_running_processes(db_path, "process_type IN ('worker','hook')")
+ if running == 0:
+ break
+ time.sleep(0.2)
+
+ assert _count_running_processes(db_path, "process_type IN ('worker','hook')") == 0, (
+ "Expected no running worker/hook processes after interrupt"
+ )
diff --git a/archivebox/tests/test_hooks.py b/archivebox/tests/test_hooks.py
index 54ac210a..308633ba 100755
--- a/archivebox/tests/test_hooks.py
+++ b/archivebox/tests/test_hooks.py
@@ -68,17 +68,8 @@ class TestJSONLParsing(unittest.TestCase):
def test_parse_clean_jsonl(self):
"""Clean JSONL format should be parsed correctly."""
stdout = '{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}'
- records = []
- for line in stdout.splitlines():
- line = line.strip()
- if not line or not line.startswith('{'):
- continue
- try:
- data = json.loads(line)
- if 'type' in data:
- records.append(data)
- except json.JSONDecodeError:
- pass
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
@@ -89,17 +80,8 @@ class TestJSONLParsing(unittest.TestCase):
"""Multiple JSONL records should all be parsed."""
stdout = '''{"type": "ArchiveResult", "status": "succeeded", "output_str": "Done"}
{"type": "Binary", "name": "wget", "abspath": "/usr/bin/wget"}'''
- records = []
- for line in stdout.splitlines():
- line = line.strip()
- if not line or not line.startswith('{'):
- continue
- try:
- data = json.loads(line)
- if 'type' in data:
- records.append(data)
- except json.JSONDecodeError:
- pass
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 2)
self.assertEqual(records[0]['type'], 'ArchiveResult')
@@ -111,59 +93,20 @@ class TestJSONLParsing(unittest.TestCase):
Processing URL: https://example.com
{"type": "ArchiveResult", "status": "succeeded", "output_str": "Downloaded"}
Hook completed successfully'''
- records = []
- for line in stdout.splitlines():
- line = line.strip()
- if not line or not line.startswith('{'):
- continue
- try:
- data = json.loads(line)
- if 'type' in data:
- records.append(data)
- except json.JSONDecodeError:
- pass
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['status'], 'succeeded')
- def test_parse_legacy_result_json_format(self):
- """Legacy RESULT_JSON= format should be parsed for backwards compat."""
- stdout = 'RESULT_JSON={"status": "succeeded", "output": "Done"}'
- output_json = None
- records = []
- for line in stdout.splitlines():
- line = line.strip()
- if line.startswith('RESULT_JSON='):
- try:
- data = json.loads(line[len('RESULT_JSON='):])
- if output_json is None:
- output_json = data
- data['type'] = 'ArchiveResult'
- records.append(data)
- except json.JSONDecodeError:
- pass
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['type'], 'ArchiveResult')
- self.assertEqual(records[0]['status'], 'succeeded')
-
def test_ignore_invalid_json(self):
"""Invalid JSON should be silently ignored."""
stdout = '''{"type": "ArchiveResult", "status": "succeeded"}
{invalid json here}
not json at all
{"type": "Binary", "name": "wget"}'''
- records = []
- for line in stdout.splitlines():
- line = line.strip()
- if not line or not line.startswith('{'):
- continue
- try:
- data = json.loads(line)
- if 'type' in data:
- records.append(data)
- except json.JSONDecodeError:
- pass
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 2)
@@ -171,17 +114,8 @@ not json at all
"""JSON objects without 'type' field should be ignored."""
stdout = '''{"status": "succeeded", "output_str": "Done"}
{"type": "ArchiveResult", "status": "succeeded"}'''
- records = []
- for line in stdout.splitlines():
- line = line.strip()
- if not line or not line.startswith('{'):
- continue
- try:
- data = json.loads(line)
- if 'type' in data:
- records.append(data)
- except json.JSONDecodeError:
- pass
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(stdout)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], 'ArchiveResult')
@@ -250,9 +184,9 @@ class TestHookDiscovery(unittest.TestCase):
(wget_dir / 'on_Snapshot__50_wget.py').write_text('# test hook')
(wget_dir / 'on_Crawl__00_install_wget.py').write_text('# install hook')
- chrome_dir = self.plugins_dir / 'chrome_session'
+ chrome_dir = self.plugins_dir / 'chrome'
chrome_dir.mkdir()
- (chrome_dir / 'on_Snapshot__20_chrome_session.bg.js').write_text('// background hook')
+ (chrome_dir / 'on_Snapshot__20_chrome_tab.bg.js').write_text('// background hook')
consolelog_dir = self.plugins_dir / 'consolelog'
consolelog_dir.mkdir()
@@ -274,7 +208,7 @@ class TestHookDiscovery(unittest.TestCase):
self.assertEqual(len(hooks), 3)
hook_names = [h.name for h in hooks]
- self.assertIn('on_Snapshot__20_chrome_session.bg.js', hook_names)
+ self.assertIn('on_Snapshot__20_chrome_tab.bg.js', hook_names)
self.assertIn('on_Snapshot__21_consolelog.bg.js', hook_names)
self.assertIn('on_Snapshot__50_wget.py', hook_names)
@@ -288,7 +222,7 @@ class TestHookDiscovery(unittest.TestCase):
hooks = sorted(set(hooks), key=lambda p: p.name)
# Check numeric ordering
- self.assertEqual(hooks[0].name, 'on_Snapshot__20_chrome_session.js')
+ self.assertEqual(hooks[0].name, 'on_Snapshot__20_chrome_tab.bg.js')
self.assertEqual(hooks[1].name, 'on_Snapshot__21_consolelog.bg.js')
self.assertEqual(hooks[2].name, 'on_Snapshot__50_wget.py')
@@ -348,9 +282,11 @@ print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "output_str":
)
self.assertEqual(result.returncode, 0)
- output = json.loads(result.stdout.strip())
- self.assertEqual(output['type'], 'ArchiveResult')
- self.assertEqual(output['status'], 'succeeded')
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(result.stdout)
+ self.assertTrue(records)
+ self.assertEqual(records[0]['type'], 'ArchiveResult')
+ self.assertEqual(records[0]['status'], 'succeeded')
def test_js_hook_execution(self):
"""JavaScript hook should execute and output JSONL."""
@@ -371,9 +307,11 @@ console.log(JSON.stringify({type: 'ArchiveResult', status: 'succeeded', output_s
)
self.assertEqual(result.returncode, 0)
- output = json.loads(result.stdout.strip())
- self.assertEqual(output['type'], 'ArchiveResult')
- self.assertEqual(output['status'], 'succeeded')
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(result.stdout)
+ self.assertTrue(records)
+ self.assertEqual(records[0]['type'], 'ArchiveResult')
+ self.assertEqual(records[0]['status'], 'succeeded')
def test_hook_receives_cli_args(self):
"""Hook should receive CLI arguments."""
@@ -398,8 +336,10 @@ print(json.dumps({"type": "ArchiveResult", "status": "succeeded", "url": args.ge
)
self.assertEqual(result.returncode, 0)
- output = json.loads(result.stdout.strip())
- self.assertEqual(output['url'], 'https://example.com')
+ from archivebox.machine.models import Process
+ records = Process.parse_records_from_text(result.stdout)
+ self.assertTrue(records)
+ self.assertEqual(records[0]['url'], 'https://example.com')
class TestInstallHookOutput(unittest.TestCase):
@@ -424,7 +364,8 @@ class TestInstallHookOutput(unittest.TestCase):
'binprovider': 'apt',
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'Binary')
self.assertEqual(data['name'], 'wget')
self.assertTrue(data['abspath'].startswith('/'))
@@ -433,15 +374,16 @@ class TestInstallHookOutput(unittest.TestCase):
"""Install hook should output Machine config update JSONL."""
hook_output = json.dumps({
'type': 'Machine',
- '_method': 'update',
- 'key': 'config/WGET_BINARY',
- 'value': '/usr/bin/wget',
+ 'config': {
+ 'WGET_BINARY': '/usr/bin/wget',
+ },
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'Machine')
- self.assertEqual(data['_method'], 'update')
- self.assertEqual(data['key'], 'config/WGET_BINARY')
+ self.assertIn('config', data)
+ self.assertEqual(data['config']['WGET_BINARY'], '/usr/bin/wget')
class TestSnapshotHookOutput(unittest.TestCase):
@@ -455,7 +397,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'output_str': 'Downloaded 5 files',
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'ArchiveResult')
self.assertEqual(data['status'], 'succeeded')
self.assertIn('output_str', data)
@@ -469,7 +412,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'cmd': ['/usr/bin/wget', '-p', '-k', 'https://example.com'],
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'ArchiveResult')
self.assertIsInstance(data['cmd'], list)
self.assertEqual(data['cmd'][0], '/usr/bin/wget')
@@ -487,7 +431,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
},
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['type'], 'ArchiveResult')
self.assertIsInstance(data['output_json'], dict)
self.assertEqual(data['output_json']['status-code'], 200)
@@ -500,7 +445,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'output_str': 'SAVE_WGET=False',
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['status'], 'skipped')
def test_snapshot_hook_failed_status(self):
@@ -511,7 +457,8 @@ class TestSnapshotHookOutput(unittest.TestCase):
'output_str': '404 Not Found',
})
- data = json.loads(hook_output)
+ from archivebox.machine.models import Process
+ data = Process.parse_records_from_text(hook_output)[0]
self.assertEqual(data['status'], 'failed')
diff --git a/archivebox/tests/test_list.py b/archivebox/tests/test_list.py
index b46596fa..d527fa5d 100644
--- a/archivebox/tests/test_list.py
+++ b/archivebox/tests/test_list.py
@@ -18,11 +18,10 @@ def test_search_json(process, disable_extractors_dict):
clean_str = re.sub(r'\x1b\[[0-9;]*m', '', output_str)
clean_str = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', clean_str)
output_json = json.loads(clean_str)
- # With --index-only, only source file snapshots are created (file:// URLs)
# Verify we get at least one snapshot back
assert len(output_json) >= 1
- # The snapshot should be a file:// URL pointing to sources
- assert any("sources" in entry.get("url", "") for entry in output_json)
+ # Should include the requested URL
+ assert any("example.com" in entry.get("url", "") for entry in output_json)
def test_search_json_headers(process, disable_extractors_dict):
@@ -65,16 +64,17 @@ def test_search_csv(process, disable_extractors_dict):
capture_output=True, env=disable_extractors_dict)
search_process = subprocess.run(["archivebox", "search", "--csv", "url"], capture_output=True)
output_csv = search_process.stdout.decode("utf-8")
- # Should contain the source file URL
- assert "file://" in output_csv or "sources" in output_csv
+ # Should contain the requested URL
+ assert "example.com" in output_csv
def test_search_csv_headers(process, disable_extractors_dict):
subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
capture_output=True, env=disable_extractors_dict)
search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--with-headers"], capture_output=True)
output_csv = search_process.stdout.decode("utf-8")
- # Should have url header and source file content
+ # Should have url header and requested URL
assert "url" in output_csv
+ assert "example.com" in output_csv
def test_search_with_headers_requires_format(process):
search_process = subprocess.run(["archivebox", "search", "--with-headers"], capture_output=True)
diff --git a/archivebox/tests/test_real_world_add.py b/archivebox/tests/test_real_world_add.py
new file mode 100644
index 00000000..3c72e622
--- /dev/null
+++ b/archivebox/tests/test_real_world_add.py
@@ -0,0 +1,133 @@
+import os
+import sqlite3
+import subprocess
+from pathlib import Path
+
+
+def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
+ candidates = {snapshot_id}
+ if len(snapshot_id) == 32:
+ hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
+ candidates.add(hyphenated)
+ elif len(snapshot_id) == 36 and '-' in snapshot_id:
+ candidates.add(snapshot_id.replace('-', ''))
+
+ for needle in candidates:
+ for path in data_dir.rglob(needle):
+ if path.is_dir():
+ return path
+ return None
+
+
+def _find_html_with_text(root: Path, needle: str) -> list[Path]:
+ hits: list[Path] = []
+ for path in root.rglob("*.htm*"):
+ if not path.is_file():
+ continue
+ try:
+ if needle in path.read_text(errors="ignore"):
+ hits.append(path)
+ except Exception:
+ continue
+ return hits
+
+
+def test_add_real_world_example_domain(tmp_path):
+ os.chdir(tmp_path)
+ tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
+ tmp_short.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["TMP_DIR"] = str(tmp_short)
+ env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
+
+ init = subprocess.run(
+ ["archivebox", "init"],
+ capture_output=True,
+ text=True,
+ timeout=120,
+ env=env,
+ )
+ assert init.returncode == 0, f"archivebox init failed: {init.stderr}"
+
+ result = subprocess.run(
+ ["archivebox", "add", "https://example.com"],
+ capture_output=True,
+ text=True,
+ timeout=900,
+ env=env,
+ )
+ assert result.returncode == 0, (
+ "archivebox add failed.\n"
+ f"stdout:\n{result.stdout}\n"
+ f"stderr:\n{result.stderr}"
+ )
+
+ conn = sqlite3.connect(tmp_path / "index.sqlite3")
+ c = conn.cursor()
+ snapshot_row = c.execute(
+ "SELECT id, url, title FROM core_snapshot WHERE url = ?",
+ ("https://example.com",),
+ ).fetchone()
+ assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
+ snapshot_id, snapshot_url, snapshot_title = snapshot_row
+ assert snapshot_title and "Example Domain" in snapshot_title, (
+ f"Expected title to contain Example Domain, got: {snapshot_title}"
+ )
+
+ failed_results = c.execute(
+ "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
+ (snapshot_id,),
+ ).fetchone()[0]
+ assert failed_results == 0, "Some archive results failed for example.com snapshot"
+
+ binary_workers = c.execute(
+ "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
+ ).fetchone()[0]
+ assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"
+
+ failed_binary_workers = c.execute(
+ "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
+ "AND exit_code IS NOT NULL AND exit_code != 0"
+ ).fetchone()[0]
+ assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"
+
+ queued_binaries = c.execute(
+ "SELECT name FROM machine_binary WHERE status != 'installed'"
+ ).fetchall()
+ assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"
+ conn.close()
+
+ snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id))
+ assert snapshot_dir is not None, "Snapshot output directory not found"
+
+ title_path = snapshot_dir / "title" / "title.txt"
+ assert title_path.exists(), f"Missing title output: {title_path}"
+ assert "Example Domain" in title_path.read_text(errors="ignore")
+
+ html_sources = []
+ for candidate in ("wget", "singlefile", "dom"):
+ for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
+ if candidate_dir.exists():
+ html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
+ assert len(html_sources) >= 2, (
+ "Expected HTML outputs from multiple extractors to contain Example Domain "
+ f"(found {len(html_sources)})."
+ )
+
+ text_hits = 0
+ for path in (
+ *snapshot_dir.glob("*_readability/content.txt"),
+ snapshot_dir / "readability" / "content.txt",
+ ):
+ if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
+ text_hits += 1
+ for path in (
+ *snapshot_dir.glob("*_htmltotext/htmltotext.txt"),
+ snapshot_dir / "htmltotext" / "htmltotext.txt",
+ ):
+ if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
+ text_hits += 1
+ assert text_hits >= 2, (
+ "Expected multiple text extractors to contain Example Domain "
+ f"(readability/htmltotext hits={text_hits})."
+ )
diff --git a/archivebox/tests/test_settings_signal_webhooks.py b/archivebox/tests/test_settings_signal_webhooks.py
new file mode 100644
index 00000000..acb6367d
--- /dev/null
+++ b/archivebox/tests/test_settings_signal_webhooks.py
@@ -0,0 +1,8 @@
+from django.test import TestCase
+
+
+class TestSignalWebhooksSettings(TestCase):
+ def test_task_handler_is_sync_in_tests(self):
+ from signal_webhooks.settings import webhook_settings
+
+ assert webhook_settings.TASK_HANDLER.__name__ == "sync_task_handler"
diff --git a/archivebox/tests/test_snapshot.py b/archivebox/tests/test_snapshot.py
index 7ca8e5c8..8d2fc3fc 100644
--- a/archivebox/tests/test_snapshot.py
+++ b/archivebox/tests/test_snapshot.py
@@ -4,7 +4,11 @@
import os
import subprocess
import sqlite3
-import json
+from archivebox.machine.models import Process
+from datetime import datetime
+from pathlib import Path
+from urllib.parse import urlparse
+import uuid
import pytest
@@ -16,19 +20,51 @@ def test_snapshot_creates_snapshot_with_correct_url(tmp_path, process, disable_e
os.chdir(tmp_path)
subprocess.run(
- ['archivebox', 'snapshot', 'https://example.com'],
+ ['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
- result = c.execute("SELECT url FROM core_snapshot WHERE url = ?",
- ('https://example.com',)).fetchone()
+ snapshot_row = c.execute(
+ "SELECT id, created_at, url, crawl_id FROM core_snapshot WHERE url = ?",
+ ('https://example.com',)
+ ).fetchone()
+ assert snapshot_row is not None
+ crawl_row = c.execute(
+ "SELECT id, created_at, urls, created_by_id FROM crawls_crawl WHERE id = ?",
+ (snapshot_row[3],)
+ ).fetchone()
+ assert crawl_row is not None
+ user_row = c.execute(
+ "SELECT username FROM auth_user WHERE id = ?",
+ (crawl_row[3],)
+ ).fetchone()
+ assert user_row is not None
conn.close()
- assert result is not None
- assert result[0] == 'https://example.com'
+ snapshot_id_raw, snapshot_created_at, snapshot_url, crawl_id = snapshot_row
+ snapshot_id = str(uuid.UUID(snapshot_id_raw))
+ crawl_id, crawl_created_at, crawl_urls, crawl_created_by_id = crawl_row
+ username = user_row[0]
+ crawl_date_str = datetime.fromisoformat(crawl_created_at).strftime('%Y%m%d')
+ snapshot_date_str = datetime.fromisoformat(snapshot_created_at).strftime('%Y%m%d')
+ domain = urlparse(snapshot_url).hostname or 'unknown'
+
+ # Verify crawl symlink exists and is relative
+ target_path = tmp_path / 'users' / username / 'snapshots' / snapshot_date_str / domain / snapshot_id
+ symlinks = [
+ p for p in tmp_path.rglob(str(snapshot_id))
+ if p.is_symlink()
+ ]
+ assert symlinks, "Snapshot symlink should exist under crawl dir"
+ link_path = symlinks[0]
+
+ assert link_path.is_symlink(), "Snapshot symlink should exist under crawl dir"
+ link_target = os.readlink(link_path)
+ assert not os.path.isabs(link_target), "Symlink should be relative"
+ assert link_path.resolve() == target_path.resolve()
def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disable_extractors_dict):
@@ -36,11 +72,11 @@ def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disa
os.chdir(tmp_path)
subprocess.run(
- ['archivebox', 'snapshot',
+ ['archivebox', 'snapshot', 'create',
'https://example.com',
'https://iana.org'],
capture_output=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -59,10 +95,10 @@ def test_snapshot_tag_creates_tag_and_links_to_snapshot(tmp_path, process, disab
os.chdir(tmp_path)
subprocess.run(
- ['archivebox', 'snapshot', '--tag=mytesttag',
+ ['archivebox', 'snapshot', 'create', '--tag=mytesttag',
'https://example.com'],
capture_output=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -95,22 +131,15 @@ def test_snapshot_jsonl_output_has_correct_structure(tmp_path, process, disable_
# Pass URL as argument instead of stdin for more reliable behavior
result = subprocess.run(
- ['archivebox', 'snapshot', 'https://example.com'],
+ ['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
text=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
# Parse JSONL output lines
- snapshot_records = []
- for line in result.stdout.strip().split('\n'):
- if line:
- try:
- record = json.loads(line)
- if record.get('type') == 'Snapshot':
- snapshot_records.append(record)
- except json.JSONDecodeError:
- continue
+ records = Process.parse_records_from_text(result.stdout)
+ snapshot_records = [r for r in records if r.get('type') == 'Snapshot']
assert len(snapshot_records) >= 1, "Should output at least one Snapshot JSONL record"
@@ -127,10 +156,10 @@ def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors
# Use command line args instead of stdin
subprocess.run(
- ['archivebox', 'snapshot', '--tag=customtag', 'https://example.com'],
+ ['archivebox', 'snapshot', 'create', '--tag=customtag', 'https://example.com'],
capture_output=True,
text=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -145,40 +174,40 @@ def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors
assert tag[0] == 'customtag'
-def test_snapshot_with_depth_creates_crawl_object(tmp_path, process, disable_extractors_dict):
- """Test that --depth > 0 creates a Crawl object with correct max_depth."""
+def test_snapshot_with_depth_sets_snapshot_depth(tmp_path, process, disable_extractors_dict):
+ """Test that --depth sets snapshot depth when creating snapshots."""
os.chdir(tmp_path)
subprocess.run(
- ['archivebox', 'snapshot', '--depth=1',
+ ['archivebox', 'snapshot', 'create', '--depth=1',
'https://example.com'],
capture_output=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
c = conn.cursor()
- crawl = c.execute("SELECT max_depth FROM crawls_crawl ORDER BY created_at DESC LIMIT 1").fetchone()
+ snapshot = c.execute("SELECT depth FROM core_snapshot ORDER BY created_at DESC LIMIT 1").fetchone()
conn.close()
- assert crawl is not None, "Crawl object should be created when depth > 0"
- assert crawl[0] == 1, "Crawl max_depth should match --depth value"
+ assert snapshot is not None, "Snapshot should be created when depth is provided"
+ assert snapshot[0] == 1, "Snapshot depth should match --depth value"
-def test_snapshot_deduplicates_urls(tmp_path, process, disable_extractors_dict):
- """Test that adding the same URL twice doesn't create duplicate snapshots."""
+def test_snapshot_allows_duplicate_urls_across_crawls(tmp_path, process, disable_extractors_dict):
+ """Snapshot create auto-creates a crawl per run; same URL can appear multiple times."""
os.chdir(tmp_path)
# Add same URL twice
subprocess.run(
- ['archivebox', 'snapshot', 'https://example.com'],
+ ['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
subprocess.run(
- ['archivebox', 'snapshot', 'https://example.com'],
+ ['archivebox', 'snapshot', 'create', 'https://example.com'],
capture_output=True,
- env=disable_extractors_dict,
+ env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
)
conn = sqlite3.connect('index.sqlite3')
@@ -187,7 +216,7 @@ def test_snapshot_deduplicates_urls(tmp_path, process, disable_extractors_dict):
('https://example.com',)).fetchone()[0]
conn.close()
- assert count == 1, "Same URL should not create duplicate snapshots"
+ assert count == 2, "Same URL should create separate snapshots across different crawls"
if __name__ == '__main__':
diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py
index 4b8a2827..358c6ad9 100644
--- a/archivebox/workers/orchestrator.py
+++ b/archivebox/workers/orchestrator.py
@@ -83,6 +83,10 @@ class Orchestrator:
# In foreground mode (exit_on_idle=True), limit to 1 CrawlWorker
if self.exit_on_idle:
self.MAX_CRAWL_WORKERS = 1
+ # Faster UI updates for interactive runs
+ self.POLL_INTERVAL = 0.25
+ # Exit quickly once idle in foreground mode
+ self.IDLE_TIMEOUT = 1
def __repr__(self) -> str:
return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'
@@ -111,8 +115,14 @@ class Orchestrator:
# Clean up any stale Process records from previous runs
stale_count = Process.cleanup_stale_running()
- # Clean up orphaned Chrome processes from previous crashes
- chrome_count = Process.cleanup_orphaned_chrome()
+ # Foreground runs should start fast; skip expensive orphan cleanup unless in daemon mode.
+ chrome_count = 0
+ orphaned_workers = 0
+ if not self.exit_on_idle:
+ # Clean up orphaned Chrome processes from previous crashes
+ chrome_count = Process.cleanup_orphaned_chrome()
+ # Clean up orphaned workers from previous crashes
+ orphaned_workers = Process.cleanup_orphaned_workers()
# Collect startup metadata
metadata = {
@@ -123,6 +133,8 @@ class Orchestrator:
metadata['cleaned_stale_pids'] = stale_count
if chrome_count:
metadata['cleaned_orphaned_chrome'] = chrome_count
+ if orphaned_workers:
+ metadata['cleaned_orphaned_workers'] = orphaned_workers
log_worker_event(
worker_type='Orchestrator',
@@ -135,30 +147,26 @@ class Orchestrator:
def terminate_all_workers(self) -> None:
"""Terminate all running worker processes."""
from archivebox.machine.models import Process
- import signal
-
- # Get all running worker processes
- running_workers = Process.objects.filter(
- process_type=Process.TypeChoices.WORKER,
- status__in=['running', 'started']
- )
+ # Get running worker processes scoped to this orchestrator when possible
+ if getattr(self, 'db_process', None):
+ running_workers = self._get_scoped_running_workers()
+ else:
+ running_workers = Process.objects.filter(
+ process_type=Process.TypeChoices.WORKER,
+ status=Process.StatusChoices.RUNNING,
+ )
for worker_process in running_workers:
try:
- # Send SIGTERM to gracefully terminate the worker
- os.kill(worker_process.pid, signal.SIGTERM)
- except ProcessLookupError:
- # Process already dead
- pass
+ # Gracefully terminate the worker and update Process status
+ worker_process.terminate(graceful_timeout=5.0)
except Exception:
- # Ignore other errors during shutdown
pass
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when orchestrator shuts down."""
- # Terminate all worker processes in exit_on_idle mode
- if self.exit_on_idle:
- self.terminate_all_workers()
+ # Terminate all worker processes on shutdown
+ self.terminate_all_workers()
# Update Process record status
if hasattr(self, 'db_process') and self.db_process:
@@ -188,11 +196,26 @@ class Orchestrator:
Process.cleanup_stale_running()
self._last_cleanup_time = now
+ if self.crawl_id and getattr(self, 'db_process', None):
+ return self._get_scoped_running_workers().count()
+
return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES)
def get_running_workers_for_type(self, WorkerClass: Type[Worker]) -> int:
"""Get count of running workers for a specific worker type."""
+ if self.crawl_id and getattr(self, 'db_process', None):
+ return self._get_scoped_running_workers().filter(worker_type=WorkerClass.name).count()
return len(WorkerClass.get_running_workers())
+
+ def _get_scoped_running_workers(self):
+ """Get running workers scoped to this orchestrator process tree."""
+ from archivebox.machine.models import Process
+
+ descendants = self.db_process.get_descendants(include_self=False)
+ return descendants.filter(
+ process_type=Process.TypeChoices.WORKER,
+ status=Process.StatusChoices.RUNNING,
+ )
def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
"""Determine if we should spawn a new worker."""
@@ -208,8 +231,11 @@ class Orchestrator:
max_workers = 1 # Default for unknown types
# Check worker limit
- running_workers = WorkerClass.get_running_workers()
- running_count = len(running_workers)
+ if self.crawl_id and getattr(self, 'db_process', None) and WorkerClass.name != 'binary':
+ running_count = self._get_scoped_running_workers().filter(worker_type=WorkerClass.name).count()
+ else:
+ running_workers = WorkerClass.get_running_workers()
+ running_count = len(running_workers)
if running_count >= max_workers:
return False
@@ -225,9 +251,13 @@ class Orchestrator:
"""Spawn a new worker process. Returns PID or None if spawn failed."""
try:
print(f'[yellow]DEBUG: Spawning {WorkerClass.name} worker with crawl_id={self.crawl_id}...[/yellow]')
- pid = WorkerClass.start(crawl_id=self.crawl_id)
+ pid = WorkerClass.start(parent=self.db_process, crawl_id=self.crawl_id)
print(f'[yellow]DEBUG: Spawned {WorkerClass.name} worker with PID={pid}[/yellow]')
+ if self.exit_on_idle:
+ # Foreground runs have MAX_CRAWL_WORKERS=1; avoid blocking startup on registration.
+ return pid
+
# CRITICAL: Block until worker registers itself in Process table
# This prevents race condition where orchestrator spawns multiple workers
# before any of them finish on_startup() and register
@@ -316,7 +346,7 @@ class Orchestrator:
if binary_count > 0:
running_binary_workers_list = BinaryWorker.get_running_workers()
if len(running_binary_workers_list) == 0:
- BinaryWorker.start()
+ BinaryWorker.start(parent=self.db_process)
# Check if any BinaryWorkers are still running
running_binary_workers = len(BinaryWorker.get_running_workers())
@@ -344,7 +374,7 @@ class Orchestrator:
# Claim next crawl
crawl = crawl_queue.first()
if crawl and self._claim_crawl(crawl):
- CrawlWorker.start(crawl_id=str(crawl.id))
+ CrawlWorker.start(parent=self.db_process, crawl_id=str(crawl.id))
return queue_sizes
@@ -463,7 +493,7 @@ class Orchestrator:
with Live(
progress_layout.get_layout(),
- refresh_per_second=4,
+ refresh_per_second=8,
screen=True,
console=orchestrator_console,
):
@@ -521,41 +551,147 @@ class Orchestrator:
else:
status = "Idle"
+ binary_workers_count = worker_counts.get('binary', 0)
# Update orchestrator status
progress_layout.update_orchestrator_status(
status=status,
crawl_queue_count=crawl_queue_count,
crawl_workers_count=crawl_workers_count,
+ binary_queue_count=queue_sizes.get('binary', 0),
+ binary_workers_count=binary_workers_count,
max_crawl_workers=self.MAX_CRAWL_WORKERS,
)
- # Update CrawlWorker logs by tailing Process stdout/stderr
- if crawl_workers_count > 0:
- from archivebox.machine.models import Process
- crawl_worker_process = Process.objects.filter(
- process_type=Process.TypeChoices.WORKER,
- worker_type='crawl',
- status__in=['running', 'started']
- ).first()
- if crawl_worker_process:
- progress_layout.update_crawl_worker_logs(crawl_worker_process)
+ # Update crawl queue tree (active + recently completed)
+ from archivebox.crawls.models import Crawl
+ from archivebox.core.models import Snapshot, ArchiveResult
+ recent_cutoff = timezone.now() - timedelta(minutes=5)
+ pending_snapshot_candidates: list[Snapshot] = []
+ hooks_by_snapshot: dict[str, list] = {}
- # Log queue size changes
- if queue_sizes != last_queue_sizes:
- for worker_type, count in queue_sizes.items():
- old_count = last_queue_sizes.get(worker_type, 0)
- if count != old_count:
- if count > old_count:
- progress_layout.log_event(
- f"{worker_type.capitalize()} queue: {old_count} → {count}",
- style="yellow"
- )
- else:
- progress_layout.log_event(
- f"{worker_type.capitalize()} queue: {old_count} → {count}",
- style="green"
- )
- last_queue_sizes = queue_sizes.copy()
+ active_qs = Crawl.objects.exclude(status__in=Crawl.FINAL_STATES)
+ if self.crawl_id:
+ active_qs = active_qs.filter(id=self.crawl_id)
+ active_qs = active_qs.order_by('retry_at')
+
+ recent_done_qs = Crawl.objects.filter(
+ status__in=Crawl.FINAL_STATES,
+ modified_at__gte=recent_cutoff,
+ )
+ if self.crawl_id:
+ recent_done_qs = recent_done_qs.filter(id=self.crawl_id)
+ recent_done_qs = recent_done_qs.order_by('-modified_at')
+
+ crawls = list(active_qs)
+ active_ids = {c.id for c in crawls}
+ for crawl in recent_done_qs:
+ if crawl.id not in active_ids:
+ crawls.append(crawl)
+
+ def _abbrev(text: str, max_len: int = 80) -> str:
+ return text if len(text) <= max_len else f"{text[:max_len - 3]}..."
+
+ tree_data: list[dict] = []
+ for crawl in crawls:
+ urls = crawl.get_urls_list()
+ url_count = len(urls)
+ label = f"{url_count} url" + ("s" if url_count != 1 else "")
+ label = _abbrev(label)
+
+ snapshots = []
+ snap_qs = Snapshot.objects.filter(crawl_id=crawl.id)
+ active_snaps = list(
+ snap_qs.filter(status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED])
+ .order_by('created_at')[:16]
+ )
+ recent_snaps = list(
+ snap_qs.filter(status__in=Snapshot.FINAL_STATES)
+ .order_by('-modified_at')[:8]
+ )
+ snap_ids = {s.id for s in active_snaps}
+ for s in recent_snaps:
+ if s.id not in snap_ids:
+ active_snaps.append(s)
+
+ for snap in active_snaps:
+ total = snap.archiveresult_set.count()
+ completed = snap.archiveresult_set.filter(status__in=[
+ ArchiveResult.StatusChoices.SUCCEEDED,
+ ArchiveResult.StatusChoices.SKIPPED,
+ ArchiveResult.StatusChoices.FAILED,
+ ]).count()
+ running = snap.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED).count()
+ try:
+ from archivebox.config.configset import get_config
+ from archivebox.hooks import discover_hooks
+ hooks_list = discover_hooks('Snapshot', config=get_config(snapshot=snap))
+ total_hooks = len(hooks_list)
+ hooks_by_snapshot[str(snap.id)] = hooks_list
+ except Exception:
+ total_hooks = total
+ pending = max(total_hooks - completed - running, 0)
+ snap_label = _abbrev(snap.url or str(snap.id), max_len=60)
+ snapshots.append({
+ 'id': str(snap.id),
+ 'status': snap.status,
+ 'label': snap_label,
+ 'hooks': {'completed': completed, 'running': running, 'pending': pending} if total else {},
+ })
+ pending_snapshot_candidates.append(snap)
+
+ tree_data.append({
+ 'id': str(crawl.id),
+ 'status': crawl.status,
+ 'label': label,
+ 'snapshots': snapshots,
+ })
+
+ progress_layout.update_crawl_tree(tree_data)
+
+ # Update running process panels (tail stdout/stderr for each running process)
+ from archivebox.machine.models import Process
+ if self.crawl_id and getattr(self, 'db_process', None):
+ process_qs = self.db_process.get_descendants(include_self=False)
+ process_qs = process_qs.filter(status=Process.StatusChoices.RUNNING)
+ else:
+ process_qs = Process.objects.filter(
+ status=Process.StatusChoices.RUNNING,
+ ).exclude(process_type=Process.TypeChoices.ORCHESTRATOR)
+
+ running_processes = [
+ proc for proc in process_qs.order_by('process_type', 'worker_type', 'started_at')
+ if proc.is_running
+ ]
+ pending_processes = []
+ try:
+ from types import SimpleNamespace
+ for snap in pending_snapshot_candidates:
+ hooks_list = hooks_by_snapshot.get(str(snap.id), [])
+ if not hooks_list:
+ continue
+ existing = set(
+ snap.archiveresult_set.exclude(hook_name='').values_list('hook_name', flat=True)
+ )
+ for hook_path in hooks_list:
+ if hook_path.name in existing:
+ continue
+ pending_processes.append(SimpleNamespace(
+ process_type='hook',
+ worker_type='',
+ pid=None,
+ cmd=['', str(hook_path)],
+ url=snap.url,
+ status='queued',
+ started_at=None,
+ timeout=None,
+ pwd=None,
+ ))
+ except Exception:
+ pending_processes = []
+
+ progress_layout.update_process_panels(running_processes, pending=pending_processes)
+
+ last_queue_sizes = queue_sizes.copy()
# Update snapshot progress
from archivebox.core.models import Snapshot
@@ -641,11 +777,10 @@ class Orchestrator:
# Hooks created but none started yet
current_plugin = "waiting"
- # Update snapshot worker (show even if no hooks yet)
# Debug: Log first time we see this snapshot
- if snapshot.id not in progress_layout.snapshot_to_worker:
+ if snapshot.id not in snapshot_progress:
progress_layout.log_event(
- f"Assigning to worker: {snapshot.url[:50]}",
+ f"Tracking snapshot: {snapshot.url[:50]}",
style="grey53"
)
@@ -656,17 +791,21 @@ class Orchestrator:
if prev_progress != curr_progress:
prev_total, prev_completed, prev_plugin = prev_progress
- # Log hooks created
- if total > prev_total:
- progress_layout.log_event(
- f"Hooks created: {total} for {snapshot.url[:40]}",
- style="cyan"
- )
-
# Log hook completion
if completed > prev_completed:
+ completed_ar = snapshot.archiveresult_set.filter(
+ status__in=['succeeded', 'skipped', 'failed']
+ ).order_by('-end_ts', '-modified_at').first()
+ hook_label = ''
+ if completed_ar:
+ hook_name = completed_ar.hook_name or completed_ar.plugin or ''
+ if hook_name:
+ hook_label = hook_name.split('__')[-1] if '__' in hook_name else hook_name
+ hook_label = hook_label.replace('.py', '').replace('.js', '').replace('.sh', '').replace('.bg', '')
+ if not hook_label:
+ hook_label = f"{completed}/{total}"
progress_layout.log_event(
- f"Hook completed: {completed}/{total} for {snapshot.url[:40]}",
+ f"Hook completed: {hook_label}",
style="green"
)
@@ -686,23 +825,15 @@ class Orchestrator:
style="red"
)
- progress_layout.update_snapshot_worker(
- snapshot_id=snapshot.id,
- url=snapshot.url,
- total=max(total, 1), # Show at least 1 to avoid division by zero
- completed=completed,
- current_plugin=current_plugin,
- )
+ # No per-snapshot panels; logs only
- # Remove snapshots that are no longer active
- for snapshot_id in list(progress_layout.snapshot_to_worker.keys()):
+ # Cleanup progress tracking for completed snapshots
+ for snapshot_id in list(snapshot_progress.keys()):
if snapshot_id not in active_ids:
progress_layout.log_event(
f"Snapshot completed/removed",
style="blue"
)
- progress_layout.remove_snapshot_worker(snapshot_id)
- # Also clean up progress tracking
if snapshot_id in snapshot_progress:
del snapshot_progress[snapshot_id]
@@ -734,6 +865,7 @@ class Orchestrator:
if progress_layout:
progress_layout.log_event("Interrupted by user", style="red")
print() # Newline after ^C
+ self.on_shutdown(error=KeyboardInterrupt())
except BaseException as e:
if progress_layout:
progress_layout.log_event(f"Error: {e}", style="red")
diff --git a/archivebox/workers/tests/test_orchestrator.py b/archivebox/workers/tests/test_orchestrator.py
index d54331ec..79d37f95 100644
--- a/archivebox/workers/tests/test_orchestrator.py
+++ b/archivebox/workers/tests/test_orchestrator.py
@@ -215,6 +215,46 @@ class TestOrchestratorWithProcess(TestCase):
mock_count.assert_called()
self.assertTrue(result)
+ def test_orchestrator_scoped_worker_count(self):
+ """Orchestrator with crawl_id should count only descendant workers."""
+ import time
+ from archivebox.machine.models import Process, Machine
+
+ machine = Machine.current()
+ orchestrator = Orchestrator(exit_on_idle=True, crawl_id='test-crawl')
+
+ orchestrator.db_process = Process.objects.create(
+ machine=machine,
+ process_type=Process.TypeChoices.ORCHESTRATOR,
+ status=Process.StatusChoices.RUNNING,
+ pid=12345,
+ started_at=timezone.now(),
+ )
+
+ # Prevent cleanup from marking fake PIDs as exited
+ orchestrator._last_cleanup_time = time.time()
+
+ Process.objects.create(
+ machine=machine,
+ process_type=Process.TypeChoices.WORKER,
+ worker_type='crawl',
+ status=Process.StatusChoices.RUNNING,
+ pid=12346,
+ parent=orchestrator.db_process,
+ started_at=timezone.now(),
+ )
+
+ Process.objects.create(
+ machine=machine,
+ process_type=Process.TypeChoices.WORKER,
+ worker_type='crawl',
+ status=Process.StatusChoices.RUNNING,
+ pid=12347,
+ started_at=timezone.now(),
+ )
+
+ self.assertEqual(orchestrator.get_total_worker_count(), 1)
+
class TestProcessBasedWorkerTracking(TestCase):
"""Test Process model methods that replace pid_utils functionality."""
diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py
index 7546a02a..38f5361b 100644
--- a/archivebox/workers/worker.py
+++ b/archivebox/workers/worker.py
@@ -23,6 +23,7 @@ from django.db.models import QuerySet
from django.utils import timezone
from django.conf import settings
+from statemachine.exceptions import TransitionNotAllowed
from rich import print
from archivebox.misc.logging_util import log_worker_event
@@ -450,13 +451,34 @@ class CrawlWorker(Worker):
def runloop(self) -> None:
"""Run crawl state machine, spawn SnapshotWorkers."""
import sys
+ from archivebox.crawls.models import Crawl
self.on_startup()
try:
print(f'🔄 CrawlWorker starting for crawl {self.crawl_id}', file=sys.stderr)
+ if self.crawl.status == Crawl.StatusChoices.SEALED:
+ print(
+ '✅ This crawl has already completed and there are no tasks remaining.\n'
+ ' To re-crawl it, create a new crawl with the same URLs, e.g.\n'
+                '    archivebox crawl create | archivebox run',
+ file=sys.stderr,
+ )
+ return
+
# Advance state machine: QUEUED → STARTED (triggers run() via @started.enter)
- self.crawl.sm.tick()
+ try:
+ self.crawl.sm.tick()
+ except TransitionNotAllowed:
+ if self.crawl.status == Crawl.StatusChoices.SEALED:
+ print(
+ '✅ This crawl has already completed and there are no tasks remaining.\n'
+ ' To re-crawl it, create a new crawl with the same URLs, e.g.\n'
+ ' archivebox crawl create | archivebox run',
+ file=sys.stderr,
+ )
+ return
+ raise
self.crawl.refresh_from_db()
print(f'🔄 tick() complete, crawl status={self.crawl.status}', file=sys.stderr)
@@ -509,13 +531,20 @@ class CrawlWorker(Worker):
status__in=['running', 'started'],
)
- # Extract snapshot IDs from their pwd (contains snapshot ID at the end)
+ # Extract snapshot IDs from worker cmd args (more reliable than pwd paths)
running_snapshot_ids = []
for proc in running_processes:
- if proc.pwd:
- # pwd is like: /path/to/archive/{timestamp}
- # We need to match this against snapshot.output_dir
- running_snapshot_ids.append(proc.pwd)
+ cmd = proc.cmd or []
+ snapshot_id = None
+ for i, part in enumerate(cmd):
+ if part == '--snapshot-id' and i + 1 < len(cmd):
+ snapshot_id = cmd[i + 1]
+ break
+ if part.startswith('--snapshot-id='):
+ snapshot_id = part.split('=', 1)[1]
+ break
+ if snapshot_id:
+ running_snapshot_ids.append(snapshot_id)
# Find snapshots that don't have a running worker
all_snapshots = Snapshot.objects.filter(
@@ -526,7 +555,7 @@ class CrawlWorker(Worker):
# Filter out snapshots that already have workers
pending_snapshots = [
snap for snap in all_snapshots
- if snap.output_dir not in running_snapshot_ids
+ if str(snap.id) not in running_snapshot_ids
][:self.MAX_SNAPSHOT_WORKERS - running_count]
with open(debug_log, 'a') as f:
@@ -631,7 +660,6 @@ class SnapshotWorker(Worker):
b. If foreground: wait for completion
c. If background: track but continue to next hook
d. Update ArchiveResult status
- e. Advance current_step when all step's hooks complete
4. When all hooks done: seal snapshot
5. On shutdown: SIGTERM all background hooks
"""
@@ -662,7 +690,7 @@ class SnapshotWorker(Worker):
def runloop(self) -> None:
"""Execute all hooks sequentially."""
- from archivebox.hooks import discover_hooks, is_background_hook, extract_step
+ from archivebox.hooks import discover_hooks, is_background_hook
from archivebox.core.models import ArchiveResult
from archivebox.config.configset import get_config
@@ -679,8 +707,7 @@ class SnapshotWorker(Worker):
# Execute each hook sequentially
for hook_path in hooks:
hook_name = hook_path.name
- plugin = self._extract_plugin_name(hook_name)
- hook_step = extract_step(hook_name)
+ plugin = self._extract_plugin_name(hook_path, hook_name)
is_background = is_background_hook(hook_name)
# Create ArchiveResult for THIS HOOK (not per plugin)
@@ -724,16 +751,18 @@ class SnapshotWorker(Worker):
pid=self.pid,
)
- # Check if we can advance to next step
- self._try_advance_step()
+ # Reap any background hooks that finished while we worked
+ self._reap_background_hooks()
- # All hooks launched (or completed) - seal using state machine
+ # All hooks launched (or completed) - terminate bg hooks and seal
+ self._finalize_background_hooks()
# This triggers enter_sealed() which calls cleanup() and checks parent crawl sealing
self.snapshot.sm.seal()
self.snapshot.refresh_from_db()
except Exception as e:
# Mark snapshot as sealed even on error (still triggers cleanup)
+ self._finalize_background_hooks()
self.snapshot.sm.seal()
self.snapshot.refresh_from_db()
raise
@@ -753,7 +782,6 @@ class SnapshotWorker(Worker):
script=hook_path,
output_dir=output_dir,
config=config,
- timeout=120,
parent=self.db_process,
url=str(self.snapshot.url),
snapshot_id=str(self.snapshot.id),
@@ -773,12 +801,22 @@ class SnapshotWorker(Worker):
except TimeoutError:
# Hook exceeded timeout - kill it
process.kill(signal_num=9)
- exit_code = -1
+ exit_code = process.exit_code or 137
# Update ArchiveResult from hook output
ar.update_from_output()
ar.end_ts = timezone.now()
+ # Apply hook-emitted JSONL records regardless of exit code
+ from archivebox.hooks import extract_records_from_process, process_hook_records
+
+ records = extract_records_from_process(process)
+ if records:
+ process_hook_records(
+ records,
+ overrides={'snapshot': self.snapshot, 'crawl': self.snapshot.crawl},
+ )
+
# Determine final status from hook exit code
if exit_code == 0:
ar.status = ar.StatusChoices.SUCCEEDED
@@ -787,34 +825,53 @@ class SnapshotWorker(Worker):
ar.save(update_fields=['status', 'end_ts', 'modified_at'])
- def _try_advance_step(self) -> None:
- """Advance current_step if all foreground hooks in current step are done."""
- from django.db.models import Q
+ def _finalize_background_hooks(self) -> None:
+ """Gracefully terminate background hooks and update their ArchiveResults."""
+ if getattr(self, '_background_hooks_finalized', False):
+ return
+
+ self._background_hooks_finalized = True
+
+ # Send SIGTERM and wait up to each hook's remaining timeout
+ self._terminate_background_hooks(
+ background_processes=self.background_processes,
+ worker_type='SnapshotWorker',
+ indent_level=2,
+ )
+
+ # Clear to avoid double-termination during on_shutdown
+ self.background_processes = {}
+
+ # Update STARTED background results now that hooks are done
from archivebox.core.models import ArchiveResult
- current_step = self.snapshot.current_step
-
- # Single query: foreground hooks in current step that aren't finished
- # Foreground hooks: hook_name doesn't contain '.bg.'
- pending_foreground = self.snapshot.archiveresult_set.filter(
- Q(hook_name__contains=f'__{current_step}_') & # Current step
- ~Q(hook_name__contains='.bg.') & # Not background
- ~Q(status__in=ArchiveResult.FINAL_STATES) # Not finished
- ).exists()
-
- if pending_foreground:
- return # Still waiting for hooks
-
- # All foreground hooks done - advance!
- self.snapshot.current_step += 1
- self.snapshot.save(update_fields=['current_step', 'modified_at'])
-
- log_worker_event(
- worker_type='SnapshotWorker',
- event=f'Advanced to step {self.snapshot.current_step}',
- indent_level=2,
- pid=self.pid,
+ started_bg = self.snapshot.archiveresult_set.filter(
+ status=ArchiveResult.StatusChoices.STARTED,
+ hook_name__contains='.bg.',
)
+ for ar in started_bg:
+ ar.update_from_output()
+
+ def _reap_background_hooks(self) -> None:
+ """Update ArchiveResults for background hooks that already exited."""
+ if getattr(self, '_background_hooks_finalized', False):
+ return
+ if not self.background_processes:
+ return
+
+ from archivebox.core.models import ArchiveResult
+
+ for hook_name, process in list(self.background_processes.items()):
+ exit_code = process.poll()
+ if exit_code is None:
+ continue
+
+ ar = self.snapshot.archiveresult_set.filter(hook_name=hook_name).first()
+ if ar and ar.status == ArchiveResult.StatusChoices.STARTED:
+ ar.update_from_output()
+
+ # Remove completed hook from tracking
+ self.background_processes.pop(hook_name, None)
def on_shutdown(self, error: BaseException | None = None) -> None:
"""
@@ -834,12 +891,15 @@ class SnapshotWorker(Worker):
super().on_shutdown(error)
@staticmethod
- def _extract_plugin_name(hook_name: str) -> str:
- """Extract plugin name from hook filename."""
- # on_Snapshot__50_wget.py -> wget
- name = hook_name.split('__')[-1] # Get part after last __
+ def _extract_plugin_name(hook_path: Path, hook_name: str) -> str:
+ """Extract plugin name from hook path (fallback to filename)."""
+ plugin_dir = hook_path.parent.name
+ if plugin_dir not in ('plugins', '.'):
+ return plugin_dir
+ # Fallback: on_Snapshot__50_wget.py -> wget
+ name = hook_name.split('__')[-1]
name = name.replace('.py', '').replace('.js', '').replace('.sh', '')
- name = name.replace('.bg', '') # Remove .bg suffix
+ name = name.replace('.bg', '')
return name
@@ -888,7 +948,7 @@ class BinaryWorker(Worker):
machine=machine,
status=Binary.StatusChoices.QUEUED,
retry_at__lte=timezone.now()
- ).order_by('retry_at')
+ ).order_by('retry_at', 'created_at', 'name')
def runloop(self) -> None:
"""Install binary(ies)."""