diff --git a/TODO_process_tracking.md b/TODO_process_tracking.md
index 4ecf55a7..fe8005e5 100644
--- a/TODO_process_tracking.md
+++ b/TODO_process_tracking.md
@@ -1726,14 +1726,14 @@ The goal is to consolidate all subprocess management into `Process` model method
 |------------------|-------------|
 | `write_pid_file(worker_type, worker_id)` | `Process.current()` auto-creates |
 | `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` |
-| `remove_pid_file(path)` | Automatic on `Process.status = EXITED` |
+| `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code |
 | `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` |
 | `get_all_pid_files()` | `Process.objects.filter(status='running')` |
 | `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` |
 | `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` |
 | `get_running_worker_count(type)` | `Process.objects.filter(...).count()` |
-| `get_next_worker_id(type)` | Derive from `Process.objects.filter(...).count()` |
-| `stop_worker(pid, graceful)` | `Process.kill(signal_num=SIGTERM)` then `Process.kill(SIGKILL)` |
+| `get_next_worker_id(type)` | Use `Max(worker_id)+1` under transaction or DB sequence to avoid race conditions |
+| `stop_worker(pid, graceful)` | `Process.terminate(graceful_timeout)` or `Process.kill_tree()` |
 
 #### `hooks.py` Changes
 
@@ -1752,10 +1752,13 @@ with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
 **New `run_hook()` using Process:**
 
 ```python
+# Only store env delta or allowlist to avoid leaking secrets
+env_delta = {k: v for k, v in env.items() if k in ALLOWED_ENV_VARS}
+
 hook_process = Process.objects.create(
     parent=parent_process,
     process_type=Process.TypeChoices.HOOK,
-    cmd=cmd, pwd=str(output_dir), env=env, timeout=timeout,
+    cmd=cmd, pwd=str(output_dir), env=env_delta, timeout=timeout,
 )
 hook_process.launch(background=is_background)
 # stdout/stderr/pid_file all handled internally by Process.launch()
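The `Max(worker_id)+1` replacement suggested in the table above could look like the following. This is a minimal sketch, assuming `Process` gains an integer `worker_id` column; the stub model and locking details are illustrative, not settled API:

```python
# Hypothetical sketch of the Max(worker_id)+1 allocation described in the
# table above; assumes Process gains an integer worker_id field.
from django.db import models, transaction
from django.db.models import Max

class Process(models.Model):  # illustrative stub, not the real model
    process_type = models.CharField(max_length=16)
    worker_id = models.IntegerField(null=True)  # assumed new column

    @classmethod
    def get_next_worker_id(cls, process_type: str) -> int:
        """Allocate the next worker_id for a given process_type."""
        with transaction.atomic():
            # Read the current max inside a transaction so the read-increment
            # pair is at least serialized on SQLite's single-writer lock.
            current_max = cls.objects.filter(process_type=process_type).aggregate(
                max_id=Max('worker_id'),
            )['max_id']
            return (current_max or 0) + 1
```

Note that `transaction.atomic()` alone only narrows the race; a unique constraint on `(process_type, worker_id)` with a retry on `IntegrityError`, or a true DB sequence as the table suggests, is what fully closes it.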
diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py
index abf21175..49f7e89a 100755
--- a/archivebox/crawls/models.py
+++ b/archivebox/crawls/models.py
@@ -424,8 +424,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
         if self.OUTPUT_DIR.exists():
             for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
                 cmd_file = pid_file.parent / 'cmd.sh'
-                safe_kill_process(pid_file, cmd_file)
-                pid_file.unlink(missing_ok=True)
+                # Only delete PID file if kill succeeded or process is already dead
+                killed = safe_kill_process(pid_file, cmd_file)
+                if killed or not pid_file.exists():
+                    pid_file.unlink(missing_ok=True)
 
         # Run on_CrawlEnd hooks
         from archivebox.config.configset import get_config
diff --git a/archivebox/machine/migrations/0002_process_parent_and_type.py b/archivebox/machine/migrations/0002_process_parent_and_type.py
index 3b2c8ceb..ba908467 100644
--- a/archivebox/machine/migrations/0002_process_parent_and_type.py
+++ b/archivebox/machine/migrations/0002_process_parent_and_type.py
@@ -70,7 +70,6 @@ class Migration(migrations.Migration):
                 ('hook', 'Hook Script'),
                 ('binary', 'Binary Execution'),
             ],
-            db_index=True,
             default='binary',
             help_text='Type of process in the execution hierarchy',
             max_length=16,
@@ -81,14 +80,14 @@
         migrations.AddIndex(
             model_name='process',
             index=models.Index(
                 fields=['parent', 'status'],
-                name='machine_pro_parent__status_idx',
+                name='machine_proc_parent_status_idx',
             ),
         ),
         migrations.AddIndex(
             model_name='process',
             index=models.Index(
                 fields=['machine', 'pid', 'started_at'],
-                name='machine_pro_machine_pid_idx',
+                name='machine_proc_pid_started_idx',
             ),
         ),
     ],
diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py
index 4bac79d6..ddddc37a 100755
--- a/archivebox/machine/models.py
+++ b/archivebox/machine/models.py
@@ -914,7 +914,7 @@ class Process(ModelWithHealthStats):
             # Check if too old (PID definitely reused)
             if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW:
                 is_stale = True
-            elif PSUTIL_AVAILABLE:
+            elif PSUTIL_AVAILABLE and proc.pid is not None:
                 # Check if OS process still exists with matching start time
                 try:
                     os_proc = psutil.Process(proc.pid)
@@ -1147,9 +1147,12 @@ class Process(ModelWithHealthStats):
         import subprocess
         import time
 
+        # Validate pwd is set (required for output files)
+        if not self.pwd:
+            raise ValueError("Process.pwd must be set before calling launch()")
+
         # Ensure output directory exists
-        if self.pwd:
-            Path(self.pwd).mkdir(parents=True, exist_ok=True)
+        Path(self.pwd).mkdir(parents=True, exist_ok=True)
 
         # Write cmd.sh for debugging
         self._write_cmd_file()
@@ -1232,7 +1235,8 @@
             proc.send_signal(signal_num)
 
             # Update our record
-            self.exit_code = -signal_num
+            # Use standard Unix convention: 128 + signal number
+            self.exit_code = 128 + signal_num
             self.ended_at = timezone.now()
             self.status = self.StatusChoices.EXITED
             self.save()
@@ -1336,9 +1340,10 @@
 
         # Step 2: Wait for graceful exit
         try:
-            proc.wait(timeout=graceful_timeout)
+            exit_status = proc.wait(timeout=graceful_timeout)
             # Process exited gracefully
-            self.exit_code = proc.returncode if hasattr(proc, 'returncode') else 0
+            # psutil.Process.wait() returns the exit status
+            self.exit_code = exit_status if exit_status is not None else 0
             self.status = self.StatusChoices.EXITED
             self.ended_at = timezone.now()
             self.save()
@@ -1350,7 +1355,8 @@
             proc.kill()
             proc.wait(timeout=2)
 
-            self.exit_code = -signal.SIGKILL
+            # Use standard Unix convention: 128 + signal number
+            self.exit_code = 128 + signal.SIGKILL
             self.status = self.StatusChoices.EXITED
             self.ended_at = timezone.now()
             self.save()
@@ -1398,6 +1404,7 @@
             try:
                 child.terminate()
             except (psutil.NoSuchProcess, psutil.AccessDenied):
+                # Child already dead or we don't have permission - continue
                 pass
 
         # Wait briefly for children to exit
@@ -1410,6 +1417,7 @@
                 child.kill()
                 killed_count += 1
             except (psutil.NoSuchProcess, psutil.AccessDenied):
+                # Child exited or we don't have permission - continue
                 pass
 
         # Now kill self
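The `PSUTIL_AVAILABLE and proc.pid is not None` guard above protects a start-time comparison worth spelling out: a PID alone does not identify a process, because the kernel recycles PIDs. A standalone sketch of the check (the function name and slack value are illustrative assumptions, not the model's API):

```python
# Sketch of the PID-reuse detection behind cleanup_stale_running(): a DB row
# still refers to a live process only if the OS-reported start time of that
# PID matches the started_at we recorded when launching it.
from datetime import datetime, timedelta, timezone

import psutil

def pid_matches_record(pid: int, recorded_started_at: datetime,
                       slack: timedelta = timedelta(seconds=5)) -> bool:
    """Return True if `pid` is alive AND started when our record says it did."""
    try:
        os_started = datetime.fromtimestamp(
            psutil.Process(pid).create_time(), tz=timezone.utc,
        )
    except psutil.NoSuchProcess:
        return False  # no process under this PID at all
    # If the start times disagree by more than the slack, the PID was
    # recycled by an unrelated process and our record is stale.
    return abs(os_started - recorded_started_at) <= slack
```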
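The two `128 + signal` changes adopt the shell convention for signal deaths: a process killed by signal N exits with status 128+N (137 for SIGKILL, 143 for SIGTERM), whereas Python's `subprocess` reports the same death as a negative returncode. A small POSIX-only demonstration of the mapping:

```python
# Demonstrates the exit-code convention adopted above: subprocess reports a
# signal death as -N, while the shell (and now Process.exit_code) uses 128 + N.
import signal
import subprocess

child = subprocess.Popen(['sleep', '60'])
child.send_signal(signal.SIGKILL)
child.wait()

assert child.returncode == -signal.SIGKILL   # subprocess convention: -9
exit_code = 128 + (-child.returncode)        # shell convention: 137
assert exit_code == 128 + signal.SIGKILL
print(f'Process.exit_code would be stored as {exit_code}')
```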
diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py
index 370adf85..bb0046f7 100644
--- a/archivebox/workers/orchestrator.py
+++ b/archivebox/workers/orchestrator.py
@@ -72,6 +72,7 @@ class Orchestrator:
         self.pid: int = os.getpid()
         self.pid_file = None
         self.idle_count: int = 0
+        self._last_cleanup_time: float = 0.0  # For throttling cleanup_stale_running()
 
     def __repr__(self) -> str:
         return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'
@@ -81,15 +82,21 @@ class Orchestrator:
         """Check if an orchestrator is already running."""
         from archivebox.machine.models import Process
 
-        return Process.get_running_count(process_type='orchestrator') > 0
+        # Clean up stale processes before counting
+        Process.cleanup_stale_running()
+        return Process.get_running_count(process_type=Process.TypeChoices.ORCHESTRATOR) > 0
 
     def on_startup(self) -> None:
         """Called when orchestrator starts."""
         from archivebox.machine.models import Process
 
         self.pid = os.getpid()
-        # Register orchestrator process in database
+        # Register orchestrator process in database with explicit type
         self.db_process = Process.current()
+        # Ensure the process type is correctly set to ORCHESTRATOR
+        if self.db_process.process_type != Process.TypeChoices.ORCHESTRATOR:
+            self.db_process.process_type = Process.TypeChoices.ORCHESTRATOR
+            self.db_process.save(update_fields=['process_type'])
 
         # Clean up any stale Process records from previous runs
         stale_count = Process.cleanup_stale_running()
@@ -115,7 +122,8 @@
         """Called when orchestrator shuts down."""
         # Update Process record status
         if hasattr(self, 'db_process') and self.db_process:
-            self.db_process.exit_code = 1 if error else 0
+            # KeyboardInterrupt is a graceful shutdown, not an error
+            self.db_process.exit_code = 1 if error and not isinstance(error, KeyboardInterrupt) else 0
             self.db_process.status = self.db_process.StatusChoices.EXITED
             self.db_process.ended_at = timezone.now()
             self.db_process.save()
@@ -131,8 +139,15 @@
     def get_total_worker_count(self) -> int:
         """Get total count of running workers across all types."""
         from archivebox.machine.models import Process
+        import time
+
+        # Throttle cleanup to once every 30 seconds to avoid performance issues
+        CLEANUP_THROTTLE_SECONDS = 30
+        now = time.time()
+        if now - self._last_cleanup_time > CLEANUP_THROTTLE_SECONDS:
+            Process.cleanup_stale_running()
+            self._last_cleanup_time = now
 
-        Process.cleanup_stale_running()
         return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES)
 
     def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py
index a8a7851e..ee1c88fc 100644
--- a/archivebox/workers/worker.py
+++ b/archivebox/workers/worker.py
@@ -290,7 +290,7 @@ class Worker:
         from archivebox.machine.models import Process
 
         if worker_id is None:
-            worker_id = Process.get_next_worker_id(process_type=cls.name)
+            worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
 
         # Use module-level function for pickling compatibility
         proc = MPProcess(
@@ -310,14 +310,24 @@
         from archivebox.machine.models import Process
 
         Process.cleanup_stale_running()
-        return list(Process.get_running(process_type=cls.name))
+        # Convert Process objects to dicts to match the expected API contract
+        processes = Process.get_running(process_type=Process.TypeChoices.WORKER)
+        return [
+            {
+                'pid': p.pid,
+                'worker_id': p.worker_id,
+                'started_at': p.started_at.isoformat() if p.started_at else None,
+                'status': p.status,
+            }
+            for p in processes
+        ]
 
     @classmethod
     def get_worker_count(cls) -> int:
         """Get count of running workers of this type."""
         from archivebox.machine.models import Process
 
-        return Process.get_running_count(process_type=cls.name)
+        return Process.get_running_count(process_type=Process.TypeChoices.WORKER)
 
 
 class CrawlWorker(Worker):
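The inline throttle in `get_total_worker_count()` is a pattern that could be factored out if more call sites need it. A hypothetical reusable helper (not part of this diff), using `time.monotonic()` since, unlike the `time.time()` used above, it cannot jump backwards when the wall clock is adjusted:

```python
import time
from functools import wraps
from typing import Any, Callable, Optional

def throttled(min_interval: float) -> Callable:
    """Decorator: run the wrapped function at most once per min_interval
    seconds; calls that arrive too early return None without running it."""
    def decorator(fn: Callable) -> Callable:
        last_run = 0.0
        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]:
            nonlocal last_run
            now = time.monotonic()  # monotonic: immune to wall-clock jumps
            if last_run and now - last_run < min_interval:
                return None
            last_run = now
            return fn(*args, **kwargs)
        return wrapper
    return decorator

# Usage mirroring the orchestrator hunk above (the body is a stand-in for
# Process.cleanup_stale_running()):
@throttled(min_interval=30.0)
def cleanup_stale() -> int:
    print('cleaning up stale Process rows...')
    return 0
```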