mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
Fix plugin hook env and extractor retries
This commit is contained in:
@@ -364,6 +364,24 @@ def run_hook(
|
||||
env['ARCHIVE_DIR'] = str(getattr(settings, 'ARCHIVE_DIR', Path.cwd() / 'archive'))
|
||||
env.setdefault('MACHINE_ID', getattr(settings, 'MACHINE_ID', '') or os.environ.get('MACHINE_ID', ''))
|
||||
|
||||
resolved_output_dir = output_dir.resolve()
|
||||
output_parts = set(resolved_output_dir.parts)
|
||||
if 'snapshots' in output_parts:
|
||||
env['SNAP_DIR'] = str(resolved_output_dir.parent)
|
||||
if 'crawls' in output_parts:
|
||||
env['CRAWL_DIR'] = str(resolved_output_dir.parent)
|
||||
|
||||
crawl_id = kwargs.get('_crawl_id') or kwargs.get('crawl_id')
|
||||
if crawl_id:
|
||||
try:
|
||||
from archivebox.crawls.models import Crawl
|
||||
|
||||
crawl = Crawl.objects.filter(id=crawl_id).first()
|
||||
if crawl:
|
||||
env['CRAWL_DIR'] = str(crawl.output_dir)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Get LIB_DIR and LIB_BIN_DIR from config
|
||||
lib_dir = config.get('LIB_DIR', getattr(settings, 'LIB_DIR', None))
|
||||
lib_bin_dir = config.get('LIB_BIN_DIR', getattr(settings, 'LIB_BIN_DIR', None))
|
||||
@@ -426,7 +444,7 @@ def run_hook(
|
||||
|
||||
# Export all config values to environment (already merged by get_config())
|
||||
# Skip keys we've already handled specially above (PATH, LIB_DIR, LIB_BIN_DIR, NODE_PATH, etc.)
|
||||
SKIP_KEYS = {'PATH', 'LIB_DIR', 'LIB_BIN_DIR', 'NODE_PATH', 'NODE_MODULES_DIR', 'DATA_DIR', 'ARCHIVE_DIR', 'MACHINE_ID'}
|
||||
SKIP_KEYS = {'PATH', 'LIB_DIR', 'LIB_BIN_DIR', 'NODE_PATH', 'NODE_MODULES_DIR', 'DATA_DIR', 'ARCHIVE_DIR', 'MACHINE_ID', 'SNAP_DIR', 'CRAWL_DIR'}
|
||||
for key, value in config.items():
|
||||
if key in SKIP_KEYS:
|
||||
continue # Already handled specially above, don't overwrite
|
||||
|
||||
@@ -147,4 +147,6 @@ def test_headers_retrieved(tmp_path, process, disable_extractors_dict):
|
||||
assert output_file is not None, f"Expected headers output_files to include a JSON file, got: {output_files}"
|
||||
with open(output_file, 'r', encoding='utf-8') as f:
|
||||
headers = pyjson.load(f)
|
||||
assert 'Content-Type' in headers or 'content-type' in headers
|
||||
response_headers = headers.get("response_headers") or headers.get("headers") or {}
|
||||
assert isinstance(response_headers, dict), f"Expected response_headers dict, got: {response_headers!r}"
|
||||
assert 'Content-Type' in response_headers or 'content-type' in response_headers
|
||||
|
||||
@@ -165,30 +165,40 @@ class Worker:
|
||||
"""
|
||||
import signal
|
||||
import time
|
||||
from archivebox.hooks import is_finite_background_hook
|
||||
|
||||
if not background_processes:
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
|
||||
# Phase 1: Send SIGTERM to ALL background processes + children in parallel
|
||||
# Phase 1: Ask daemon hooks to shut down, but let finite hooks finish naturally.
|
||||
log_worker_event(
|
||||
worker_type=worker_type,
|
||||
event=f'Sending SIGTERM to {len(background_processes)} background hooks (+ children)',
|
||||
event=f'Finalizing {len(background_processes)} background hooks (+ children)',
|
||||
indent_level=indent_level,
|
||||
pid=self.pid,
|
||||
)
|
||||
|
||||
# Build deadline map first (before killing, to get accurate remaining time)
|
||||
# Build deadline map first (before signaling, to get accurate remaining time)
|
||||
deadlines = {}
|
||||
for hook_name, process in background_processes.items():
|
||||
elapsed = now - process.started_at.timestamp()
|
||||
remaining = max(0, process.timeout - elapsed)
|
||||
deadline = now + remaining
|
||||
deadlines[hook_name] = (process, deadline)
|
||||
deadlines[hook_name] = {
|
||||
'process': process,
|
||||
'soft_deadline': now + remaining,
|
||||
'hard_deadline': now + remaining,
|
||||
'is_finite': is_finite_background_hook(hook_name),
|
||||
'term_sent': False,
|
||||
}
|
||||
|
||||
# Send SIGTERM to all process trees in parallel (non-blocking)
|
||||
for hook_name, process in background_processes.items():
|
||||
# Send SIGTERM only to daemon-style hooks immediately.
|
||||
for hook_name, state in deadlines.items():
|
||||
if state['is_finite']:
|
||||
continue
|
||||
|
||||
process = state['process']
|
||||
try:
|
||||
# Get chrome children (renderer processes etc) before sending signal
|
||||
children_pids = process.get_children_pids()
|
||||
@@ -209,6 +219,7 @@ class Worker:
|
||||
else:
|
||||
# No children - normal kill
|
||||
os.kill(process.pid, signal.SIGTERM)
|
||||
state['term_sent'] = True
|
||||
except ProcessLookupError:
|
||||
pass # Already dead
|
||||
except Exception as e:
|
||||
@@ -220,11 +231,12 @@ class Worker:
|
||||
)
|
||||
|
||||
# Phase 2: Wait for all processes in parallel, respecting individual timeouts
|
||||
for hook_name, (process, deadline) in deadlines.items():
|
||||
remaining = deadline - now
|
||||
for hook_name, state in deadlines.items():
|
||||
remaining = state['soft_deadline'] - now
|
||||
action = 'finish naturally' if state['is_finite'] else 'shut down'
|
||||
log_worker_event(
|
||||
worker_type=worker_type,
|
||||
event=f'Waiting up to {remaining:.1f}s for {hook_name}',
|
||||
event=f'Waiting up to {remaining:.1f}s for {hook_name} to {action}',
|
||||
indent_level=indent_level,
|
||||
pid=self.pid,
|
||||
)
|
||||
@@ -237,7 +249,8 @@ class Worker:
|
||||
now = time.time()
|
||||
|
||||
for hook_name in list(still_running):
|
||||
process, deadline = deadlines[hook_name]
|
||||
state = deadlines[hook_name]
|
||||
process = state['process']
|
||||
|
||||
# Check if process exited using Process.poll()
|
||||
exit_code = process.poll()
|
||||
@@ -252,9 +265,41 @@ class Worker:
|
||||
)
|
||||
continue
|
||||
|
||||
# Check if deadline exceeded
|
||||
if now >= deadline:
|
||||
# Timeout exceeded - SIGKILL process tree
|
||||
# Finite hooks get to use their full timeout before we ask them to stop.
|
||||
if state['is_finite'] and not state['term_sent'] and now >= state['soft_deadline']:
|
||||
try:
|
||||
children_pids = process.get_children_pids()
|
||||
if children_pids:
|
||||
os.kill(process.pid, signal.SIGTERM)
|
||||
for child_pid in children_pids:
|
||||
try:
|
||||
os.kill(child_pid, signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
else:
|
||||
os.kill(process.pid, signal.SIGTERM)
|
||||
state['term_sent'] = True
|
||||
state['hard_deadline'] = now + 2.0
|
||||
log_worker_event(
|
||||
worker_type=worker_type,
|
||||
event=f'⚠ Sent SIGTERM to timed-out finite hook {hook_name}',
|
||||
indent_level=indent_level,
|
||||
pid=self.pid,
|
||||
)
|
||||
except ProcessLookupError:
|
||||
still_running.remove(hook_name)
|
||||
except Exception as e:
|
||||
log_worker_event(
|
||||
worker_type=worker_type,
|
||||
event=f'Failed to SIGTERM {hook_name}: {e}',
|
||||
indent_level=indent_level,
|
||||
pid=self.pid,
|
||||
)
|
||||
continue
|
||||
|
||||
# Check if hard deadline exceeded
|
||||
if state['term_sent'] and now >= state['hard_deadline']:
|
||||
# Shutdown grace period exceeded - SIGKILL process tree
|
||||
try:
|
||||
# Get children before killing (chrome may have spawned more)
|
||||
children_pids = process.get_children_pids()
|
||||
@@ -751,6 +796,9 @@ class SnapshotWorker(Worker):
|
||||
hooks = discover_hooks('Snapshot', config=config)
|
||||
hooks = sorted(hooks, key=lambda h: h.name) # Sort by name (includes step prefix)
|
||||
|
||||
foreground_hooks: list[tuple[Path, ArchiveResult]] = []
|
||||
launched_background_hooks = False
|
||||
|
||||
# Execute each hook sequentially
|
||||
for hook_path in hooks:
|
||||
self.snapshot.refresh_from_db()
|
||||
@@ -787,6 +835,7 @@ class SnapshotWorker(Worker):
|
||||
process = self._run_hook(hook_path, ar, config)
|
||||
|
||||
if is_background:
|
||||
launched_background_hooks = True
|
||||
# Track but don't wait
|
||||
self.background_processes[hook_name] = process
|
||||
log_worker_event(
|
||||
@@ -798,6 +847,7 @@ class SnapshotWorker(Worker):
|
||||
else:
|
||||
# Wait for foreground hook to complete
|
||||
self._wait_for_hook(process, ar)
|
||||
foreground_hooks.append((hook_path, ar))
|
||||
log_worker_event(
|
||||
worker_type='SnapshotWorker',
|
||||
event=f'Completed hook: {hook_name}',
|
||||
@@ -810,6 +860,8 @@ class SnapshotWorker(Worker):
|
||||
|
||||
# All hooks launched (or completed) - terminate bg hooks and seal
|
||||
self._finalize_background_hooks()
|
||||
if launched_background_hooks:
|
||||
self._retry_failed_empty_foreground_hooks(foreground_hooks, config)
|
||||
if self.snapshot.status != Snapshot.StatusChoices.SEALED:
|
||||
# This triggers enter_sealed() which calls cleanup() and checks parent crawl sealing
|
||||
self.snapshot.sm.seal()
|
||||
@@ -857,6 +909,7 @@ class SnapshotWorker(Worker):
|
||||
parent=self.db_process,
|
||||
url=str(self.snapshot.url),
|
||||
snapshot_id=str(self.snapshot.id),
|
||||
_crawl_id=str(self.snapshot.crawl_id) if self.snapshot.crawl_id else None,
|
||||
)
|
||||
|
||||
# Link ArchiveResult to Process for tracking
|
||||
@@ -897,6 +950,50 @@ class SnapshotWorker(Worker):
|
||||
|
||||
ar.save(update_fields=['status', 'end_ts', 'modified_at'])
|
||||
|
||||
def _retry_failed_empty_foreground_hooks(self, hooks: list[tuple[Path, Any]], config: dict) -> None:
    """
    Retry foreground hooks after background hooks finish.

    Some parser-style hooks depend on files created by finite background
    hooks such as wget. They can legitimately fail with no outputs during
    the first pass, then succeed once the background fetcher materializes
    the source HTML on disk. We give those sibling outputs a short settle
    window before giving up.

    Args:
        hooks: (hook_path, ArchiveResult) pairs that already ran once in
            the foreground and produced no output_files on the first pass.
        config: merged config dict, passed through unchanged to
            ``self._run_hook``.

    Returns:
        None. Side effects only: may re-run hooks, sleep briefly between
        attempts, and emit log_worker_event entries for each retry.
    """
    import time
    from archivebox.core.models import Snapshot

    # Backoff schedule between re-checks; the 0.0 first entry means the
    # first retry attempt happens immediately with no sleep.
    retry_delays = (0.0, 0.25, 0.5, 1.0)

    for hook_path, ar in hooks:
        for attempt, delay in enumerate(retry_delays, start=1):
            # Bail out entirely if another worker sealed the snapshot or
            # the snapshot's hard timeout has been exceeded.
            self.snapshot.refresh_from_db()
            if self.snapshot.status == Snapshot.StatusChoices.SEALED:
                return
            if self._snapshot_exceeded_hard_timeout():
                self._seal_snapshot_due_to_timeout()
                return

            # Re-read the ArchiveResult: if outputs appeared since the
            # last check, this hook no longer needs a retry.
            ar.refresh_from_db()
            if ar.output_files:
                break

            if delay:
                # Settle window: give sibling background-hook outputs a
                # moment to land on disk before re-running the hook.
                time.sleep(delay)

            # Re-run the hook synchronously (foreground) and log the attempt.
            process = self._run_hook(hook_path, ar, config)
            self._wait_for_hook(process, ar)
            log_worker_event(
                worker_type='SnapshotWorker',
                event=f'Retried foreground hook after background finalize: {hook_path.name} (attempt {attempt}/{len(retry_delays)})',
                indent_level=2,
                pid=self.pid,
            )

            # Stop retrying this hook as soon as it produces any output;
            # otherwise loop around for the next (longer) delay.
            ar.refresh_from_db()
            if ar.output_files:
                break
|
||||
|
||||
def _finalize_background_hooks(self) -> None:
|
||||
"""Gracefully terminate background hooks and update their ArchiveResults."""
|
||||
if getattr(self, '_background_hooks_finalized', False):
|
||||
|
||||
Reference in New Issue
Block a user