Change hook timeout enforcement strategy (#1739)

<!-- IMPORTANT: Do not submit PRs with only formatting / PEP8 / line
length changes. -->

# Summary

<!--e.g. This PR fixes ABC or adds the ability to do XYZ...-->

# Related issues

<!-- e.g. #123 or Roadmap goal #
https://github.com/pirate/ArchiveBox/wiki/Roadmap -->

# Changes these areas

- [ ] Bugfixes
- [ ] Feature behavior
- [ ] Command line interface
- [ ] Configuration options
- [ ] Internal architecture
- [ ] Snapshot data layout on disk


<!-- This is an auto-generated description by cubic. -->
---
## Summary by cubic
Switch background hook cleanup to a graceful termination flow using
plugin-specific timeouts, only SIGKILLing if needed. This improves
reliability and records accurate exit codes and stderr for better result
reporting.

- **Refactors**
- Added graceful_terminate_background_hooks(): send SIGTERM to all
hooks, wait per plugin timeout, SIGKILL remaining, reap with waitpid,
write .returncode files.
- Snapshot.cleanup() now uses merged config (get_config) to apply
plugin-specific timeouts and terminate hooks gracefully.
- update_from_output() reads .returncode and .stderr.log, infers status
when no JSONL (handles signals like -9/-15), includes stderr on
failures, and cleans up .returncode files.

<sup>Written for commit 524e8e98c3.
Summary will update on new commits.</sup>

<!-- End of auto-generated description by cubic. -->
This commit is contained in:
Nick Sweeting
2025-12-31 01:32:43 -08:00
committed by GitHub
2 changed files with 228 additions and 11 deletions

View File

@@ -1407,17 +1407,22 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Clean up background ArchiveResult hooks.
Called by the state machine when entering the 'sealed' state.
Kills any background hooks and finalizes their ArchiveResults.
Gracefully terminates background hooks using plugin-specific timeouts:
1. Send SIGTERM to all background hook processes
2. Wait up to each hook's plugin-specific timeout
3. Send SIGKILL to any hooks still running after timeout
"""
from archivebox.hooks import kill_process
from archivebox.hooks import graceful_terminate_background_hooks
from archivebox.config.configset import get_config
# Kill any background ArchiveResult hooks
if not self.OUTPUT_DIR.exists():
return
# Find all .pid files in this snapshot's output directory
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
kill_process(pid_file, validate=True)
# Get merged config for plugin-specific timeout lookup
config = get_config(crawl=self.crawl, snapshot=self)
# Gracefully terminate all background hooks with plugin-specific timeouts
graceful_terminate_background_hooks(self.OUTPUT_DIR, config)
# Update all STARTED ArchiveResults from filesystem
results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
@@ -2706,7 +2711,20 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
# Read and parse JSONL output from hook-specific stdout log
stdout_file = plugin_dir / f'{hook_basename}.stdout.log'
stderr_file = plugin_dir / f'{hook_basename}.stderr.log'
returncode_file = plugin_dir / f'{hook_basename}.returncode'
stdout = stdout_file.read_text() if stdout_file.exists() else ''
stderr = stderr_file.read_text() if stderr_file.exists() else ''
# Read returncode from file (written by graceful_terminate_background_hooks)
returncode = None
if returncode_file.exists():
try:
rc_text = returncode_file.read_text().strip()
returncode = int(rc_text) if rc_text else None
except (ValueError, OSError):
pass
records = []
for line in stdout.splitlines():
@@ -2741,9 +2759,30 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
self._set_binary_from_cmd(hook_data['cmd'])
# Note: cmd_version is derived from binary.version, not stored on Process
else:
# No ArchiveResult record = failed
self.status = self.StatusChoices.FAILED
self.output_str = 'Hook did not output ArchiveResult record'
# No ArchiveResult JSONL record - determine status from returncode
if returncode is not None:
if returncode == 0:
self.status = self.StatusChoices.SUCCEEDED
self.output_str = 'Hook completed successfully (no JSONL output)'
elif returncode < 0:
# Negative = killed by signal (e.g., -9 for SIGKILL, -15 for SIGTERM)
sig_num = abs(returncode)
sig_name = {9: 'SIGKILL', 15: 'SIGTERM'}.get(sig_num, f'signal {sig_num}')
self.status = self.StatusChoices.FAILED
self.output_str = f'Hook killed by {sig_name}'
if stderr:
self.output_str += f'\n\nstderr:\n{stderr[:2000]}'
else:
self.status = self.StatusChoices.FAILED
self.output_str = f'Hook failed with exit code {returncode}'
if stderr:
self.output_str += f'\n\nstderr:\n{stderr[:2000]}'
else:
# No returncode file and no JSONL = failed
self.status = self.StatusChoices.FAILED
self.output_str = 'Hook did not output ArchiveResult record'
if stderr:
self.output_str += f'\n\nstderr:\n{stderr[:2000]}'
# Walk filesystem and populate output_files, output_size, output_mimetypes
# Exclude hook output files (hook-specific names like on_Snapshot__50_wget.stdout.log)
@@ -2753,6 +2792,7 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
name.endswith('.stdout.log') or
name.endswith('.stderr.log') or
name.endswith('.pid') or
name.endswith('.returncode') or
(name.endswith('.sh') and name.startswith('on_'))
)
@@ -2821,10 +2861,10 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
}
process_hook_records(filtered_records, overrides=overrides)
# Cleanup PID files and empty logs (hook-specific names)
# Cleanup PID files, returncode files, and empty logs (hook-specific names)
pid_file = plugin_dir / f'{hook_basename}.pid'
pid_file.unlink(missing_ok=True)
stderr_file = plugin_dir / f'{hook_basename}.stderr.log'
returncode_file.unlink(missing_ok=True)
if stdout_file.exists() and stdout_file.stat().st_size == 0:
stdout_file.unlink()
if stderr_file.exists() and stderr_file.stat().st_size == 0:

View File

@@ -1266,3 +1266,180 @@ def kill_process(pid_file: Path, sig: int = signal.SIGTERM, validate: bool = Tru
pass
def graceful_terminate_background_hooks(
output_dir: Path,
config: Dict[str, Any],
poll_interval: float = 0.5,
) -> Dict[str, Dict[str, Any]]:
"""
Gracefully terminate all background hooks in an output directory.
Termination strategy:
1. Send SIGTERM to all background hook processes (polite shutdown request)
2. For each hook, wait up to its plugin-specific timeout
3. Send SIGKILL to any hooks still running after their timeout expires
4. Reap each process with waitpid() to get exit code
5. Write returncode to .returncode file for update_from_output()
Args:
output_dir: Snapshot output directory containing plugin subdirs with .pid files
config: Merged config dict from get_config() for timeout lookup
poll_interval: Seconds between process liveness checks (default: 0.5s)
Returns:
Dict mapping hook names to result info:
{
'hook_name': {
'status': 'sigterm' | 'sigkill' | 'already_dead' | 'invalid',
'returncode': int or None,
'pid': int or None,
}
}
Example:
from archivebox.config.configset import get_config
config = get_config(crawl=my_crawl, snapshot=my_snapshot)
results = graceful_terminate_background_hooks(snapshot.OUTPUT_DIR, config)
# {'on_Snapshot__20_chrome_tab.bg': {'status': 'sigterm', 'returncode': 0, 'pid': 12345}}
"""
from archivebox.misc.process_utils import validate_pid_file
if not output_dir.exists():
return {}
results = {}
# Collect all pid files and their metadata
pid_files = list(output_dir.glob('**/*.pid'))
if not pid_files:
return {}
# Phase 1: Send SIGTERM to all background hook processes
active_hooks = [] # List of (pid_file, hook_name, plugin_name, timeout, pid)
for pid_file in pid_files:
hook_name = pid_file.stem # e.g., "on_Snapshot__20_chrome_tab.bg"
cmd_file = pid_file.with_suffix('.sh')
# Validate and get PID
if not validate_pid_file(pid_file, cmd_file):
results[hook_name] = {'status': 'invalid', 'returncode': None, 'pid': None}
pid_file.unlink(missing_ok=True)
continue
try:
pid = int(pid_file.read_text().strip())
except (ValueError, OSError):
results[hook_name] = {'status': 'invalid', 'returncode': None, 'pid': None}
pid_file.unlink(missing_ok=True)
continue
# Check if process is still alive
if not process_is_alive(pid_file):
# Process already dead - try to reap it and get exit code
returncode = _reap_process(pid)
results[hook_name] = {'status': 'already_dead', 'returncode': returncode, 'pid': pid}
_write_returncode_file(pid_file, returncode)
pid_file.unlink(missing_ok=True)
continue
# Get plugin name from parent directory (e.g., "chrome_session")
plugin_name = pid_file.parent.name
# Get plugin-specific timeout
plugin_config = get_plugin_special_config(plugin_name, config)
timeout = plugin_config['timeout']
# Send SIGTERM
try:
os.kill(pid, signal.SIGTERM)
except (OSError, ProcessLookupError):
returncode = _reap_process(pid)
results[hook_name] = {'status': 'already_dead', 'returncode': returncode, 'pid': pid}
_write_returncode_file(pid_file, returncode)
pid_file.unlink(missing_ok=True)
continue
active_hooks.append((pid_file, hook_name, plugin_name, timeout, pid))
# Phase 2: Wait for each hook's timeout, then SIGKILL if still running
for pid_file, hook_name, plugin_name, timeout, pid in active_hooks:
deadline = time.time() + timeout
exited_cleanly = False
# Poll until deadline or process exits
while time.time() < deadline:
if not process_is_alive(pid_file):
exited_cleanly = True
break
time.sleep(poll_interval)
if exited_cleanly:
# Process exited from SIGTERM - reap it to get exit code
returncode = _reap_process(pid)
results[hook_name] = {'status': 'sigterm', 'returncode': returncode, 'pid': pid}
else:
# Timeout expired, send SIGKILL
try:
os.kill(pid, signal.SIGKILL)
except (OSError, ProcessLookupError):
pass # Process died between check and kill
# Wait briefly for SIGKILL to take effect, then reap
time.sleep(0.1)
returncode = _reap_process(pid)
# returncode from SIGKILL is typically -9 (negative signal number)
results[hook_name] = {'status': 'sigkill', 'returncode': returncode, 'pid': pid}
# Write returncode file for update_from_output() to read
_write_returncode_file(pid_file, results[hook_name]['returncode'])
pid_file.unlink(missing_ok=True)
return results
def _reap_process(pid: int) -> Optional[int]:
"""
Reap a terminated process and return its exit code.
Uses os.waitpid() with WNOHANG to avoid blocking.
Returns None if process cannot be reaped (not a child, already reaped, etc).
"""
try:
# WNOHANG: return immediately if process hasn't exited
# We call this after we know process is dead, so it should return immediately
wpid, status = os.waitpid(pid, os.WNOHANG)
if wpid == 0:
# Process still running (shouldn't happen since we checked)
return None
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Killed by signal - return negative signal number (convention)
return -os.WTERMSIG(status)
return None
except ChildProcessError:
# Not our child process (was started by subprocess.Popen which already reaped it,
# or process was started by different parent). This is expected for hooks.
return None
except OSError:
return None
def _write_returncode_file(pid_file: Path, returncode: Optional[int]) -> None:
"""
Write returncode to a .returncode file next to the .pid file.
This allows update_from_output() to know the exit code even for background hooks.
"""
returncode_file = pid_file.with_suffix('.returncode')
try:
if returncode is not None:
returncode_file.write_text(str(returncode))
else:
# Unknown exit code - write empty file to indicate process was terminated
returncode_file.write_text('')
except OSError:
pass # Best effort