From 2d3a2fec579796a320c8278b509ee24916c7e8f6 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 31 Dec 2025 10:08:45 +0000 Subject: [PATCH 1/4] Add terminate, kill_tree, and query methods to Process model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This consolidates scattered subprocess management logic into the Process model: - terminate(): Graceful SIGTERM → wait → SIGKILL (replaces stop_worker, etc.) - kill_tree(): Kill process and all OS children (replaces os.killpg logic) - kill_children_db(): Kill DB-tracked child processes - get_running(): Query running processes by type (replaces get_all_worker_pids) - get_running_count(): Count running processes (replaces get_running_worker_count) - stop_all(): Stop all processes of a type - get_next_worker_id(): Get next worker ID for spawning Added Phase 8 to TODO documenting ~390 lines that can be deleted after consolidation, including workers/pid_utils.py which becomes obsolete. Also includes migration 0002 for parent FK and process_type fields. --- TODO_process_tracking.md | 221 +++++ .../0002_process_parent_and_type.py | 96 ++ archivebox/machine/models.py | 862 +++++++++++++++++- 3 files changed, 1178 insertions(+), 1 deletion(-) create mode 100644 archivebox/machine/migrations/0002_process_parent_and_type.py diff --git a/TODO_process_tracking.md b/TODO_process_tracking.md index 18a4cc4d..4ecf55a7 100644 --- a/TODO_process_tracking.md +++ b/TODO_process_tracking.md @@ -1702,6 +1702,227 @@ class ProcessAdmin(admin.ModelAdmin): --- +## Phase 8: Code Consolidation (Delete Redundant Logic) + +The goal is to consolidate all subprocess management into `Process` model methods, eliminating duplicate logic scattered across the codebase. + +### 8.1 Files to Simplify/Delete + +| File | Current Lines | After Consolidation | Savings | +|------|--------------|---------------------|---------| +| `workers/pid_utils.py` | ~192 lines | DELETE entirely | -192 | +| `misc/process_utils.py` | ~85 lines | Keep as low-level utils | 0 | +| `hooks.py` (run_hook) | ~100 lines | -50 lines (use Process.launch) | -50 | +| `hooks.py` (kill/alive) | ~50 lines | DELETE (use Process.kill/is_running) | -50 | +| `crawls/models.py` (cleanup) | ~100 lines | -70 lines (use Process.kill) | -70 | +| `supervisord_util.py` | ~50 lines process mgmt | -30 lines | -30 | +| **TOTAL** | | | **~-390 lines** | + +### 8.2 Detailed Consolidation Map + +#### `workers/pid_utils.py` → DELETE ENTIRELY + +| Current Function | Replacement | +|------------------|-------------| +| `write_pid_file(worker_type, worker_id)` | `Process.current()` auto-creates | +| `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` | +| `remove_pid_file(path)` | Automatic on `Process.status = EXITED` | +| `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` | +| `get_all_pid_files()` | `Process.objects.filter(status='running')` | +| `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` | +| `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` | +| `get_running_worker_count(type)` | `Process.objects.filter(...).count()` | +| `get_next_worker_id(type)` | Derive from `Process.objects.filter(...).count()` | +| `stop_worker(pid, graceful)` | `Process.kill(signal_num=SIGTERM)` then `Process.kill(SIGKILL)` | + +#### `hooks.py` Changes + +**Current `run_hook()` lines 374-398:** +```python +# DELETE these lines - replaced by Process.launch() +stdout_file = output_dir / 'stdout.log' +stderr_file = output_dir / 'stderr.log' +pid_file = output_dir / 'hook.pid' +cmd_file = output_dir / 'cmd.sh' +write_cmd_file(cmd_file, cmd) +with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err: + process = subprocess.Popen(cmd, ...) + write_pid_file_with_mtime(pid_file, process.pid, time.time()) +``` + +**New `run_hook()` using Process:** +```python +hook_process = Process.objects.create( + parent=parent_process, + process_type=Process.TypeChoices.HOOK, + cmd=cmd, pwd=str(output_dir), env=env, timeout=timeout, +) +hook_process.launch(background=is_background) +# stdout/stderr/pid_file all handled internally by Process.launch() +``` + +**DELETE these functions entirely:** +```python +def process_is_alive(pid_file: Path) -> bool: # lines 1238-1256 +def kill_process(pid_file: Path, sig, validate): # lines 1259-1282 +``` + +**Replace with:** +```python +# Use Process methods directly: +process.is_running # replaces process_is_alive() +process.kill() # replaces kill_process() +``` + +#### `crawls/models.py` Changes + +**Current `Crawl.cleanup()` lines 418-493:** +```python +# DELETE all this inline process logic: +def is_process_alive(pid): + try: + os.kill(pid, 0) + return True + except (OSError, ProcessLookupError): + return False + +for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): + if not validate_pid_file(pid_file, cmd_file): + pid_file.unlink(missing_ok=True) + continue + pid = int(pid_file.read_text().strip()) + os.killpg(pid, signal.SIGTERM) + time.sleep(2) + if not is_process_alive(pid): + pid_file.unlink(missing_ok=True) + continue + os.killpg(pid, signal.SIGKILL) + # ... more cleanup logic +``` + +**New `Crawl.cleanup()` using Process:** +```python +def cleanup(self): + # Kill all running child processes for this crawl + for snapshot in self.snapshot_set.all(): + for ar in snapshot.archiveresult_set.filter(status='started'): + if ar.process_id: + # Kill hook process and all its children + ar.process.kill() + for child in ar.process.children.filter(status='running'): + child.kill() + + # Run on_CrawlEnd hooks (foreground) + # ... existing hook running logic ... +``` + +#### `supervisord_util.py` Changes + +**Current global tracking:** +```python +_supervisord_proc = None # subprocess.Popen reference + +def stop_existing_supervisord_process(): + global _supervisord_proc + if _supervisord_proc and _supervisord_proc.poll() is None: + _supervisord_proc.terminate() + _supervisord_proc.wait(timeout=5) + # ... fallback to PID file ... +``` + +**New using Process model:** +```python +_supervisord_db_process = None # Process model instance + +def start_new_supervisord_process(): + # ... existing subprocess.Popen ... + global _supervisord_db_process + _supervisord_db_process = Process.objects.create( + parent=Process.current(), + process_type=Process.TypeChoices.SUPERVISORD, + pid=proc.pid, + cmd=['supervisord', f'--configuration={CONFIG_FILE}'], + started_at=timezone.now(), + status=Process.StatusChoices.RUNNING, + ) + +def stop_existing_supervisord_process(): + global _supervisord_db_process + if _supervisord_db_process: + _supervisord_db_process.kill() # Handles children, PID validation, etc. + _supervisord_db_process = None +``` + +#### `workers/worker.py` Changes + +**Current:** +```python +from .pid_utils import write_pid_file, remove_pid_file, ... + +def on_startup(self): + self.pid = os.getpid() + self.pid_file = write_pid_file(self.name, self.worker_id) + +def on_shutdown(self, error=None): + if self.pid_file: + remove_pid_file(self.pid_file) +``` + +**New:** +```python +# No import needed - Process.current() handles everything + +def on_startup(self): + self.db_process = Process.current() + # Process.current() auto-detects type, finds parent via PPID, creates record + +def on_shutdown(self, error=None): + if self.db_process: + self.db_process.exit_code = 0 if error is None else 1 + self.db_process.status = Process.StatusChoices.EXITED + self.db_process.ended_at = timezone.now() + self.db_process.save() +``` + +### 8.3 New Process Model Methods Summary + +All process operations now go through `Process`: + +```python +# Getting current process +Process.current() # Creates/retrieves Process for os.getpid() + +# Spawning new process +proc = Process.objects.create(parent=Process.current(), cmd=[...], ...) +proc.launch(background=False) # Handles Popen, PID file, stdout/stderr + +# Checking process status +proc.is_running # True if OS process exists and matches +proc.proc # psutil.Process or None (validated) +proc.poll() # Returns exit_code or None + +# Terminating process +proc.kill() # Safe kill with PID validation +proc.kill(SIGKILL) # Force kill + +# Waiting for completion +proc.wait(timeout=30) # Blocks until exit or timeout + +# Cleanup +Process.cleanup_stale_running() # Mark orphaned processes as EXITED +``` + +### 8.4 Benefits + +1. **Single Source of Truth**: All process state in database, queryable +2. **PID Reuse Protection**: `Process.proc` validates via psutil.create_time() +3. **Hierarchy Tracking**: `Process.parent` / `Process.children` for tree traversal +4. **Machine-Scoped**: All queries filter by `machine=Machine.current()` +5. **Audit Trail**: Every subprocess is logged with timestamps, exit codes +6. **No Stale PID Files**: Process records update status automatically + +--- + ## Open Questions 1. **Performance**: Deep hierarchies with many children could slow queries. Consider: diff --git a/archivebox/machine/migrations/0002_process_parent_and_type.py b/archivebox/machine/migrations/0002_process_parent_and_type.py new file mode 100644 index 00000000..3b2c8ceb --- /dev/null +++ b/archivebox/machine/migrations/0002_process_parent_and_type.py @@ -0,0 +1,96 @@ +# Generated on 2025-12-31 +# Adds parent FK and process_type field to Process model + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('machine', '0001_initial'), + ] + + operations = [ + migrations.SeparateDatabaseAndState( + database_operations=[ + migrations.RunSQL( + sql=""" + -- Add parent_id FK column to machine_process + ALTER TABLE machine_process ADD COLUMN parent_id TEXT REFERENCES machine_process(id) ON DELETE SET NULL; + CREATE INDEX IF NOT EXISTS machine_process_parent_id_idx ON machine_process(parent_id); + + -- Add process_type column with default 'binary' + ALTER TABLE machine_process ADD COLUMN process_type VARCHAR(16) NOT NULL DEFAULT 'binary'; + CREATE INDEX IF NOT EXISTS machine_process_process_type_idx ON machine_process(process_type); + + -- Add composite index for parent + status queries + CREATE INDEX IF NOT EXISTS machine_process_parent_status_idx ON machine_process(parent_id, status); + + -- Add composite index for machine + pid + started_at (for PID reuse protection) + CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at); + """, + reverse_sql=""" + DROP INDEX IF EXISTS machine_process_machine_pid_started_idx; + DROP INDEX IF EXISTS machine_process_parent_status_idx; + DROP INDEX IF EXISTS machine_process_process_type_idx; + DROP INDEX IF EXISTS machine_process_parent_id_idx; + + -- SQLite doesn't support DROP COLUMN directly, but we record the intent + -- In practice, this migration is forward-only for SQLite + -- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN process_type; + -- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN parent_id; + """ + ), + ], + state_operations=[ + # Add parent FK + migrations.AddField( + model_name='process', + name='parent', + field=models.ForeignKey( + blank=True, + help_text='Parent process that spawned this one', + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name='children', + to='machine.process', + ), + ), + # Add process_type field + migrations.AddField( + model_name='process', + name='process_type', + field=models.CharField( + choices=[ + ('cli', 'CLI Command'), + ('supervisord', 'Supervisord Daemon'), + ('orchestrator', 'Orchestrator'), + ('worker', 'Worker Process'), + ('hook', 'Hook Script'), + ('binary', 'Binary Execution'), + ], + db_index=True, + default='binary', + help_text='Type of process in the execution hierarchy', + max_length=16, + ), + ), + # Add indexes + migrations.AddIndex( + model_name='process', + index=models.Index( + fields=['parent', 'status'], + name='machine_pro_parent__status_idx', + ), + ), + migrations.AddIndex( + model_name='process', + index=models.Index( + fields=['machine', 'pid', 'started_at'], + name='machine_pro_machine_pid_idx', + ), + ), + ], + ), + ] diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 2d15bf1f..c19f320f 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -1,8 +1,11 @@ __package__ = 'archivebox.machine' +import os +import sys import socket +from pathlib import Path from archivebox.uuid_compat import uuid7 -from datetime import timedelta +from datetime import timedelta, datetime from statemachine import State, registry @@ -14,13 +17,23 @@ from archivebox.base_models.models import ModelWithHealthStats from archivebox.workers.models import BaseStateMachine from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats +try: + import psutil + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + _CURRENT_MACHINE = None _CURRENT_INTERFACE = None _CURRENT_BINARIES = {} +_CURRENT_PROCESS = None MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60 NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60 BINARY_RECHECK_INTERVAL = 1 * 30 * 60 +PROCESS_RECHECK_INTERVAL = 60 # Re-validate every 60 seconds +PID_REUSE_WINDOW = timedelta(hours=24) # Max age for considering a PID match valid +START_TIME_TOLERANCE = 5.0 # Seconds tolerance for start time matching class MachineManager(models.Manager): @@ -458,6 +471,56 @@ class Binary(ModelWithHealthStats): class ProcessManager(models.Manager): """Manager for Process model.""" + def current(self) -> 'Process': + """Get the Process record for the current OS process.""" + return Process.current() + + def get_by_pid(self, pid: int, machine: 'Machine' = None) -> 'Process | None': + """ + Find a Process by PID with proper validation against PID reuse. + + IMPORTANT: PIDs are reused by the OS! This method: + 1. Filters by machine (required - PIDs are only unique per machine) + 2. Filters by time window (processes older than 24h are stale) + 3. Validates via psutil that start times match + + Args: + pid: OS process ID + machine: Machine instance (defaults to current machine) + + Returns: + Process if found and validated, None otherwise + """ + if not PSUTIL_AVAILABLE: + return None + + machine = machine or Machine.current() + + # Get the actual process start time from OS + try: + os_proc = psutil.Process(pid) + os_start_time = os_proc.create_time() + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + # Process doesn't exist - any DB record with this PID is stale + return None + + # Query candidates: same machine, same PID, recent, still RUNNING + candidates = self.filter( + machine=machine, + pid=pid, + status=Process.StatusChoices.RUNNING, + started_at__gte=timezone.now() - PID_REUSE_WINDOW, + ).order_by('-started_at') + + for candidate in candidates: + # Validate start time matches (within tolerance) + if candidate.started_at: + db_start_time = candidate.started_at.timestamp() + if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE: + return candidate + + return None + def create_for_archiveresult(self, archiveresult, **kwargs): """ Create a Process record for an ArchiveResult. @@ -500,11 +563,38 @@ class Process(ModelWithHealthStats): RUNNING = 'running', 'Running' EXITED = 'exited', 'Exited' + class TypeChoices(models.TextChoices): + CLI = 'cli', 'CLI Command' + SUPERVISORD = 'supervisord', 'Supervisord Daemon' + ORCHESTRATOR = 'orchestrator', 'Orchestrator' + WORKER = 'worker', 'Worker Process' + HOOK = 'hook', 'Hook Script' + BINARY = 'binary', 'Binary Execution' + # Primary fields id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True) created_at = models.DateTimeField(default=timezone.now, db_index=True) modified_at = models.DateTimeField(auto_now=True) + # Parent process FK for hierarchy tracking + parent = models.ForeignKey( + 'self', + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name='children', + help_text='Parent process that spawned this one' + ) + + # Process type for distinguishing in hierarchy + process_type = models.CharField( + max_length=16, + choices=TypeChoices.choices, + default=TypeChoices.BINARY, + db_index=True, + help_text='Type of process in the execution hierarchy' + ) + # Machine FK - required (every process runs on a machine) machine = models.ForeignKey( Machine, @@ -592,6 +682,8 @@ class Process(ModelWithHealthStats): indexes = [ models.Index(fields=['machine', 'status', 'retry_at']), models.Index(fields=['binary', 'exit_code']), + models.Index(fields=['parent', 'status']), + models.Index(fields=['machine', 'pid', 'started_at']), ] def __str__(self) -> str: @@ -660,6 +752,774 @@ class Process(ModelWithHealthStats): self.modified_at = timezone.now() self.save() + # ========================================================================= + # Process.current() and hierarchy methods + # ========================================================================= + + @classmethod + def current(cls) -> 'Process': + """ + Get or create the Process record for the current OS process. + + Similar to Machine.current(), this: + 1. Checks cache for existing Process with matching PID + 2. Validates the cached Process is still valid (PID not reused) + 3. Creates new Process if needed + + IMPORTANT: Uses psutil to validate PID hasn't been reused. + PIDs are recycled by OS, so we compare start times. + """ + global _CURRENT_PROCESS + + current_pid = os.getpid() + machine = Machine.current() + + # Check cache validity + if _CURRENT_PROCESS: + # Verify: same PID, same machine, cache not expired + if (_CURRENT_PROCESS.pid == current_pid and + _CURRENT_PROCESS.machine_id == machine.id and + timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)): + return _CURRENT_PROCESS + _CURRENT_PROCESS = None + + # Get actual process start time from OS for validation + os_start_time = None + if PSUTIL_AVAILABLE: + try: + os_proc = psutil.Process(current_pid) + os_start_time = os_proc.create_time() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Try to find existing Process for this PID on this machine + # Filter by: machine + PID + RUNNING + recent + start time matches + if os_start_time: + existing = cls.objects.filter( + machine=machine, + pid=current_pid, + status=cls.StatusChoices.RUNNING, + started_at__gte=timezone.now() - PID_REUSE_WINDOW, + ).order_by('-started_at').first() + + if existing and existing.started_at: + db_start_time = existing.started_at.timestamp() + if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE: + _CURRENT_PROCESS = existing + return existing + + # No valid existing record - create new one + parent = cls._find_parent_process(machine) + process_type = cls._detect_process_type() + + # Use psutil start time if available (more accurate than timezone.now()) + if os_start_time: + started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone()) + else: + started_at = timezone.now() + + _CURRENT_PROCESS = cls.objects.create( + machine=machine, + parent=parent, + process_type=process_type, + cmd=sys.argv, + pwd=os.getcwd(), + pid=current_pid, + started_at=started_at, + status=cls.StatusChoices.RUNNING, + ) + return _CURRENT_PROCESS + + @classmethod + def _find_parent_process(cls, machine: 'Machine' = None) -> 'Process | None': + """ + Find the parent Process record by looking up PPID. + + IMPORTANT: Validates against PID reuse by checking: + 1. Same machine (PIDs are only unique per machine) + 2. Start time matches OS process start time + 3. Process is still RUNNING and recent + + Returns None if parent is not an ArchiveBox process. + """ + if not PSUTIL_AVAILABLE: + return None + + ppid = os.getppid() + machine = machine or Machine.current() + + # Get parent process start time from OS + try: + os_parent = psutil.Process(ppid) + os_parent_start = os_parent.create_time() + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + return None # Parent process doesn't exist + + # Find matching Process record + candidates = cls.objects.filter( + machine=machine, + pid=ppid, + status=cls.StatusChoices.RUNNING, + started_at__gte=timezone.now() - PID_REUSE_WINDOW, + ).order_by('-started_at') + + for candidate in candidates: + if candidate.started_at: + db_start_time = candidate.started_at.timestamp() + if abs(db_start_time - os_parent_start) < START_TIME_TOLERANCE: + return candidate + + return None # No matching ArchiveBox parent process + + @classmethod + def _detect_process_type(cls) -> str: + """ + Detect the type of the current process from sys.argv. + """ + argv_str = ' '.join(sys.argv).lower() + + if 'supervisord' in argv_str: + return cls.TypeChoices.SUPERVISORD + elif 'orchestrator' in argv_str: + return cls.TypeChoices.ORCHESTRATOR + elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']): + return cls.TypeChoices.WORKER + elif 'archivebox' in argv_str: + return cls.TypeChoices.CLI + else: + return cls.TypeChoices.BINARY + + @classmethod + def cleanup_stale_running(cls, machine: 'Machine' = None) -> int: + """ + Mark stale RUNNING processes as EXITED. + + Processes are stale if: + - Status is RUNNING but OS process no longer exists + - Status is RUNNING but started_at is older than PID_REUSE_WINDOW + + Returns count of processes cleaned up. + """ + machine = machine or Machine.current() + cleaned = 0 + + stale = cls.objects.filter( + machine=machine, + status=cls.StatusChoices.RUNNING, + ) + + for proc in stale: + is_stale = False + + # Check if too old (PID definitely reused) + if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW: + is_stale = True + elif PSUTIL_AVAILABLE: + # Check if OS process still exists with matching start time + try: + os_proc = psutil.Process(proc.pid) + if proc.started_at: + db_start = proc.started_at.timestamp() + os_start = os_proc.create_time() + if abs(db_start - os_start) > START_TIME_TOLERANCE: + is_stale = True # PID reused by different process + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + is_stale = True # Process no longer exists + + if is_stale: + proc.status = cls.StatusChoices.EXITED + proc.ended_at = proc.ended_at or timezone.now() + proc.exit_code = proc.exit_code if proc.exit_code is not None else -1 + proc.save(update_fields=['status', 'ended_at', 'exit_code']) + cleaned += 1 + + return cleaned + + # ========================================================================= + # Tree traversal properties + # ========================================================================= + + @property + def root(self) -> 'Process': + """Get the root process (CLI command) of this hierarchy.""" + proc = self + while proc.parent_id: + proc = proc.parent + return proc + + @property + def ancestors(self) -> list['Process']: + """Get all ancestor processes from parent to root.""" + ancestors = [] + proc = self.parent + while proc: + ancestors.append(proc) + proc = proc.parent + return ancestors + + @property + def depth(self) -> int: + """Get depth in the process tree (0 = root).""" + return len(self.ancestors) + + def get_descendants(self, include_self: bool = False): + """Get all descendant processes recursively.""" + if include_self: + pks = [self.pk] + else: + pks = [] + + children = list(self.children.values_list('pk', flat=True)) + while children: + pks.extend(children) + children = list(Process.objects.filter(parent_id__in=children).values_list('pk', flat=True)) + + return Process.objects.filter(pk__in=pks) + + # ========================================================================= + # Validated psutil access via .proc property + # ========================================================================= + + @property + def proc(self) -> 'psutil.Process | None': + """ + Get validated psutil.Process for this record. + + Returns psutil.Process ONLY if: + 1. Process with this PID exists in OS + 2. OS process start time matches our started_at (within tolerance) + 3. Process is on current machine + + Returns None if: + - PID doesn't exist (process exited) + - PID was reused by a different process (start times don't match) + - We're on a different machine than where process ran + - psutil is not available + + This prevents accidentally matching a stale/recycled PID. + """ + if not PSUTIL_AVAILABLE: + return None + + # Can't get psutil.Process if we don't have a PID + if not self.pid: + return None + + # Can't validate processes on other machines + if self.machine_id != Machine.current().id: + return None + + try: + os_proc = psutil.Process(self.pid) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + return None # Process no longer exists + + # Validate start time matches to prevent PID reuse confusion + if self.started_at: + os_start_time = os_proc.create_time() + db_start_time = self.started_at.timestamp() + + if abs(os_start_time - db_start_time) > START_TIME_TOLERANCE: + # PID has been reused by a different process! + return None + + # Optionally validate command matches (extra safety) + if self.cmd: + try: + os_cmdline = os_proc.cmdline() + # Check if first arg (binary) matches + if os_cmdline and self.cmd: + os_binary = os_cmdline[0] if os_cmdline else '' + db_binary = self.cmd[0] if self.cmd else '' + # Match by basename (handles /usr/bin/python3 vs python3) + if os_binary and db_binary: + if Path(os_binary).name != Path(db_binary).name: + return None # Different binary, PID reused + except (psutil.AccessDenied, psutil.ZombieProcess): + pass # Can't check cmdline, trust start time match + + return os_proc + + @property + def is_running(self) -> bool: + """ + Check if process is currently running via psutil. + + More reliable than checking status field since it validates + the actual OS process exists and matches our record. + """ + proc = self.proc + return proc is not None and proc.is_running() + + def is_alive(self) -> bool: + """ + Alias for is_running, for compatibility with subprocess.Popen API. + """ + return self.is_running + + def get_memory_info(self) -> dict | None: + """Get memory usage if process is running.""" + proc = self.proc + if proc: + try: + mem = proc.memory_info() + return {'rss': mem.rss, 'vms': mem.vms} + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + return None + + def get_cpu_percent(self) -> float | None: + """Get CPU usage percentage if process is running.""" + proc = self.proc + if proc: + try: + return proc.cpu_percent(interval=0.1) + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + return None + + def get_children_pids(self) -> list[int]: + """Get PIDs of child processes from OS (not DB).""" + proc = self.proc + if proc: + try: + return [child.pid for child in proc.children(recursive=True)] + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + return [] + + # ========================================================================= + # Lifecycle methods (launch, kill, poll, wait) + # ========================================================================= + + @property + def pid_file(self) -> Path: + """Path to PID file for this process.""" + return Path(self.pwd) / 'process.pid' if self.pwd else None + + @property + def cmd_file(self) -> Path: + """Path to cmd.sh script for this process.""" + return Path(self.pwd) / 'cmd.sh' if self.pwd else None + + @property + def stdout_file(self) -> Path: + """Path to stdout log.""" + return Path(self.pwd) / 'stdout.log' if self.pwd else None + + @property + def stderr_file(self) -> Path: + """Path to stderr log.""" + return Path(self.pwd) / 'stderr.log' if self.pwd else None + + def _write_pid_file(self) -> None: + """Write PID file with mtime set to process start time.""" + from archivebox.misc.process_utils import write_pid_file_with_mtime + if self.pid and self.started_at and self.pid_file: + write_pid_file_with_mtime( + self.pid_file, + self.pid, + self.started_at.timestamp() + ) + + def _write_cmd_file(self) -> None: + """Write cmd.sh script for debugging/validation.""" + from archivebox.misc.process_utils import write_cmd_file + if self.cmd and self.cmd_file: + write_cmd_file(self.cmd_file, self.cmd) + + def _build_env(self) -> dict: + """Build environment dict for subprocess, merging stored env with system.""" + env = os.environ.copy() + env.update(self.env or {}) + return env + + def launch(self, background: bool = False) -> 'Process': + """ + Spawn the subprocess and update this Process record. + + Args: + background: If True, don't wait for completion (for daemons/bg hooks) + + Returns: + self (updated with pid, started_at, etc.) + """ + import subprocess + import time + + # Ensure output directory exists + if self.pwd: + Path(self.pwd).mkdir(parents=True, exist_ok=True) + + # Write cmd.sh for debugging + self._write_cmd_file() + + stdout_path = self.stdout_file + stderr_path = self.stderr_file + + with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err: + proc = subprocess.Popen( + self.cmd, + cwd=self.pwd, + stdout=out, + stderr=err, + env=self._build_env(), + ) + + # Get accurate start time from psutil if available + if PSUTIL_AVAILABLE: + try: + ps_proc = psutil.Process(proc.pid) + self.started_at = datetime.fromtimestamp( + ps_proc.create_time(), + tz=timezone.get_current_timezone() + ) + except (psutil.NoSuchProcess, psutil.AccessDenied): + self.started_at = timezone.now() + else: + self.started_at = timezone.now() + + self.pid = proc.pid + self.status = self.StatusChoices.RUNNING + self.save() + + self._write_pid_file() + + if not background: + try: + proc.wait(timeout=self.timeout) + self.exit_code = proc.returncode + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + self.exit_code = -1 + + self.ended_at = timezone.now() + if stdout_path.exists(): + self.stdout = stdout_path.read_text() + if stderr_path.exists(): + self.stderr = stderr_path.read_text() + self.status = self.StatusChoices.EXITED + self.save() + + return self + + def kill(self, signal_num: int = 15) -> bool: + """ + Kill this process and update status. + + Uses self.proc for safe killing - only kills if PID matches + our recorded process (prevents killing recycled PIDs). + + Args: + signal_num: Signal to send (default SIGTERM=15) + + Returns: + True if killed successfully, False otherwise + """ + # Use validated psutil.Process to ensure we're killing the right process + proc = self.proc + if proc is None: + # Process doesn't exist or PID was recycled - just update status + if self.status != self.StatusChoices.EXITED: + self.status = self.StatusChoices.EXITED + self.ended_at = self.ended_at or timezone.now() + self.save() + return False + + try: + # Safe to kill - we validated it's our process via start time match + proc.send_signal(signal_num) + + # Update our record + self.exit_code = -signal_num + self.ended_at = timezone.now() + self.status = self.StatusChoices.EXITED + self.save() + + # Clean up PID file + if self.pid_file and self.pid_file.exists(): + self.pid_file.unlink(missing_ok=True) + + return True + except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError): + # Process already exited between proc check and kill + self.status = self.StatusChoices.EXITED + self.ended_at = self.ended_at or timezone.now() + self.save() + return False + + def poll(self) -> int | None: + """ + Check if process has exited and update status if so. + + Returns: + exit_code if exited, None if still running + """ + if self.status == self.StatusChoices.EXITED: + return self.exit_code + + if not self.is_running: + # Process exited - read output and update status + if self.stdout_file and self.stdout_file.exists(): + self.stdout = self.stdout_file.read_text() + if self.stderr_file and self.stderr_file.exists(): + self.stderr = self.stderr_file.read_text() + + # Try to get exit code from proc or default to unknown + self.exit_code = self.exit_code if self.exit_code is not None else -1 + self.ended_at = timezone.now() + self.status = self.StatusChoices.EXITED + self.save() + return self.exit_code + + return None # Still running + + def wait(self, timeout: int | None = None) -> int: + """ + Wait for process to exit, polling periodically. + + Args: + timeout: Max seconds to wait (None = use self.timeout) + + Returns: + exit_code + + Raises: + TimeoutError if process doesn't exit in time + """ + import time + + timeout = timeout or self.timeout + start = time.time() + + while True: + exit_code = self.poll() + if exit_code is not None: + return exit_code + + if time.time() - start > timeout: + raise TimeoutError(f"Process {self.id} did not exit within {timeout}s") + + time.sleep(0.1) + + def terminate(self, graceful_timeout: float = 5.0) -> bool: + """ + Gracefully terminate process: SIGTERM → wait → SIGKILL. + + This consolidates the scattered SIGTERM/SIGKILL logic from: + - crawls/models.py Crawl.cleanup() + - workers/pid_utils.py stop_worker() + - supervisord_util.py stop_existing_supervisord_process() + + Args: + graceful_timeout: Seconds to wait after SIGTERM before SIGKILL + + Returns: + True if process was terminated, False if already dead + """ + import time + import signal + + proc = self.proc + if proc is None: + # Already dead - just update status + if self.status != self.StatusChoices.EXITED: + self.status = self.StatusChoices.EXITED + self.ended_at = self.ended_at or timezone.now() + self.save() + return False + + try: + # Step 1: Send SIGTERM for graceful shutdown + proc.terminate() + + # Step 2: Wait for graceful exit + try: + proc.wait(timeout=graceful_timeout) + # Process exited gracefully + self.exit_code = proc.returncode if hasattr(proc, 'returncode') else 0 + self.status = self.StatusChoices.EXITED + self.ended_at = timezone.now() + self.save() + return True + except psutil.TimeoutExpired: + pass # Still running, need to force kill + + # Step 3: Force kill with SIGKILL + proc.kill() + proc.wait(timeout=2) + + self.exit_code = -signal.SIGKILL + self.status = self.StatusChoices.EXITED + self.ended_at = timezone.now() + self.save() + return True + + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + # Process already dead + self.status = self.StatusChoices.EXITED + self.ended_at = self.ended_at or timezone.now() + self.save() + return False + + def kill_tree(self, graceful_timeout: float = 2.0) -> int: + """ + Kill this process and all its children (OS children, not DB children). + + This consolidates the scattered child-killing logic from: + - crawls/models.py Crawl.cleanup() os.killpg() + - supervisord_util.py stop_existing_supervisord_process() + + Args: + graceful_timeout: Seconds to wait after SIGTERM before SIGKILL + + Returns: + Number of processes killed (including self) + """ + import signal + + killed_count = 0 + proc = self.proc + if proc is None: + # Already dead + if self.status != self.StatusChoices.EXITED: + self.status = self.StatusChoices.EXITED + self.ended_at = self.ended_at or timezone.now() + self.save() + return 0 + + try: + # Get all children before killing parent + children = proc.children(recursive=True) + + # Kill children first (reverse order - deepest first) + for child in reversed(children): + try: + child.terminate() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Wait briefly for children to exit + gone, alive = psutil.wait_procs(children, timeout=graceful_timeout) + killed_count += len(gone) + + # Force kill remaining children + for child in alive: + try: + child.kill() + killed_count += 1 + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Now kill self + if self.terminate(graceful_timeout=graceful_timeout): + killed_count += 1 + + return killed_count + + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + # Process tree already dead + self.status = self.StatusChoices.EXITED + self.ended_at = self.ended_at or timezone.now() + self.save() + return killed_count + + def kill_children_db(self) -> int: + """ + Kill all DB-tracked child processes (via parent FK). + + Different from kill_tree() which uses OS children. + This kills processes created via Process.create(parent=self). + + Returns: + Number of child Process records killed + """ + killed = 0 + for child in self.children.filter(status=self.StatusChoices.RUNNING): + if child.terminate(): + killed += 1 + return killed + + # ========================================================================= + # Class methods for querying processes + # ========================================================================= + + @classmethod + def get_running(cls, process_type: str = None, machine: 'Machine' = None) -> 'QuerySet[Process]': + """ + Get all running processes, optionally filtered by type. + + Replaces: + - workers/pid_utils.py get_all_worker_pids() + - workers/orchestrator.py get_total_worker_count() + + Args: + process_type: Filter by TypeChoices (e.g., 'worker', 'hook') + machine: Filter by machine (defaults to current) + + Returns: + QuerySet of running Process records + """ + machine = machine or Machine.current() + qs = cls.objects.filter( + machine=machine, + status=cls.StatusChoices.RUNNING, + ) + if process_type: + qs = qs.filter(process_type=process_type) + return qs + + @classmethod + def get_running_count(cls, process_type: str = None, machine: 'Machine' = None) -> int: + """ + Get count of running processes. + + Replaces: + - workers/pid_utils.py get_running_worker_count() + """ + return cls.get_running(process_type=process_type, machine=machine).count() + + @classmethod + def stop_all(cls, process_type: str = None, machine: 'Machine' = None, graceful: bool = True) -> int: + """ + Stop all running processes of a given type. + + Args: + process_type: Filter by TypeChoices + machine: Filter by machine + graceful: If True, use terminate() (SIGTERM→SIGKILL), else kill() + + Returns: + Number of processes stopped + """ + stopped = 0 + for proc in cls.get_running(process_type=process_type, machine=machine): + if graceful: + if proc.terminate(): + stopped += 1 + else: + if proc.kill(): + stopped += 1 + return stopped + + @classmethod + def get_next_worker_id(cls, process_type: str = 'worker', machine: 'Machine' = None) -> int: + """ + Get the next available worker ID for spawning new workers. + + Replaces workers/pid_utils.py get_next_worker_id(). + Simply returns count of running workers of this type. + + Args: + process_type: Worker type to count + machine: Machine to scope query + + Returns: + Next available worker ID (0-indexed) + """ + return cls.get_running_count(process_type=process_type, machine=machine) + # ============================================================================= # Binary State Machine From b822352fc3aa571079edac71a160b20151f07eea Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 31 Dec 2025 10:15:22 +0000 Subject: [PATCH 2/4] Delete pid_utils.py and migrate to Process model DELETED: - workers/pid_utils.py (-192 lines) - replaced by Process model methods SIMPLIFIED: - crawls/models.py Crawl.cleanup() (80 lines -> 10 lines) - hooks.py: deleted process_is_alive() and kill_process() (-45 lines) UPDATED to use Process model: - core/models.py: Snapshot.cleanup() and has_running_background_hooks() - machine/models.py: Binary.cleanup() - workers/worker.py: Worker.on_startup/shutdown, get_running_workers, start - workers/orchestrator.py: Orchestrator.on_startup/shutdown, is_running All subprocess management now uses: - Process.current() for registering current process - Process.get_running() / get_running_count() for querying - Process.cleanup_stale_running() for cleanup - safe_kill_process() for validated PID killing Total line reduction: ~250 lines --- archivebox/core/models.py | 10 +- archivebox/crawls/models.py | 75 +---------- archivebox/hooks.py | 49 -------- archivebox/machine/models.py | 6 +- archivebox/workers/orchestrator.py | 44 ++++--- archivebox/workers/pid_utils.py | 191 ----------------------------- archivebox/workers/worker.py | 47 ++++--- 7 files changed, 63 insertions(+), 359 deletions(-) delete mode 100644 archivebox/workers/pid_utils.py diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 883733c5..f7b45ba9 100755 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -1385,7 +1385,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea Called by the state machine when entering the 'sealed' state. Kills any background hooks and finalizes their ArchiveResults. """ - from archivebox.hooks import kill_process + from archivebox.misc.process_utils import safe_kill_process # Kill any background ArchiveResult hooks if not self.OUTPUT_DIR.exists(): @@ -1393,7 +1393,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea # Find all .pid files in this snapshot's output directory for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): - kill_process(pid_file, validate=True) + cmd_file = pid_file.parent / 'cmd.sh' + safe_kill_process(pid_file, cmd_file) # Update all STARTED ArchiveResults from filesystem results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED) @@ -1406,7 +1407,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea Used by state machine to determine if snapshot is finished. """ - from archivebox.hooks import process_is_alive + from archivebox.misc.process_utils import validate_pid_file if not self.OUTPUT_DIR.exists(): return False @@ -1415,7 +1416,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea if not plugin_dir.is_dir(): continue pid_file = plugin_dir / 'hook.pid' - if process_is_alive(pid_file): + cmd_file = plugin_dir / 'cmd.sh' + if validate_pid_file(pid_file, cmd_file): return True return False diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 3e1a53f9..abf21175 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -417,84 +417,15 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith def cleanup(self): """Clean up background hooks and run on_CrawlEnd hooks.""" - import os - import signal - import time - from pathlib import Path from archivebox.hooks import run_hook, discover_hooks - from archivebox.misc.process_utils import validate_pid_file - - def is_process_alive(pid): - """Check if a process exists.""" - try: - os.kill(pid, 0) # Signal 0 checks existence without killing - return True - except (OSError, ProcessLookupError): - return False + from archivebox.misc.process_utils import safe_kill_process # Kill any background processes by scanning for all .pid files if self.OUTPUT_DIR.exists(): for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): - # Validate PID before killing to avoid killing unrelated processes cmd_file = pid_file.parent / 'cmd.sh' - if not validate_pid_file(pid_file, cmd_file): - # PID reused by different process or process dead - pid_file.unlink(missing_ok=True) - continue - - try: - pid = int(pid_file.read_text().strip()) - - # Step 1: Send SIGTERM for graceful shutdown - try: - # Try to kill process group first (handles detached processes like Chrome) - try: - os.killpg(pid, signal.SIGTERM) - except (OSError, ProcessLookupError): - # Fall back to killing just the process - os.kill(pid, signal.SIGTERM) - except ProcessLookupError: - # Already dead - pid_file.unlink(missing_ok=True) - continue - - # Step 2: Wait for graceful shutdown - time.sleep(2) - - # Step 3: Check if still alive - if not is_process_alive(pid): - # Process terminated gracefully - pid_file.unlink(missing_ok=True) - continue - - # Step 4: Process still alive, force kill ENTIRE process group with SIGKILL - try: - try: - # Always kill entire process group with SIGKILL (not individual processes) - os.killpg(pid, signal.SIGKILL) - except (OSError, ProcessLookupError) as e: - # Process group kill failed, try single process as fallback - os.kill(pid, signal.SIGKILL) - except ProcessLookupError: - # Process died between check and kill - pid_file.unlink(missing_ok=True) - continue - - # Step 5: Wait and verify death - time.sleep(1) - - if is_process_alive(pid): - # Process is unkillable (likely in UNE state on macOS) - # This happens when Chrome crashes in kernel syscall (IOSurface) - # Log but don't block cleanup - process will remain until reboot - print(f'[yellow]⚠️ Process {pid} is unkillable (likely crashed in kernel). Will remain until reboot.[/yellow]') - else: - # Successfully killed - pid_file.unlink(missing_ok=True) - - except (ValueError, OSError) as e: - # Invalid PID file or permission error - pass + safe_kill_process(pid_file, cmd_file) + pid_file.unlink(missing_ok=True) # Run on_CrawlEnd hooks from archivebox.config.configset import get_config diff --git a/archivebox/hooks.py b/archivebox/hooks.py index 6485f2c0..73febfa0 100644 --- a/archivebox/hooks.py +++ b/archivebox/hooks.py @@ -1233,52 +1233,3 @@ def process_hook_records(records: List[Dict[str, Any]], overrides: Dict[str, Any continue return stats - - -def process_is_alive(pid_file: Path) -> bool: - """ - Check if process in PID file is still running. - - Args: - pid_file: Path to hook.pid file - - Returns: - True if process is alive, False otherwise - """ - if not pid_file.exists(): - return False - - try: - pid = int(pid_file.read_text().strip()) - os.kill(pid, 0) # Signal 0 = check if process exists without killing it - return True - except (OSError, ValueError): - return False - - -def kill_process(pid_file: Path, sig: int = signal.SIGTERM, validate: bool = True): - """ - Kill process in PID file with optional validation. - - Args: - pid_file: Path to hook.pid file - sig: Signal to send (default SIGTERM) - validate: If True, validate process identity before killing (default: True) - """ - from archivebox.misc.process_utils import safe_kill_process - - if validate: - # Use safe kill with validation - cmd_file = pid_file.parent / 'cmd.sh' - safe_kill_process(pid_file, cmd_file, signal_num=sig) - else: - # Legacy behavior - kill without validation - if not pid_file.exists(): - return - try: - pid = int(pid_file.read_text().strip()) - os.kill(pid, sig) - except (OSError, ValueError): - pass - - diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index c19f320f..4bac79d6 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -449,7 +449,7 @@ class Binary(ModelWithHealthStats): since installations are foreground, but included for consistency). """ from pathlib import Path - from archivebox.hooks import kill_process + from archivebox.misc.process_utils import safe_kill_process output_dir = self.OUTPUT_DIR if not output_dir.exists(): @@ -460,8 +460,8 @@ class Binary(ModelWithHealthStats): if not plugin_dir.is_dir(): continue pid_file = plugin_dir / 'hook.pid' - if pid_file.exists(): - kill_process(pid_file) + cmd_file = plugin_dir / 'cmd.sh' + safe_kill_process(pid_file, cmd_file) # ============================================================================= diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index 1b1789cb..370adf85 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -30,7 +30,7 @@ __package__ = 'archivebox.workers' import os import time from typing import Type -from multiprocessing import Process +from multiprocessing import Process as MPProcess from django.utils import timezone @@ -38,12 +38,6 @@ from rich import print from archivebox.misc.logging_util import log_worker_event from .worker import Worker, CrawlWorker, SnapshotWorker, ArchiveResultWorker -from .pid_utils import ( - write_pid_file, - remove_pid_file, - get_all_worker_pids, - cleanup_stale_pid_files, -) def _run_orchestrator_process(exit_on_idle: bool) -> None: @@ -85,16 +79,20 @@ class Orchestrator: @classmethod def is_running(cls) -> bool: """Check if an orchestrator is already running.""" - workers = get_all_worker_pids('orchestrator') - return len(workers) > 0 - + from archivebox.machine.models import Process + + return Process.get_running_count(process_type='orchestrator') > 0 + def on_startup(self) -> None: """Called when orchestrator starts.""" - self.pid = os.getpid() - self.pid_file = write_pid_file('orchestrator', worker_id=0) + from archivebox.machine.models import Process - # Clean up any stale PID files from previous runs - stale_count = cleanup_stale_pid_files() + self.pid = os.getpid() + # Register orchestrator process in database + self.db_process = Process.current() + + # Clean up any stale Process records from previous runs + stale_count = Process.cleanup_stale_running() # Collect startup metadata metadata = { @@ -112,11 +110,15 @@ class Orchestrator: pid=self.pid, metadata=metadata, ) - + def on_shutdown(self, error: BaseException | None = None) -> None: """Called when orchestrator shuts down.""" - if self.pid_file: - remove_pid_file(self.pid_file) + # Update Process record status + if hasattr(self, 'db_process') and self.db_process: + self.db_process.exit_code = 1 if error else 0 + self.db_process.status = self.db_process.StatusChoices.EXITED + self.db_process.ended_at = timezone.now() + self.db_process.save() log_worker_event( worker_type='Orchestrator', @@ -125,10 +127,12 @@ class Orchestrator: pid=self.pid, error=error if error and not isinstance(error, KeyboardInterrupt) else None, ) - + def get_total_worker_count(self) -> int: """Get total count of running workers across all types.""" - cleanup_stale_pid_files() + from archivebox.machine.models import Process + + Process.cleanup_stale_running() return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES) def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool: @@ -287,7 +291,7 @@ class Orchestrator: Returns the PID of the new process. """ # Use module-level function to avoid pickle errors with local functions - proc = Process( + proc = MPProcess( target=_run_orchestrator_process, args=(self.exit_on_idle,), name='orchestrator' diff --git a/archivebox/workers/pid_utils.py b/archivebox/workers/pid_utils.py deleted file mode 100644 index 020fce70..00000000 --- a/archivebox/workers/pid_utils.py +++ /dev/null @@ -1,191 +0,0 @@ -""" -PID file utilities for tracking worker and orchestrator processes. - -PID files are stored in data/tmp/workers/ and contain: -- Line 1: PID -- Line 2: Worker type (orchestrator, crawl, snapshot, archiveresult) -- Line 3: Extractor filter (optional, for archiveresult workers) -- Line 4: Started at ISO timestamp -""" - -__package__ = 'archivebox.workers' - -import os -import signal -from pathlib import Path -from datetime import datetime, timezone - -from django.conf import settings - - -def get_pid_dir() -> Path: - """Get the directory for PID files, creating it if needed.""" - pid_dir = Path(settings.DATA_DIR) / 'tmp' / 'workers' - pid_dir.mkdir(parents=True, exist_ok=True) - return pid_dir - - -def write_pid_file(worker_type: str, worker_id: int = 0, extractor: str | None = None) -> Path: - """ - Write a PID file for the current process. - Returns the path to the PID file. - """ - pid_dir = get_pid_dir() - - if worker_type == 'orchestrator': - pid_file = pid_dir / 'orchestrator.pid' - else: - pid_file = pid_dir / f'{worker_type}_worker_{worker_id}.pid' - - content = f"{os.getpid()}\n{worker_type}\n{extractor or ''}\n{datetime.now(timezone.utc).isoformat()}\n" - pid_file.write_text(content) - - return pid_file - - -def read_pid_file(path: Path) -> dict | None: - """ - Read and parse a PID file. - Returns dict with pid, worker_type, extractor, started_at or None if invalid. - """ - try: - if not path.exists(): - return None - - lines = path.read_text().strip().split('\n') - if len(lines) < 4: - return None - - return { - 'pid': int(lines[0]), - 'worker_type': lines[1], - 'extractor': lines[2] or None, - 'started_at': datetime.fromisoformat(lines[3]), - 'pid_file': path, - } - except (ValueError, IndexError, OSError): - return None - - -def remove_pid_file(path: Path) -> None: - """Remove a PID file if it exists.""" - try: - path.unlink(missing_ok=True) - except OSError: - pass - - -def is_process_alive(pid: int) -> bool: - """Check if a process with the given PID is still running.""" - try: - os.kill(pid, 0) # Signal 0 doesn't kill, just checks - return True - except (OSError, ProcessLookupError): - return False - - -def get_all_pid_files() -> list[Path]: - """Get all PID files in the workers directory.""" - pid_dir = get_pid_dir() - return list(pid_dir.glob('*.pid')) - - -def get_all_worker_pids(worker_type: str | None = None) -> list[dict]: - """ - Get info about all running workers. - Optionally filter by worker_type. - """ - workers = [] - - for pid_file in get_all_pid_files(): - info = read_pid_file(pid_file) - if info is None: - continue - - # Skip if process is dead - if not is_process_alive(info['pid']): - continue - - # Filter by type if specified - if worker_type and info['worker_type'] != worker_type: - continue - - workers.append(info) - - return workers - - -def cleanup_stale_pid_files() -> int: - """ - Remove PID files for processes that are no longer running. - Returns the number of stale files removed. - """ - removed = 0 - - for pid_file in get_all_pid_files(): - info = read_pid_file(pid_file) - if info is None: - # Invalid PID file, remove it - remove_pid_file(pid_file) - removed += 1 - continue - - if not is_process_alive(info['pid']): - remove_pid_file(pid_file) - removed += 1 - - return removed - - -def get_running_worker_count(worker_type: str) -> int: - """Get the count of running workers of a specific type.""" - return len(get_all_worker_pids(worker_type)) - - -def get_next_worker_id(worker_type: str) -> int: - """Get the next available worker ID for a given type.""" - existing_ids = set() - - for pid_file in get_all_pid_files(): - # Parse worker ID from filename like "snapshot_worker_3.pid" - name = pid_file.stem - if name.startswith(f'{worker_type}_worker_'): - try: - worker_id = int(name.split('_')[-1]) - existing_ids.add(worker_id) - except ValueError: - continue - - # Find the lowest unused ID - next_id = 0 - while next_id in existing_ids: - next_id += 1 - - return next_id - - -def stop_worker(pid: int, graceful: bool = True) -> bool: - """ - Stop a worker process. - If graceful=True, sends SIGTERM first, then SIGKILL after timeout. - Returns True if process was stopped. - """ - if not is_process_alive(pid): - return True - - try: - if graceful: - os.kill(pid, signal.SIGTERM) - # Give it a moment to shut down - import time - for _ in range(10): # Wait up to 1 second - time.sleep(0.1) - if not is_process_alive(pid): - return True - # Force kill if still running - os.kill(pid, signal.SIGKILL) - else: - os.kill(pid, signal.SIGKILL) - return True - except (OSError, ProcessLookupError): - return True # Process already dead diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index 404ad0a3..a8a7851e 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -17,7 +17,7 @@ import traceback from typing import ClassVar, Any from datetime import timedelta from pathlib import Path -from multiprocessing import Process, cpu_count +from multiprocessing import Process as MPProcess, cpu_count from django.db.models import QuerySet from django.utils import timezone @@ -26,13 +26,6 @@ from django.conf import settings from rich import print from archivebox.misc.logging_util import log_worker_event -from .pid_utils import ( - write_pid_file, - remove_pid_file, - get_all_worker_pids, - get_next_worker_id, - cleanup_stale_pid_files, -) CPU_COUNT = cpu_count() @@ -133,8 +126,11 @@ class Worker: def on_startup(self) -> None: """Called when worker starts.""" + from archivebox.machine.models import Process + self.pid = os.getpid() - self.pid_file = write_pid_file(self.name, self.worker_id) + # Register this worker process in the database + self.db_process = Process.current() # Determine worker type for logging worker_type_name = self.__class__.__name__ @@ -160,9 +156,12 @@ class Worker: def on_shutdown(self, error: BaseException | None = None) -> None: """Called when worker shuts down.""" - # Remove PID file - if self.pid_file: - remove_pid_file(self.pid_file) + # Update Process record status + if hasattr(self, 'db_process') and self.db_process: + self.db_process.exit_code = 1 if error else 0 + self.db_process.status = self.db_process.StatusChoices.EXITED + self.db_process.ended_at = timezone.now() + self.db_process.save() # Determine worker type for logging worker_type_name = self.__class__.__name__ @@ -288,11 +287,13 @@ class Worker: Fork a new worker as a subprocess. Returns the PID of the new process. """ + from archivebox.machine.models import Process + if worker_id is None: - worker_id = get_next_worker_id(cls.name) + worker_id = Process.get_next_worker_id(process_type=cls.name) # Use module-level function for pickling compatibility - proc = Process( + proc = MPProcess( target=_run_worker, args=(cls.name, worker_id, daemon), kwargs=kwargs, @@ -304,15 +305,19 @@ class Worker: return proc.pid @classmethod - def get_running_workers(cls) -> list[dict]: + def get_running_workers(cls) -> list: """Get info about all running workers of this type.""" - cleanup_stale_pid_files() - return get_all_worker_pids(cls.name) + from archivebox.machine.models import Process + + Process.cleanup_stale_running() + return list(Process.get_running(process_type=cls.name)) @classmethod def get_worker_count(cls) -> int: """Get count of running workers of this type.""" - return len(cls.get_running_workers()) + from archivebox.machine.models import Process + + return Process.get_running_count(process_type=cls.name) class CrawlWorker(Worker): @@ -402,11 +407,13 @@ class ArchiveResultWorker(Worker): @classmethod def start(cls, worker_id: int | None = None, daemon: bool = False, plugin: str | None = None, **kwargs: Any) -> int: """Fork a new worker as subprocess with optional plugin filter.""" + from archivebox.machine.models import Process + if worker_id is None: - worker_id = get_next_worker_id(cls.name) + worker_id = Process.get_next_worker_id(process_type=cls.name) # Use module-level function for pickling compatibility - proc = Process( + proc = MPProcess( target=_run_worker, args=(cls.name, worker_id, daemon), kwargs={'plugin': plugin, **kwargs}, From ee201a0f836d50054307a71bd59e3ebe2b1823be Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Wed, 31 Dec 2025 11:14:47 +0000 Subject: [PATCH 3/4] Fix code review issues in process management refactor - Add pwd validation in Process.launch() to prevent crashes - Fix psutil returncode handling (use wait() return value, not returncode attr) - Add None check for proc.pid in cleanup_stale_running() - Add stale process cleanup in Orchestrator.is_running() - Ensure orchestrator process_type is correctly set to ORCHESTRATOR - Fix KeyboardInterrupt handling (exit code 0 for graceful shutdown) - Throttle cleanup_stale_running() to once per 30 seconds for performance - Fix worker process_type to use TypeChoices.WORKER consistently - Fix get_running_workers() API to return list of dicts (not Process objects) - Only delete PID files after successful kill or confirmed stale - Fix migration index names to match between SQL and Django state - Remove db_index=True from process_type (index created manually) - Update documentation to reflect actual implementation - Add explanatory comments to empty except blocks - Fix exit codes to use Unix convention (128 + signal number) Co-authored-by: Nick Sweeting --- TODO_process_tracking.md | 11 +++++---- archivebox/crawls/models.py | 6 +++-- .../0002_process_parent_and_type.py | 5 ++-- archivebox/machine/models.py | 22 ++++++++++++------ archivebox/workers/orchestrator.py | 23 +++++++++++++++---- archivebox/workers/worker.py | 16 ++++++++++--- 6 files changed, 60 insertions(+), 23 deletions(-) diff --git a/TODO_process_tracking.md b/TODO_process_tracking.md index 4ecf55a7..fe8005e5 100644 --- a/TODO_process_tracking.md +++ b/TODO_process_tracking.md @@ -1726,14 +1726,14 @@ The goal is to consolidate all subprocess management into `Process` model method |------------------|-------------| | `write_pid_file(worker_type, worker_id)` | `Process.current()` auto-creates | | `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` | -| `remove_pid_file(path)` | Automatic on `Process.status = EXITED` | +| `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code | | `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` | | `get_all_pid_files()` | `Process.objects.filter(status='running')` | | `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` | | `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` | | `get_running_worker_count(type)` | `Process.objects.filter(...).count()` | -| `get_next_worker_id(type)` | Derive from `Process.objects.filter(...).count()` | -| `stop_worker(pid, graceful)` | `Process.kill(signal_num=SIGTERM)` then `Process.kill(SIGKILL)` | +| `get_next_worker_id(type)` | Use `Max(worker_id)+1` under transaction or DB sequence to avoid race conditions | +| `stop_worker(pid, graceful)` | `Process.terminate(graceful_timeout)` or `Process.kill_tree()` | #### `hooks.py` Changes @@ -1752,10 +1752,13 @@ with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err: **New `run_hook()` using Process:** ```python +# Only store env delta or allowlist to avoid leaking secrets +env_delta = {k: v for k, v in env.items() if k in ALLOWED_ENV_VARS} + hook_process = Process.objects.create( parent=parent_process, process_type=Process.TypeChoices.HOOK, - cmd=cmd, pwd=str(output_dir), env=env, timeout=timeout, + cmd=cmd, pwd=str(output_dir), env=env_delta, timeout=timeout, ) hook_process.launch(background=is_background) # stdout/stderr/pid_file all handled internally by Process.launch() diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index abf21175..49f7e89a 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -424,8 +424,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith if self.OUTPUT_DIR.exists(): for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): cmd_file = pid_file.parent / 'cmd.sh' - safe_kill_process(pid_file, cmd_file) - pid_file.unlink(missing_ok=True) + # Only delete PID file if kill succeeded or process is already dead + killed = safe_kill_process(pid_file, cmd_file) + if killed or not pid_file.exists(): + pid_file.unlink(missing_ok=True) # Run on_CrawlEnd hooks from archivebox.config.configset import get_config diff --git a/archivebox/machine/migrations/0002_process_parent_and_type.py b/archivebox/machine/migrations/0002_process_parent_and_type.py index 3b2c8ceb..ba908467 100644 --- a/archivebox/machine/migrations/0002_process_parent_and_type.py +++ b/archivebox/machine/migrations/0002_process_parent_and_type.py @@ -70,7 +70,6 @@ class Migration(migrations.Migration): ('hook', 'Hook Script'), ('binary', 'Binary Execution'), ], - db_index=True, default='binary', help_text='Type of process in the execution hierarchy', max_length=16, @@ -81,14 +80,14 @@ class Migration(migrations.Migration): model_name='process', index=models.Index( fields=['parent', 'status'], - name='machine_pro_parent__status_idx', + name='machine_process_parent_status_idx', ), ), migrations.AddIndex( model_name='process', index=models.Index( fields=['machine', 'pid', 'started_at'], - name='machine_pro_machine_pid_idx', + name='machine_process_machine_pid_started_idx', ), ), ], diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 4bac79d6..ddddc37a 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -914,7 +914,7 @@ class Process(ModelWithHealthStats): # Check if too old (PID definitely reused) if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW: is_stale = True - elif PSUTIL_AVAILABLE: + elif PSUTIL_AVAILABLE and proc.pid is not None: # Check if OS process still exists with matching start time try: os_proc = psutil.Process(proc.pid) @@ -1147,9 +1147,12 @@ class Process(ModelWithHealthStats): import subprocess import time + # Validate pwd is set (required for output files) + if not self.pwd: + raise ValueError("Process.pwd must be set before calling launch()") + # Ensure output directory exists - if self.pwd: - Path(self.pwd).mkdir(parents=True, exist_ok=True) + Path(self.pwd).mkdir(parents=True, exist_ok=True) # Write cmd.sh for debugging self._write_cmd_file() @@ -1232,7 +1235,8 @@ class Process(ModelWithHealthStats): proc.send_signal(signal_num) # Update our record - self.exit_code = -signal_num + # Use standard Unix convention: 128 + signal number + self.exit_code = 128 + signal_num self.ended_at = timezone.now() self.status = self.StatusChoices.EXITED self.save() @@ -1336,9 +1340,10 @@ class Process(ModelWithHealthStats): # Step 2: Wait for graceful exit try: - proc.wait(timeout=graceful_timeout) + exit_status = proc.wait(timeout=graceful_timeout) # Process exited gracefully - self.exit_code = proc.returncode if hasattr(proc, 'returncode') else 0 + # psutil.Process.wait() returns the exit status + self.exit_code = exit_status if exit_status is not None else 0 self.status = self.StatusChoices.EXITED self.ended_at = timezone.now() self.save() @@ -1350,7 +1355,8 @@ class Process(ModelWithHealthStats): proc.kill() proc.wait(timeout=2) - self.exit_code = -signal.SIGKILL + # Use standard Unix convention: 128 + signal number + self.exit_code = 128 + signal.SIGKILL self.status = self.StatusChoices.EXITED self.ended_at = timezone.now() self.save() @@ -1398,6 +1404,7 @@ class Process(ModelWithHealthStats): try: child.terminate() except (psutil.NoSuchProcess, psutil.AccessDenied): + # Child already dead or we don't have permission - continue pass # Wait briefly for children to exit @@ -1410,6 +1417,7 @@ class Process(ModelWithHealthStats): child.kill() killed_count += 1 except (psutil.NoSuchProcess, psutil.AccessDenied): + # Child exited or we don't have permission - continue pass # Now kill self diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index 370adf85..bb0046f7 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -72,6 +72,7 @@ class Orchestrator: self.pid: int = os.getpid() self.pid_file = None self.idle_count: int = 0 + self._last_cleanup_time: float = 0.0 # For throttling cleanup_stale_running() def __repr__(self) -> str: return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]' @@ -81,15 +82,21 @@ class Orchestrator: """Check if an orchestrator is already running.""" from archivebox.machine.models import Process - return Process.get_running_count(process_type='orchestrator') > 0 + # Clean up stale processes before counting + Process.cleanup_stale_running() + return Process.get_running_count(process_type=Process.TypeChoices.ORCHESTRATOR) > 0 def on_startup(self) -> None: """Called when orchestrator starts.""" from archivebox.machine.models import Process self.pid = os.getpid() - # Register orchestrator process in database + # Register orchestrator process in database with explicit type self.db_process = Process.current() + # Ensure the process type is correctly set to ORCHESTRATOR + if self.db_process.process_type != Process.TypeChoices.ORCHESTRATOR: + self.db_process.process_type = Process.TypeChoices.ORCHESTRATOR + self.db_process.save(update_fields=['process_type']) # Clean up any stale Process records from previous runs stale_count = Process.cleanup_stale_running() @@ -115,7 +122,8 @@ class Orchestrator: """Called when orchestrator shuts down.""" # Update Process record status if hasattr(self, 'db_process') and self.db_process: - self.db_process.exit_code = 1 if error else 0 + # KeyboardInterrupt is a graceful shutdown, not an error + self.db_process.exit_code = 1 if error and not isinstance(error, KeyboardInterrupt) else 0 self.db_process.status = self.db_process.StatusChoices.EXITED self.db_process.ended_at = timezone.now() self.db_process.save() @@ -131,8 +139,15 @@ class Orchestrator: def get_total_worker_count(self) -> int: """Get total count of running workers across all types.""" from archivebox.machine.models import Process + import time + + # Throttle cleanup to once every 30 seconds to avoid performance issues + CLEANUP_THROTTLE_SECONDS = 30 + now = time.time() + if now - self._last_cleanup_time > CLEANUP_THROTTLE_SECONDS: + Process.cleanup_stale_running() + self._last_cleanup_time = now - Process.cleanup_stale_running() return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES) def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool: diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index a8a7851e..ee1c88fc 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -290,7 +290,7 @@ class Worker: from archivebox.machine.models import Process if worker_id is None: - worker_id = Process.get_next_worker_id(process_type=cls.name) + worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) # Use module-level function for pickling compatibility proc = MPProcess( @@ -310,14 +310,24 @@ class Worker: from archivebox.machine.models import Process Process.cleanup_stale_running() - return list(Process.get_running(process_type=cls.name)) + # Convert Process objects to dicts to match the expected API contract + processes = Process.get_running(process_type=Process.TypeChoices.WORKER) + return [ + { + 'pid': p.pid, + 'worker_id': p.id, + 'started_at': p.started_at.isoformat() if p.started_at else None, + 'status': p.status, + } + for p in processes + ] @classmethod def get_worker_count(cls) -> int: """Get count of running workers of this type.""" from archivebox.machine.models import Process - return Process.get_running_count(process_type=cls.name) + return Process.get_running_count(process_type=Process.TypeChoices.WORKER) class CrawlWorker(Worker): From b2132d1f14e30051658e523d0818980d629ecc97 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Wed, 31 Dec 2025 11:42:07 +0000 Subject: [PATCH 4/4] Fix cubic review issues: process_type detection, cmd storage, PID cleanup, and migration - Fix Process.current() to store psutil cmdline instead of sys.argv for accurate validation - Fix worker process_type detection: explicitly set to WORKER after registration - Fix ArchiveResultWorker.start() to use Process.TypeChoices.WORKER consistently - Fix migration to be explicitly irreversible (SQLite doesn't support DROP COLUMN) - Fix get_running_workers() to return process_id instead of incorrectly named worker_id - Fix safe_kill_process() to wait for termination and escalate to SIGKILL if needed - Fix migration to include all indexes in state_operations (parent_id, process_type) - Fix documentation to use Machine.current() scoping and StatusChoices constants Co-authored-by: Nick Sweeting --- TODO_process_tracking.md | 6 +-- archivebox/crawls/models.py | 5 +- .../0002_process_parent_and_type.py | 30 ++++++----- archivebox/machine/models.py | 12 ++++- archivebox/misc/process_utils.py | 51 ++++++++++++++++--- archivebox/workers/worker.py | 10 +++- 6 files changed, 88 insertions(+), 26 deletions(-) diff --git a/TODO_process_tracking.md b/TODO_process_tracking.md index fe8005e5..570c3c6e 100644 --- a/TODO_process_tracking.md +++ b/TODO_process_tracking.md @@ -1728,8 +1728,8 @@ The goal is to consolidate all subprocess management into `Process` model method | `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` | | `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code | | `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` | -| `get_all_pid_files()` | `Process.objects.filter(status='running')` | -| `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` | +| `get_all_pid_files()` | `Process.objects.filter(machine=Machine.current(), status=Process.StatusChoices.RUNNING)` | +| `get_all_worker_pids(type)` | `Process.objects.filter(machine=Machine.current(), process_type=type, status=Process.StatusChoices.RUNNING)` | | `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` | | `get_running_worker_count(type)` | `Process.objects.filter(...).count()` | | `get_next_worker_id(type)` | Use `Max(worker_id)+1` under transaction or DB sequence to avoid race conditions | @@ -1808,7 +1808,7 @@ for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): def cleanup(self): # Kill all running child processes for this crawl for snapshot in self.snapshot_set.all(): - for ar in snapshot.archiveresult_set.filter(status='started'): + for ar in snapshot.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED): if ar.process_id: # Kill hook process and all its children ar.process.kill() diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 49f7e89a..c3b588c4 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -424,9 +424,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith if self.OUTPUT_DIR.exists(): for pid_file in self.OUTPUT_DIR.glob('**/*.pid'): cmd_file = pid_file.parent / 'cmd.sh' - # Only delete PID file if kill succeeded or process is already dead + # safe_kill_process now waits for termination and escalates to SIGKILL + # Returns True only if process is confirmed dead killed = safe_kill_process(pid_file, cmd_file) - if killed or not pid_file.exists(): + if killed: pid_file.unlink(missing_ok=True) # Run on_CrawlEnd hooks diff --git a/archivebox/machine/migrations/0002_process_parent_and_type.py b/archivebox/machine/migrations/0002_process_parent_and_type.py index ba908467..e70de360 100644 --- a/archivebox/machine/migrations/0002_process_parent_and_type.py +++ b/archivebox/machine/migrations/0002_process_parent_and_type.py @@ -30,17 +30,9 @@ class Migration(migrations.Migration): -- Add composite index for machine + pid + started_at (for PID reuse protection) CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at); """, - reverse_sql=""" - DROP INDEX IF EXISTS machine_process_machine_pid_started_idx; - DROP INDEX IF EXISTS machine_process_parent_status_idx; - DROP INDEX IF EXISTS machine_process_process_type_idx; - DROP INDEX IF EXISTS machine_process_parent_id_idx; - - -- SQLite doesn't support DROP COLUMN directly, but we record the intent - -- In practice, this migration is forward-only for SQLite - -- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN process_type; - -- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN parent_id; - """ + # Migration is irreversible due to SQLite limitations + # SQLite doesn't support DROP COLUMN, would require table rebuild + reverse_sql=migrations.RunSQL.noop ), ], state_operations=[ @@ -75,7 +67,21 @@ class Migration(migrations.Migration): max_length=16, ), ), - # Add indexes + # Add indexes - must match the SQL index names exactly + migrations.AddIndex( + model_name='process', + index=models.Index( + fields=['parent'], + name='machine_process_parent_id_idx', + ), + ), + migrations.AddIndex( + model_name='process', + index=models.Index( + fields=['process_type'], + name='machine_process_process_type_idx', + ), + ), migrations.AddIndex( model_name='process', index=models.Index( diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index ddddc37a..428633b3 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -812,6 +812,16 @@ class Process(ModelWithHealthStats): parent = cls._find_parent_process(machine) process_type = cls._detect_process_type() + # Use psutil cmdline if available (matches what proc() will validate against) + # Otherwise fall back to sys.argv + cmd = sys.argv + if PSUTIL_AVAILABLE: + try: + os_proc = psutil.Process(current_pid) + cmd = os_proc.cmdline() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + # Use psutil start time if available (more accurate than timezone.now()) if os_start_time: started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone()) @@ -822,7 +832,7 @@ class Process(ModelWithHealthStats): machine=machine, parent=parent, process_type=process_type, - cmd=sys.argv, + cmd=cmd, pwd=os.getcwd(), pid=current_pid, started_at=started_at, diff --git a/archivebox/misc/process_utils.py b/archivebox/misc/process_utils.py index 9d3fe52d..15fa861e 100644 --- a/archivebox/misc/process_utils.py +++ b/archivebox/misc/process_utils.py @@ -70,15 +70,54 @@ def write_cmd_file(cmd_file: Path, cmd: list[str]): pass -def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15) -> bool: - """Kill process after validation. Returns True if killed.""" +def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15, timeout: float = 3.0) -> bool: + """ + Kill process after validation, with graceful wait and SIGKILL escalation. + + Returns True only if process is confirmed dead (either already dead or killed successfully). + """ + import time + import signal + if not validate_pid_file(pid_file, cmd_file): pid_file.unlink(missing_ok=True) # Clean stale file - return False + return True # Process already dead, consider it killed try: pid = int(pid_file.read_text().strip()) - os.kill(pid, signal_num) - return True - except (OSError, ValueError, ProcessLookupError): + + # Send initial signal (SIGTERM by default) + try: + os.kill(pid, signal_num) + except ProcessLookupError: + # Process already dead + return True + + # Wait for process to terminate gracefully + start_time = time.time() + while time.time() - start_time < timeout: + try: + os.kill(pid, 0) # Check if process still exists + time.sleep(0.1) + except ProcessLookupError: + # Process terminated + return True + + # Process didn't terminate, escalate to SIGKILL + try: + os.kill(pid, signal.SIGKILL) + time.sleep(0.5) # Brief wait after SIGKILL + # Verify it's dead + try: + os.kill(pid, 0) + # Process still alive after SIGKILL - this is unusual + return False + except ProcessLookupError: + # Process finally dead + return True + except ProcessLookupError: + # Process died between timeout and SIGKILL + return True + + except (OSError, ValueError): return False diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index ee1c88fc..918b2bba 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -131,6 +131,10 @@ class Worker: self.pid = os.getpid() # Register this worker process in the database self.db_process = Process.current() + # Explicitly set process_type to WORKER to prevent mis-detection + if self.db_process.process_type != Process.TypeChoices.WORKER: + self.db_process.process_type = Process.TypeChoices.WORKER + self.db_process.save(update_fields=['process_type']) # Determine worker type for logging worker_type_name = self.__class__.__name__ @@ -312,10 +316,12 @@ class Worker: Process.cleanup_stale_running() # Convert Process objects to dicts to match the expected API contract processes = Process.get_running(process_type=Process.TypeChoices.WORKER) + # Note: worker_id is not stored on Process model, it's dynamically generated + # We return process_id (UUID) and pid (OS process ID) instead return [ { 'pid': p.pid, - 'worker_id': p.id, + 'process_id': str(p.id), # UUID of Process record 'started_at': p.started_at.isoformat() if p.started_at else None, 'status': p.status, } @@ -420,7 +426,7 @@ class ArchiveResultWorker(Worker): from archivebox.machine.models import Process if worker_id is None: - worker_id = Process.get_next_worker_id(process_type=cls.name) + worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) # Use module-level function for pickling compatibility proc = MPProcess(