more migration id/uuid and config propagation fixes

This commit is contained in:
Nick Sweeting
2026-01-04 16:16:26 -08:00
parent 839ae744cf
commit 456aaee287
16 changed files with 789 additions and 94 deletions

View File

@@ -308,8 +308,8 @@ class Worker:
crawl = Crawl.objects.get(id=crawl_id)
cmd = [sys.executable, '-m', 'archivebox', 'run', '--crawl-id', str(crawl_id)]
pwd = Path(crawl.OUTPUT_DIR) # Run in crawl's output directory
env = get_config(scope='crawl', crawl=crawl)
pwd = Path(crawl.output_dir) # Run in crawl's output directory
env = get_config(crawl=crawl)
elif cls.name == 'snapshot':
snapshot_id = kwargs.get('snapshot_id')
@@ -321,7 +321,7 @@ class Worker:
cmd = [sys.executable, '-m', 'archivebox', 'run', '--snapshot-id', str(snapshot_id)]
pwd = Path(snapshot.output_dir) # Run in snapshot's output directory
env = get_config(scope='snapshot', snapshot=snapshot)
env = get_config(snapshot=snapshot)
else:
raise ValueError(f"Unknown worker type: {cls.name}")
@@ -459,6 +459,8 @@ class CrawlWorker(Worker):
from pathlib import Path
from archivebox.core.models import Snapshot
from archivebox.machine.models import Process
import sys
import threading
debug_log = Path('/tmp/archivebox_crawl_worker_debug.log')
@@ -514,7 +516,9 @@ class CrawlWorker(Worker):
with open(debug_log, 'a') as f:
f.write(f' Spawning worker for {snapshot.url} (status={snapshot.status})\n')
f.flush()
SnapshotWorker.start(parent=self.db_process, snapshot_id=str(snapshot.id))
pid = SnapshotWorker.start(parent=self.db_process, snapshot_id=str(snapshot.id))
log_worker_event(
worker_type='CrawlWorker',
event=f'Spawned SnapshotWorker for {snapshot.url}',
@@ -522,6 +526,18 @@ class CrawlWorker(Worker):
pid=self.pid,
)
# Pipe the SnapshotWorker's stderr to our stderr so we can see what's happening
# Get the Process record that was just created
worker_process = Process.objects.filter(pid=pid).first()
if worker_process:
# Pipe stderr in background thread so it doesn't block
def pipe_worker_stderr():
for line in worker_process.tail_stderr(lines=0, follow=True):
print(f' [SnapshotWorker] {line}', file=sys.stderr, flush=True)
thread = threading.Thread(target=pipe_worker_stderr, daemon=True)
thread.start()
def _is_crawl_finished(self) -> bool:
"""Check if all snapshots are sealed."""
from pathlib import Path
@@ -626,16 +642,28 @@ class SnapshotWorker(Worker):
"""Execute all hooks sequentially."""
from archivebox.hooks import discover_hooks, is_background_hook, extract_step
from archivebox.core.models import ArchiveResult
from archivebox.config.configset import get_config
self.on_startup()
try:
# Get merged config (includes env vars passed via Process.env, snapshot.config, defaults, etc.)
config = get_config(snapshot=self.snapshot)
# Discover all hooks for this snapshot
hooks = discover_hooks('Snapshot', config=self.snapshot.config)
hooks = discover_hooks('Snapshot', config=config)
hooks = sorted(hooks, key=lambda h: h.name) # Sort by name (includes step prefix)
import sys
print(f'[SnapshotWorker] Discovered {len(hooks)} hooks for snapshot {self.snapshot.url}', file=sys.stderr, flush=True)
if hooks:
print(f'[SnapshotWorker] First 5 hooks: {[h.name for h in hooks[:5]]}', file=sys.stderr, flush=True)
else:
print(f'[SnapshotWorker] WARNING: No hooks discovered! Config keys: {list(config.keys())[:10]}...', file=sys.stderr, flush=True)
# Execute each hook sequentially
for hook_path in hooks:
print(f'[SnapshotWorker] Running hook: {hook_path.name}', file=sys.stderr, flush=True)
hook_name = hook_path.name
plugin = self._extract_plugin_name(hook_name)
hook_step = extract_step(hook_name)
@@ -661,7 +689,7 @@ class SnapshotWorker(Worker):
ar.save(update_fields=['status', 'start_ts', 'modified_at'])
# Fork and run the hook
process = self._run_hook(hook_path, ar)
process = self._run_hook(hook_path, ar, config)
if is_background:
# Track but don't wait
@@ -698,7 +726,7 @@ class SnapshotWorker(Worker):
finally:
self.on_shutdown()
def _run_hook(self, hook_path: Path, ar: Any) -> Any:
def _run_hook(self, hook_path: Path, ar: Any, config: dict) -> Any:
"""Fork and run a hook using Process model, return Process."""
from archivebox.hooks import run_hook
@@ -710,7 +738,7 @@ class SnapshotWorker(Worker):
process = run_hook(
script=hook_path,
output_dir=output_dir,
config=self.snapshot.config,
config=config,
timeout=120,
parent=self.db_process,
url=str(self.snapshot.url),