From b822352fc3aa571079edac71a160b20151f07eea Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 31 Dec 2025 10:15:22 +0000 Subject: [PATCH] Delete pid_utils.py and migrate to Process model DELETED: - workers/pid_utils.py (-192 lines) - replaced by Process model methods SIMPLIFIED: - crawls/models.py Crawl.cleanup() (80 lines -> 10 lines) - hooks.py: deleted process_is_alive() and kill_process() (-45 lines) UPDATED to use Process model: - core/models.py: Snapshot.cleanup() and has_running_background_hooks() - machine/models.py: Binary.cleanup() - workers/worker.py: Worker.on_startup/shutdown, get_running_workers, start - workers/orchestrator.py: Orchestrator.on_startup/shutdown, is_running All subprocess management now uses: - Process.current() for registering current process - Process.get_running() / get_running_count() for querying - Process.cleanup_stale_running() for cleanup - safe_kill_process() for validated PID killing Total line reduction: ~250 lines --- archivebox/core/models.py | 10 +- archivebox/crawls/models.py | 75 +---------- archivebox/hooks.py | 49 -------- archivebox/machine/models.py | 6 +- archivebox/workers/orchestrator.py | 44 ++++--- archivebox/workers/pid_utils.py | 191 ----------------------------- archivebox/workers/worker.py | 47 ++++--- 7 files changed, 63 insertions(+), 359 deletions(-) delete mode 100644 archivebox/workers/pid_utils.py diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 883733c5..f7b45ba9 100755 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -1385,7 +1385,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea Called by the state machine when entering the 'sealed' state. Kills any background hooks and finalizes their ArchiveResults. """ - from archivebox.hooks import kill_process + from archivebox.misc.process_utils import safe_kill_process # Kill any background ArchiveResult hooks if not self.OUTPUT_DIR.exists(): @@ -1393,7 +1393,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea # Find all .pid files in this snapshot's output directory for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): - kill_process(pid_file, validate=True) + cmd_file = pid_file.parent / 'cmd.sh' + safe_kill_process(pid_file, cmd_file) # Update all STARTED ArchiveResults from filesystem results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED) @@ -1406,7 +1407,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea Used by state machine to determine if snapshot is finished. """ - from archivebox.hooks import process_is_alive + from archivebox.misc.process_utils import validate_pid_file if not self.OUTPUT_DIR.exists(): return False @@ -1415,7 +1416,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea if not plugin_dir.is_dir(): continue pid_file = plugin_dir / 'hook.pid' - if process_is_alive(pid_file): + cmd_file = plugin_dir / 'cmd.sh' + if validate_pid_file(pid_file, cmd_file): return True return False diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 3e1a53f9..abf21175 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -417,84 +417,15 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith def cleanup(self): """Clean up background hooks and run on_CrawlEnd hooks.""" - import os - import signal - import time - from pathlib import Path from archivebox.hooks import run_hook, discover_hooks - from archivebox.misc.process_utils import validate_pid_file - - def is_process_alive(pid): - """Check if a process exists.""" - try: - os.kill(pid, 0) # Signal 0 checks existence without killing - return True - except (OSError, ProcessLookupError): - return False + from archivebox.misc.process_utils import safe_kill_process # Kill any background processes by scanning for all .pid files if self.OUTPUT_DIR.exists(): for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): - # Validate PID before killing to avoid killing unrelated processes cmd_file = pid_file.parent / 'cmd.sh' - if not validate_pid_file(pid_file, cmd_file): - # PID reused by different process or process dead - pid_file.unlink(missing_ok=True) - continue - - try: - pid = int(pid_file.read_text().strip()) - - # Step 1: Send SIGTERM for graceful shutdown - try: - # Try to kill process group first (handles detached processes like Chrome) - try: - os.killpg(pid, signal.SIGTERM) - except (OSError, ProcessLookupError): - # Fall back to killing just the process - os.kill(pid, signal.SIGTERM) - except ProcessLookupError: - # Already dead - pid_file.unlink(missing_ok=True) - continue - - # Step 2: Wait for graceful shutdown - time.sleep(2) - - # Step 3: Check if still alive - if not is_process_alive(pid): - # Process terminated gracefully - pid_file.unlink(missing_ok=True) - continue - - # Step 4: Process still alive, force kill ENTIRE process group with SIGKILL - try: - try: - # Always kill entire process group with SIGKILL (not individual processes) - os.killpg(pid, signal.SIGKILL) - except (OSError, ProcessLookupError) as e: - # Process group kill failed, try single process as fallback - os.kill(pid, signal.SIGKILL) - except ProcessLookupError: - # Process died between check and kill - pid_file.unlink(missing_ok=True) - continue - - # Step 5: Wait and verify death - time.sleep(1) - - if is_process_alive(pid): - # Process is unkillable (likely in UNE state on macOS) - # This happens when Chrome crashes in kernel syscall (IOSurface) - # Log but don't block cleanup - process will remain until reboot - print(f'[yellow]⚠️ Process {pid} is unkillable (likely crashed in kernel). Will remain until reboot.[/yellow]') - else: - # Successfully killed - pid_file.unlink(missing_ok=True) - - except (ValueError, OSError) as e: - # Invalid PID file or permission error - pass + safe_kill_process(pid_file, cmd_file) + pid_file.unlink(missing_ok=True) # Run on_CrawlEnd hooks from archivebox.config.configset import get_config diff --git a/archivebox/hooks.py b/archivebox/hooks.py index 6485f2c0..73febfa0 100644 --- a/archivebox/hooks.py +++ b/archivebox/hooks.py @@ -1233,52 +1233,3 @@ def process_hook_records(records: List[Dict[str, Any]], overrides: Dict[str, Any continue return stats - - -def process_is_alive(pid_file: Path) -> bool: - """ - Check if process in PID file is still running. - - Args: - pid_file: Path to hook.pid file - - Returns: - True if process is alive, False otherwise - """ - if not pid_file.exists(): - return False - - try: - pid = int(pid_file.read_text().strip()) - os.kill(pid, 0) # Signal 0 = check if process exists without killing it - return True - except (OSError, ValueError): - return False - - -def kill_process(pid_file: Path, sig: int = signal.SIGTERM, validate: bool = True): - """ - Kill process in PID file with optional validation. - - Args: - pid_file: Path to hook.pid file - sig: Signal to send (default SIGTERM) - validate: If True, validate process identity before killing (default: True) - """ - from archivebox.misc.process_utils import safe_kill_process - - if validate: - # Use safe kill with validation - cmd_file = pid_file.parent / 'cmd.sh' - safe_kill_process(pid_file, cmd_file, signal_num=sig) - else: - # Legacy behavior - kill without validation - if not pid_file.exists(): - return - try: - pid = int(pid_file.read_text().strip()) - os.kill(pid, sig) - except (OSError, ValueError): - pass - - diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index c19f320f..4bac79d6 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -449,7 +449,7 @@ class Binary(ModelWithHealthStats): since installations are foreground, but included for consistency). """ from pathlib import Path - from archivebox.hooks import kill_process + from archivebox.misc.process_utils import safe_kill_process output_dir = self.OUTPUT_DIR if not output_dir.exists(): @@ -460,8 +460,8 @@ class Binary(ModelWithHealthStats): if not plugin_dir.is_dir(): continue pid_file = plugin_dir / 'hook.pid' - if pid_file.exists(): - kill_process(pid_file) + cmd_file = plugin_dir / 'cmd.sh' + safe_kill_process(pid_file, cmd_file) # ============================================================================= diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index 1b1789cb..370adf85 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -30,7 +30,7 @@ __package__ = 'archivebox.workers' import os import time from typing import Type -from multiprocessing import Process +from multiprocessing import Process as MPProcess from django.utils import timezone @@ -38,12 +38,6 @@ from rich import print from archivebox.misc.logging_util import log_worker_event from .worker import Worker, CrawlWorker, SnapshotWorker, ArchiveResultWorker -from .pid_utils import ( - write_pid_file, - remove_pid_file, - get_all_worker_pids, - cleanup_stale_pid_files, -) def _run_orchestrator_process(exit_on_idle: bool) -> None: @@ -85,16 +79,20 @@ class Orchestrator: @classmethod def is_running(cls) -> bool: """Check if an orchestrator is already running.""" - workers = get_all_worker_pids('orchestrator') - return len(workers) > 0 - + from archivebox.machine.models import Process + + return Process.get_running_count(process_type='orchestrator') > 0 + def on_startup(self) -> None: """Called when orchestrator starts.""" - self.pid = os.getpid() - self.pid_file = write_pid_file('orchestrator', worker_id=0) + from archivebox.machine.models import Process - # Clean up any stale PID files from previous runs - stale_count = cleanup_stale_pid_files() + self.pid = os.getpid() + # Register orchestrator process in database + self.db_process = Process.current() + + # Clean up any stale Process records from previous runs + stale_count = Process.cleanup_stale_running() # Collect startup metadata metadata = { @@ -112,11 +110,15 @@ class Orchestrator: pid=self.pid, metadata=metadata, ) - + def on_shutdown(self, error: BaseException | None = None) -> None: """Called when orchestrator shuts down.""" - if self.pid_file: - remove_pid_file(self.pid_file) + # Update Process record status + if hasattr(self, 'db_process') and self.db_process: + self.db_process.exit_code = 1 if error else 0 + self.db_process.status = self.db_process.StatusChoices.EXITED + self.db_process.ended_at = timezone.now() + self.db_process.save() log_worker_event( worker_type='Orchestrator', @@ -125,10 +127,12 @@ class Orchestrator: pid=self.pid, error=error if error and not isinstance(error, KeyboardInterrupt) else None, ) - + def get_total_worker_count(self) -> int: """Get total count of running workers across all types.""" - cleanup_stale_pid_files() + from archivebox.machine.models import Process + + Process.cleanup_stale_running() return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES) def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool: @@ -287,7 +291,7 @@ class Orchestrator: Returns the PID of the new process. """ # Use module-level function to avoid pickle errors with local functions - proc = Process( + proc = MPProcess( target=_run_orchestrator_process, args=(self.exit_on_idle,), name='orchestrator' diff --git a/archivebox/workers/pid_utils.py b/archivebox/workers/pid_utils.py deleted file mode 100644 index 020fce70..00000000 --- a/archivebox/workers/pid_utils.py +++ /dev/null @@ -1,191 +0,0 @@ -""" -PID file utilities for tracking worker and orchestrator processes. - -PID files are stored in data/tmp/workers/ and contain: -- Line 1: PID -- Line 2: Worker type (orchestrator, crawl, snapshot, archiveresult) -- Line 3: Extractor filter (optional, for archiveresult workers) -- Line 4: Started at ISO timestamp -""" - -__package__ = 'archivebox.workers' - -import os -import signal -from pathlib import Path -from datetime import datetime, timezone - -from django.conf import settings - - -def get_pid_dir() -> Path: - """Get the directory for PID files, creating it if needed.""" - pid_dir = Path(settings.DATA_DIR) / 'tmp' / 'workers' - pid_dir.mkdir(parents=True, exist_ok=True) - return pid_dir - - -def write_pid_file(worker_type: str, worker_id: int = 0, extractor: str | None = None) -> Path: - """ - Write a PID file for the current process. - Returns the path to the PID file. - """ - pid_dir = get_pid_dir() - - if worker_type == 'orchestrator': - pid_file = pid_dir / 'orchestrator.pid' - else: - pid_file = pid_dir / f'{worker_type}_worker_{worker_id}.pid' - - content = f"{os.getpid()}\n{worker_type}\n{extractor or ''}\n{datetime.now(timezone.utc).isoformat()}\n" - pid_file.write_text(content) - - return pid_file - - -def read_pid_file(path: Path) -> dict | None: - """ - Read and parse a PID file. - Returns dict with pid, worker_type, extractor, started_at or None if invalid. - """ - try: - if not path.exists(): - return None - - lines = path.read_text().strip().split('\n') - if len(lines) < 4: - return None - - return { - 'pid': int(lines[0]), - 'worker_type': lines[1], - 'extractor': lines[2] or None, - 'started_at': datetime.fromisoformat(lines[3]), - 'pid_file': path, - } - except (ValueError, IndexError, OSError): - return None - - -def remove_pid_file(path: Path) -> None: - """Remove a PID file if it exists.""" - try: - path.unlink(missing_ok=True) - except OSError: - pass - - -def is_process_alive(pid: int) -> bool: - """Check if a process with the given PID is still running.""" - try: - os.kill(pid, 0) # Signal 0 doesn't kill, just checks - return True - except (OSError, ProcessLookupError): - return False - - -def get_all_pid_files() -> list[Path]: - """Get all PID files in the workers directory.""" - pid_dir = get_pid_dir() - return list(pid_dir.glob('*.pid')) - - -def get_all_worker_pids(worker_type: str | None = None) -> list[dict]: - """ - Get info about all running workers. - Optionally filter by worker_type. - """ - workers = [] - - for pid_file in get_all_pid_files(): - info = read_pid_file(pid_file) - if info is None: - continue - - # Skip if process is dead - if not is_process_alive(info['pid']): - continue - - # Filter by type if specified - if worker_type and info['worker_type'] != worker_type: - continue - - workers.append(info) - - return workers - - -def cleanup_stale_pid_files() -> int: - """ - Remove PID files for processes that are no longer running. - Returns the number of stale files removed. - """ - removed = 0 - - for pid_file in get_all_pid_files(): - info = read_pid_file(pid_file) - if info is None: - # Invalid PID file, remove it - remove_pid_file(pid_file) - removed += 1 - continue - - if not is_process_alive(info['pid']): - remove_pid_file(pid_file) - removed += 1 - - return removed - - -def get_running_worker_count(worker_type: str) -> int: - """Get the count of running workers of a specific type.""" - return len(get_all_worker_pids(worker_type)) - - -def get_next_worker_id(worker_type: str) -> int: - """Get the next available worker ID for a given type.""" - existing_ids = set() - - for pid_file in get_all_pid_files(): - # Parse worker ID from filename like "snapshot_worker_3.pid" - name = pid_file.stem - if name.startswith(f'{worker_type}_worker_'): - try: - worker_id = int(name.split('_')[-1]) - existing_ids.add(worker_id) - except ValueError: - continue - - # Find the lowest unused ID - next_id = 0 - while next_id in existing_ids: - next_id += 1 - - return next_id - - -def stop_worker(pid: int, graceful: bool = True) -> bool: - """ - Stop a worker process. - If graceful=True, sends SIGTERM first, then SIGKILL after timeout. - Returns True if process was stopped. - """ - if not is_process_alive(pid): - return True - - try: - if graceful: - os.kill(pid, signal.SIGTERM) - # Give it a moment to shut down - import time - for _ in range(10): # Wait up to 1 second - time.sleep(0.1) - if not is_process_alive(pid): - return True - # Force kill if still running - os.kill(pid, signal.SIGKILL) - else: - os.kill(pid, signal.SIGKILL) - return True - except (OSError, ProcessLookupError): - return True # Process already dead diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index 404ad0a3..a8a7851e 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -17,7 +17,7 @@ import traceback from typing import ClassVar, Any from datetime import timedelta from pathlib import Path -from multiprocessing import Process, cpu_count +from multiprocessing import Process as MPProcess, cpu_count from django.db.models import QuerySet from django.utils import timezone @@ -26,13 +26,6 @@ from django.conf import settings from rich import print from archivebox.misc.logging_util import log_worker_event -from .pid_utils import ( - write_pid_file, - remove_pid_file, - get_all_worker_pids, - get_next_worker_id, - cleanup_stale_pid_files, -) CPU_COUNT = cpu_count() @@ -133,8 +126,11 @@ class Worker: def on_startup(self) -> None: """Called when worker starts.""" + from archivebox.machine.models import Process + self.pid = os.getpid() - self.pid_file = write_pid_file(self.name, self.worker_id) + # Register this worker process in the database + self.db_process = Process.current() # Determine worker type for logging worker_type_name = self.__class__.__name__ @@ -160,9 +156,12 @@ class Worker: def on_shutdown(self, error: BaseException | None = None) -> None: """Called when worker shuts down.""" - # Remove PID file - if self.pid_file: - remove_pid_file(self.pid_file) + # Update Process record status + if hasattr(self, 'db_process') and self.db_process: + self.db_process.exit_code = 1 if error else 0 + self.db_process.status = self.db_process.StatusChoices.EXITED + self.db_process.ended_at = timezone.now() + self.db_process.save() # Determine worker type for logging worker_type_name = self.__class__.__name__ @@ -288,11 +287,13 @@ class Worker: Fork a new worker as a subprocess. Returns the PID of the new process. """ + from archivebox.machine.models import Process + if worker_id is None: - worker_id = get_next_worker_id(cls.name) + worker_id = Process.get_next_worker_id(process_type=cls.name) # Use module-level function for pickling compatibility - proc = Process( + proc = MPProcess( target=_run_worker, args=(cls.name, worker_id, daemon), kwargs=kwargs, @@ -304,15 +305,19 @@ class Worker: return proc.pid @classmethod - def get_running_workers(cls) -> list[dict]: + def get_running_workers(cls) -> list: """Get info about all running workers of this type.""" - cleanup_stale_pid_files() - return get_all_worker_pids(cls.name) + from archivebox.machine.models import Process + + Process.cleanup_stale_running() + return list(Process.get_running(process_type=cls.name)) @classmethod def get_worker_count(cls) -> int: """Get count of running workers of this type.""" - return len(cls.get_running_workers()) + from archivebox.machine.models import Process + + return Process.get_running_count(process_type=cls.name) class CrawlWorker(Worker): @@ -402,11 +407,13 @@ class ArchiveResultWorker(Worker): @classmethod def start(cls, worker_id: int | None = None, daemon: bool = False, plugin: str | None = None, **kwargs: Any) -> int: """Fork a new worker as subprocess with optional plugin filter.""" + from archivebox.machine.models import Process + if worker_id is None: - worker_id = get_next_worker_id(cls.name) + worker_id = Process.get_next_worker_id(process_type=cls.name) # Use module-level function for pickling compatibility - proc = Process( + proc = MPProcess( target=_run_worker, args=(cls.name, worker_id, daemon), kwargs={'plugin': plugin, **kwargs},