Files
ArchiveBox/archivebox/misc/process_utils.py
2025-12-28 03:39:59 -08:00

265 lines
7.5 KiB
Python

"""
Cross-platform process validation utilities using psutil.
Uses filesystem mtime as a "password" to validate PIDs haven't been reused.
Since filesystem mtimes can be set arbitrarily, but process start times cannot,
we can detect PID reuse by comparing:
- PID file mtime (set to process start time when we launched it)
- Actual process start time (from psutil)
If they match (within tolerance), it's our process.
If they don't match, the PID was reused by a different process.
"""
__package__ = 'archivebox.misc'
import os
import time
from pathlib import Path
from typing import Optional
try:
import psutil
except ImportError:
psutil = None
def get_process_info(pid: int) -> Optional[dict]:
"""
Get process information using psutil.
Args:
pid: Process ID
Returns:
Dict with 'start_time', 'cmdline', 'name', 'status' or None if not found
"""
if psutil is None:
return None
try:
proc = psutil.Process(pid)
return {
'start_time': proc.create_time(), # Unix epoch seconds
'cmdline': proc.cmdline(),
'name': proc.name(),
'status': proc.status(),
}
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return None
def validate_pid_file(
pid_file: Path,
cmd_file: Optional[Path] = None,
tolerance_seconds: float = 5.0
) -> bool:
"""
Validate PID file using mtime as "password".
Returns True only if ALL checks pass:
1. PID file exists and contains valid integer
2. Process with that PID exists
3. File mtime matches process start time (within tolerance)
4. If cmd_file provided, process cmdline contains expected args
Args:
pid_file: Path to .pid file
cmd_file: Optional path to cmd.sh for command validation
tolerance_seconds: Allowed difference between mtime and start time
Returns:
True if PID is validated, False if reused/invalid
"""
if psutil is None:
# Fallback: just check if process exists (no validation)
return _validate_pid_file_without_psutil(pid_file)
# Check PID file exists
if not pid_file.exists():
return False
# Read PID
try:
pid = int(pid_file.read_text().strip())
except (ValueError, OSError):
return False
# Get process info
proc_info = get_process_info(pid)
if proc_info is None:
return False # Process doesn't exist
# Check mtime matches process start time
try:
file_mtime = pid_file.stat().st_mtime
except OSError:
return False
proc_start_time = proc_info['start_time']
time_diff = abs(file_mtime - proc_start_time)
if time_diff > tolerance_seconds:
# PID was reused by different process
return False
# Validate command if provided
if cmd_file and cmd_file.exists():
try:
expected_cmd = cmd_file.read_text().strip()
actual_cmdline = ' '.join(proc_info['cmdline'])
# Check for key indicators (chrome, debug port, etc.)
# This is a heuristic - just checks if critical args are present
if '--remote-debugging-port' in expected_cmd:
if '--remote-debugging-port' not in actual_cmdline:
return False
if 'chrome' in expected_cmd.lower() or 'chromium' in expected_cmd.lower():
proc_name_lower = proc_info['name'].lower()
if 'chrome' not in proc_name_lower and 'chromium' not in proc_name_lower:
return False
except OSError:
pass # Can't validate command, but other checks passed
return True
def _validate_pid_file_without_psutil(pid_file: Path) -> bool:
"""
Fallback validation when psutil not available.
Only checks if process exists, no validation.
"""
if not pid_file.exists():
return False
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, 0) # Signal 0 = check existence
return True
except (OSError, ValueError, ProcessLookupError):
return False
def write_pid_file_with_mtime(pid_file: Path, pid: int, start_time: float):
"""
Write PID file and set mtime to process start time.
This creates a "password" that can be validated later to ensure
the PID hasn't been reused by a different process.
Args:
pid_file: Path to .pid file to create
pid: Process ID to write
start_time: Process start time as Unix epoch seconds
"""
pid_file.write_text(str(pid))
# Set both atime and mtime to process start time
try:
os.utime(pid_file, (start_time, start_time))
except OSError:
# If we can't set mtime, file is still written
# Validation will be less reliable but won't break
pass
def write_cmd_file(cmd_file: Path, cmd: list[str]):
"""
Write command script for validation.
Args:
cmd_file: Path to cmd.sh to create
cmd: Command list (e.g., ['chrome', '--remote-debugging-port=9222', ...])
"""
# Shell escape arguments with spaces or special chars
def shell_escape(arg: str) -> str:
if ' ' in arg or '"' in arg or "'" in arg or '$' in arg:
# Escape double quotes and wrap in double quotes
return f'"{arg.replace(chr(34), chr(92) + chr(34))}"'
return arg
escaped_cmd = [shell_escape(arg) for arg in cmd]
script = '#!/bin/bash\n' + ' '.join(escaped_cmd) + '\n'
cmd_file.write_text(script)
try:
cmd_file.chmod(0o755)
except OSError:
pass # Best effort
def safe_kill_process(
pid_file: Path,
cmd_file: Optional[Path] = None,
signal_num: int = 15, # SIGTERM
validate: bool = True
) -> bool:
"""
Safely kill a process with validation.
Args:
pid_file: Path to .pid file
cmd_file: Optional path to cmd.sh for validation
signal_num: Signal to send (default SIGTERM=15)
validate: If True, validate process identity before killing
Returns:
True if process was killed, False if not found or validation failed
"""
if not pid_file.exists():
return False
# Validate process identity first
if validate:
if not validate_pid_file(pid_file, cmd_file):
# PID reused by different process, don't kill
# Clean up stale PID file
try:
pid_file.unlink()
except OSError:
pass
return False
# Read PID and kill
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, signal_num)
return True
except (OSError, ValueError, ProcessLookupError):
return False
def cleanup_stale_pid_files(directory: Path, cmd_file_name: str = 'cmd.sh') -> int:
"""
Remove stale PID files from directory.
A PID file is stale if:
- Process no longer exists, OR
- Process exists but validation fails (PID reused)
Args:
directory: Directory to scan for *.pid files
cmd_file_name: Name of command file for validation (default: cmd.sh)
Returns:
Number of stale PID files removed
"""
if not directory.exists():
return 0
removed = 0
for pid_file in directory.glob('**/*.pid'):
cmd_file = pid_file.parent / cmd_file_name
# Check if valid
if not validate_pid_file(pid_file, cmd_file):
try:
pid_file.unlink()
removed += 1
except OSError:
pass
return removed