diff --git a/TODO_process_tracking.md b/TODO_process_tracking.md
index fe8005e5..570c3c6e 100644
--- a/TODO_process_tracking.md
+++ b/TODO_process_tracking.md
@@ -1728,8 +1728,8 @@ The goal is to consolidate all subprocess management into `Process` model method
 | `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` |
 | `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code |
 | `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` |
-| `get_all_pid_files()` | `Process.objects.filter(status='running')` |
-| `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` |
+| `get_all_pid_files()` | `Process.objects.filter(machine=Machine.current(), status=Process.StatusChoices.RUNNING)` |
+| `get_all_worker_pids(type)` | `Process.objects.filter(machine=Machine.current(), process_type=type, status=Process.StatusChoices.RUNNING)` |
 | `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` |
 | `get_running_worker_count(type)` | `Process.objects.filter(...).count()` |
 | `get_next_worker_id(type)` | Use `Max(worker_id)+1` under a transaction or a DB sequence to avoid race conditions |
@@ -1808,7 +1808,7 @@ for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
 def cleanup(self):
     # Kill all running child processes for this crawl
     for snapshot in self.snapshot_set.all():
-        for ar in snapshot.archiveresult_set.filter(status='started'):
+        for ar in snapshot.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED):
             if ar.process_id:
                 # Kill hook process and all its children
                 ar.process.kill()
diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py
index 49f7e89a..c3b588c4 100755
--- a/archivebox/crawls/models.py
+++ b/archivebox/crawls/models.py
@@ -424,9 +424,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
         if self.OUTPUT_DIR.exists():
             for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
                 cmd_file = pid_file.parent / 'cmd.sh'
-                # Only delete PID file if kill succeeded or process is already dead
+                # safe_kill_process now waits for termination and escalates to SIGKILL
+                # Returns True only if process is confirmed dead
                 killed = safe_kill_process(pid_file, cmd_file)
-                if killed or not pid_file.exists():
+                if killed:
                     pid_file.unlink(missing_ok=True)

         # Run on_CrawlEnd hooks
diff --git a/archivebox/machine/migrations/0002_process_parent_and_type.py b/archivebox/machine/migrations/0002_process_parent_and_type.py
index ba908467..e70de360 100644
--- a/archivebox/machine/migrations/0002_process_parent_and_type.py
+++ b/archivebox/machine/migrations/0002_process_parent_and_type.py
@@ -30,17 +30,9 @@ class Migration(migrations.Migration):
                 -- Add composite index for machine + pid + started_at (for PID reuse protection)
                 CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at);
                 """,
-                reverse_sql="""
-                DROP INDEX IF EXISTS machine_process_machine_pid_started_idx;
-                DROP INDEX IF EXISTS machine_process_parent_status_idx;
-                DROP INDEX IF EXISTS machine_process_process_type_idx;
-                DROP INDEX IF EXISTS machine_process_parent_id_idx;
-
-                -- SQLite doesn't support DROP COLUMN directly, but we record the intent
-                -- In practice, this migration is forward-only for SQLite
-                -- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN process_type;
-                -- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN parent_id;
-                """
+                # Migration is irreversible due to SQLite limitations:
+                # SQLite doesn't support DROP COLUMN; reversing would require a full table rebuild
+                reverse_sql=migrations.RunSQL.noop
             ),
         ],
         state_operations=[
@@ -75,7 +67,21 @@ class Migration(migrations.Migration):
                     max_length=16,
                 ),
             ),
-            # Add indexes
+            # Add indexes - must match the SQL index names exactly
+            migrations.AddIndex(
+                model_name='process',
+                index=models.Index(
+                    fields=['parent'],
+                    name='machine_process_parent_id_idx',
+                ),
+            ),
+            migrations.AddIndex(
+                model_name='process',
+                index=models.Index(
+                    fields=['process_type'],
+                    name='machine_process_process_type_idx',
+                ),
+            ),
             migrations.AddIndex(
                 model_name='process',
                 index=models.Index(
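Two editorial notes on the migration above. Making the `RunSQL` irreversible via `migrations.RunSQL.noop` is the standard Django idiom when the forward SQL can't be cleanly undone on SQLite. Separately, Django enforces a 30-character limit on explicit `models.Index` names, and `machine_process_process_type_idx` is 32 characters, so the `AddIndex` state operation (and the matching `CREATE INDEX` statement) may need a shorter name. Since the indexes are created by raw SQL but mirrored into model state by hand, name drift between the two is easy to introduce silently; a sanity check along these lines could catch it (a hypothetical test helper, not part of the diff, assuming the indexes are declared in `Process.Meta.indexes` and the backend is SQLite):

```python
# Hypothetical sanity check (not part of the diff): verify that every index
# declared in the Process model state actually exists in the SQLite database
# after migrating, so raw-SQL and state_operations index names can't drift.
from django.db import connection
from archivebox.machine.models import Process

def check_process_indexes() -> None:
    declared = {idx.name for idx in Process._meta.indexes}
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='index' AND tbl_name='machine_process'"
        )
        actual = {row[0] for row in cursor.fetchall()}
    missing = declared - actual
    assert not missing, f'Declared in model state but missing in DB: {missing}'
```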
diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py
index ddddc37a..428633b3 100755
--- a/archivebox/machine/models.py
+++ b/archivebox/machine/models.py
@@ -812,6 +812,16 @@ class Process(ModelWithHealthStats):
         parent = cls._find_parent_process(machine)
         process_type = cls._detect_process_type()

+        # Use psutil cmdline if available (matches what proc() will validate against)
+        # Otherwise fall back to sys.argv
+        cmd = sys.argv
+        if PSUTIL_AVAILABLE:
+            try:
+                os_proc = psutil.Process(current_pid)
+                cmd = os_proc.cmdline()
+            except (psutil.NoSuchProcess, psutil.AccessDenied):
+                pass
+
         # Use psutil start time if available (more accurate than timezone.now())
         if os_start_time:
             started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
@@ -822,7 +832,7 @@ class Process(ModelWithHealthStats):
             machine=machine,
             parent=parent,
             process_type=process_type,
-            cmd=sys.argv,
+            cmd=cmd,
             pwd=os.getcwd(),
             pid=current_pid,
             started_at=started_at,
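The motivation for storing `psutil`'s `cmdline()` rather than `sys.argv` is that the kernel's view of argv can differ from Python's (interpreter path vs. script name, `-m` invocations, etc.), and any later identity check that compares the stored `cmd` against a live process needs both sides to come from the same source. A rough sketch of the kind of PID-reuse guard this enables (`matches_live_process` is a hypothetical helper, not part of the diff):

```python
# Hypothetical sketch of the PID-reuse guard that storing psutil's cmdline
# enables: a Process row only matches a live OS process if both the PID and
# the command line agree, so a recycled PID with a different cmdline fails.
import psutil

def matches_live_process(db_pid: int, db_cmd: list[str]) -> bool:
    try:
        os_proc = psutil.Process(db_pid)
        # cmdline() is the kernel's view of argv; it only compares equal to
        # the stored cmd if that was also captured via psutil (not sys.argv).
        return os_proc.cmdline() == db_cmd
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False
```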
+ """ + import time + import signal + if not validate_pid_file(pid_file, cmd_file): pid_file.unlink(missing_ok=True) # Clean stale file - return False + return True # Process already dead, consider it killed try: pid = int(pid_file.read_text().strip()) - os.kill(pid, signal_num) - return True - except (OSError, ValueError, ProcessLookupError): + + # Send initial signal (SIGTERM by default) + try: + os.kill(pid, signal_num) + except ProcessLookupError: + # Process already dead + return True + + # Wait for process to terminate gracefully + start_time = time.time() + while time.time() - start_time < timeout: + try: + os.kill(pid, 0) # Check if process still exists + time.sleep(0.1) + except ProcessLookupError: + # Process terminated + return True + + # Process didn't terminate, escalate to SIGKILL + try: + os.kill(pid, signal.SIGKILL) + time.sleep(0.5) # Brief wait after SIGKILL + # Verify it's dead + try: + os.kill(pid, 0) + # Process still alive after SIGKILL - this is unusual + return False + except ProcessLookupError: + # Process finally dead + return True + except ProcessLookupError: + # Process died between timeout and SIGKILL + return True + + except (OSError, ValueError): return False diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index ee1c88fc..918b2bba 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -131,6 +131,10 @@ class Worker: self.pid = os.getpid() # Register this worker process in the database self.db_process = Process.current() + # Explicitly set process_type to WORKER to prevent mis-detection + if self.db_process.process_type != Process.TypeChoices.WORKER: + self.db_process.process_type = Process.TypeChoices.WORKER + self.db_process.save(update_fields=['process_type']) # Determine worker type for logging worker_type_name = self.__class__.__name__ @@ -312,10 +316,12 @@ class Worker: Process.cleanup_stale_running() # Convert Process objects to dicts to match the expected API contract processes = Process.get_running(process_type=Process.TypeChoices.WORKER) + # Note: worker_id is not stored on Process model, it's dynamically generated + # We return process_id (UUID) and pid (OS process ID) instead return [ { 'pid': p.pid, - 'worker_id': p.id, + 'process_id': str(p.id), # UUID of Process record 'started_at': p.started_at.isoformat() if p.started_at else None, 'status': p.status, } @@ -420,7 +426,7 @@ class ArchiveResultWorker(Worker): from archivebox.machine.models import Process if worker_id is None: - worker_id = Process.get_next_worker_id(process_type=cls.name) + worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) # Use module-level function for pickling compatibility proc = MPProcess(