diff --git a/TODO_process_tracking.md b/TODO_process_tracking.md index 603e3256..ef18aef1 100644 --- a/TODO_process_tracking.md +++ b/TODO_process_tracking.md @@ -67,7 +67,139 @@ class Process(ModelWithHealthStats): ) ``` -### 1.3 Add Helper Methods for Tree Traversal +### 1.3 Add `Process.current()` Class Method (like `Machine.current()`) + +Following the pattern established by `Machine.current()`, add a method to get-or-create the Process record for the current OS process: + +```python +_CURRENT_PROCESS = None +PROCESS_RECHECK_INTERVAL = 60 # Re-validate every 60 seconds + +class ProcessManager(models.Manager): + def current(self) -> 'Process': + return Process.current() + + +class Process(ModelWithHealthStats): + # ... existing fields ... + + objects: ProcessManager = ProcessManager() + + @classmethod + def current(cls) -> 'Process': + """ + Get or create the Process record for the current OS process. + + Similar to Machine.current(), this: + 1. Checks cache for existing Process with matching PID + 2. Validates the cached Process is still valid (PID not reused) + 3. Creates new Process if needed + + Uses os.getpid() to identify current process and os.getppid() to + find parent Process record. 
+ """ + global _CURRENT_PROCESS + + current_pid = os.getpid() + + # Check cache validity + if _CURRENT_PROCESS: + # Verify cached process matches current PID and hasn't expired + if (_CURRENT_PROCESS.pid == current_pid and + timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)): + return _CURRENT_PROCESS + _CURRENT_PROCESS = None + + machine = Machine.current() + + # Try to find existing Process for this PID on this machine + existing = cls.objects.filter( + machine=machine, + pid=current_pid, + status=cls.StatusChoices.RUNNING, + ).first() + + if existing: + # Validate it's actually our process (check start time matches) + try: + import psutil + proc = psutil.Process(current_pid) + if abs(existing.started_at.timestamp() - proc.create_time()) < 5.0: + _CURRENT_PROCESS = existing + return existing + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + # Create new Process record + parent = cls._find_parent_process() + process_type = cls._detect_process_type() + + _CURRENT_PROCESS = cls.objects.create( + machine=machine, + parent=parent, + process_type=process_type, + cmd=sys.argv, + pwd=os.getcwd(), + pid=current_pid, + started_at=timezone.now(), + status=cls.StatusChoices.RUNNING, + ) + return _CURRENT_PROCESS + + @classmethod + def _find_parent_process(cls) -> 'Process | None': + """ + Find the parent Process record by looking up PPID. + + Returns None if parent is not an ArchiveBox process. + """ + ppid = os.getppid() + machine = Machine.current() + + return cls.objects.filter( + machine=machine, + pid=ppid, + status=cls.StatusChoices.RUNNING, + ).first() + + @classmethod + def _detect_process_type(cls) -> str: + """ + Detect the type of the current process from sys.argv. 
+ """ + argv_str = ' '.join(sys.argv).lower() + + if 'supervisord' in argv_str: + return cls.TypeChoices.SUPERVISORD + elif 'orchestrator' in argv_str: + return cls.TypeChoices.ORCHESTRATOR + elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']): + return cls.TypeChoices.WORKER + elif 'archivebox' in argv_str: + return cls.TypeChoices.CLI + else: + return cls.TypeChoices.BINARY +``` + +**Key Benefits:** +- **Automatic hierarchy**: Calling `Process.current()` from anywhere auto-links to parent +- **Cached**: Like `Machine.current()`, avoids repeated DB queries +- **Validated**: Checks PID hasn't been reused via psutil +- **Self-healing**: Creates missing records on-demand + +**Usage pattern:** +```python +# In any ArchiveBox code that spawns a subprocess: +parent = Process.current() # Get/create record for THIS process +child = Process.objects.create( + parent=parent, + cmd=['wget', ...], + ... +) +child.launch() +``` + +### 1.4 Add Helper Methods for Tree Traversal ```python class Process(ModelWithHealthStats): @@ -431,55 +563,40 @@ def process_hook_binary_records( ## Phase 3: Worker System Changes -### 3.1 Track Worker Processes in Database +### 3.1 Track Worker Processes in Database (Simplified with Process.current()) **File:** `archivebox/workers/worker.py` -Currently uses `multiprocessing.Process` and PID files. Add database tracking: +With `Process.current()`, tracking becomes trivial: ```python class Worker: # ... existing code ... 
- db_process: 'Process | None' = None # NEW: database Process record + db_process: 'Process | None' = None # Database Process record def on_startup(self) -> None: """Called when worker starts.""" - from archivebox.machine.models import Process, Machine + from archivebox.machine.models import Process self.pid = os.getpid() self.pid_file = write_pid_file(self.name, self.worker_id) - # NEW: Create database Process record - self.db_process = Process.objects.create( - machine=Machine.current(), - parent=self._get_parent_process(), # Find orchestrator's Process - process_type=Process.TypeChoices.WORKER, - cmd=['archivebox', 'manage', self.name, f'--worker-id={self.worker_id}'], - pwd=str(settings.DATA_DIR), - pid=self.pid, - started_at=timezone.now(), - status=Process.StatusChoices.RUNNING, - ) + # Process.current() automatically: + # - Creates record with correct process_type (detected from sys.argv) + # - Finds parent via PPID (orchestrator) + # - Sets machine, pid, started_at, status + self.db_process = Process.current() # ... existing logging ... - def _get_parent_process(self) -> 'Process | None': - """Find the orchestrator's Process record.""" - from archivebox.machine.models import Process - - # Look for running orchestrator process on this machine - return Process.objects.filter( - machine=Machine.current(), - process_type=Process.TypeChoices.ORCHESTRATOR, - status=Process.StatusChoices.RUNNING, - ).first() + # _get_parent_process() NO LONGER NEEDED - Process.current() uses PPID def on_shutdown(self, error: BaseException | None = None) -> None: """Called when worker shuts down.""" # ... existing code ... 
- # NEW: Update database Process record + # Update database Process record if self.db_process: self.db_process.exit_code = 0 if error is None else 1 self.db_process.ended_at = timezone.now() @@ -489,7 +606,7 @@ class Worker: self.db_process.save() ``` -### 3.2 Track Orchestrator Process +### 3.2 Track Orchestrator Process (Simplified) **File:** `archivebox/workers/orchestrator.py` @@ -501,36 +618,19 @@ class Orchestrator: def on_startup(self) -> None: """Called when orchestrator starts.""" - from archivebox.machine.models import Process, Machine + from archivebox.machine.models import Process self.pid = os.getpid() self.pid_file = write_pid_file('orchestrator', worker_id=0) - # NEW: Create database Process record - self.db_process = Process.objects.create( - machine=Machine.current(), - parent=self._get_parent_process(), # Find supervisord's Process - process_type=Process.TypeChoices.ORCHESTRATOR, - cmd=['archivebox', 'manage', 'orchestrator'], - pwd=str(settings.DATA_DIR), - pid=self.pid, - started_at=timezone.now(), - status=Process.StatusChoices.RUNNING, - ) + # Process.current() handles everything: + # - Detects type as ORCHESTRATOR from "orchestrator" in sys.argv + # - Finds parent (supervisord) via PPID lookup + self.db_process = Process.current() # ... existing logging ... 
- def _get_parent_process(self) -> 'Process | None': - """Find supervisord's Process record (if running under supervisord).""" - from archivebox.machine.models import Process - - if os.environ.get('IS_SUPERVISORD_PARENT'): - return Process.objects.filter( - machine=Machine.current(), - process_type=Process.TypeChoices.SUPERVISORD, - status=Process.StatusChoices.RUNNING, - ).first() - return None + # _get_parent_process() NO LONGER NEEDED ``` ### 3.3 Track Supervisord Process @@ -568,28 +668,19 @@ def start_new_supervisord_process(daemonize=False): ## Phase 4: CLI Entry Point Changes -### 4.1 Create Root Process on CLI Invocation +### 4.1 Simplified: Just Call `Process.current()` + +With `Process.current()` implemented, CLI entry becomes trivial: **File:** `archivebox/__main__.py` or `archivebox/cli/__init__.py` ```python def main(): - from archivebox.machine.models import Process, Machine + from archivebox.machine.models import Process - # Create root Process record for this CLI invocation - cli_process = Process.objects.create( - machine=Machine.current(), - parent=None, # Root of the tree - process_type=Process.TypeChoices.CLI, - cmd=sys.argv, - pwd=os.getcwd(), - pid=os.getpid(), - started_at=timezone.now(), - status=Process.StatusChoices.RUNNING, - ) - - # Store in thread-local or context for child processes to find - set_current_cli_process(cli_process) + # Process.current() auto-creates the CLI process record + # It detects process_type from sys.argv, finds parent via PPID + cli_process = Process.current() try: # ... existing CLI dispatch ... @@ -605,24 +696,23 @@ def main(): cli_process.save() ``` -### 4.2 Context Management for Parent Process Discovery +**That's it!** No thread-local context needed. 
`Process.current()` handles: +- Creating the record with correct `process_type` +- Finding parent via PPID lookup +- Caching to avoid repeated queries +- Validating PID hasn't been reused + +### 4.2 Context Management (DEPRECATED - Replaced by Process.current()) + +**Deprecated:** the following is no longer needed since `Process.current()` uses PPID lookup: ```python -# archivebox/machine/context.py +# archivebox/machine/context.py - NO LONGER NEEDED -import threading -from typing import Optional - -_cli_process_local = threading.local() - -def set_current_cli_process(process: 'Process') -> None: - """Set the current CLI process for this thread.""" - _cli_process_local.process = process - -def get_current_cli_process() -> Optional['Process']: - """Get the current CLI process for this thread.""" - return getattr(_cli_process_local, 'process', None) +# Process.current() replaces all of this by using os.getppid() +# to find parent Process records automatically. +# OLD approach (don't use): def get_cli_process() -> Optional['Process']: """ Find the CLI process that started this execution. 
@@ -838,18 +928,19 @@ class ProcessAdmin(admin.ModelAdmin): | File | Changes | |------|---------| -| `archivebox/machine/models.py` | Add `parent` FK, `process_type` field, lifecycle methods | +| `archivebox/machine/models.py` | Add `parent` FK, `process_type` field, `Process.current()`, lifecycle methods | | `archivebox/machine/migrations/XXXX_*.py` | New migration for schema changes | | `archivebox/machine/admin.py` | Update admin with tree visualization | -| `archivebox/machine/context.py` | NEW: Thread-local context for process discovery | | `archivebox/hooks.py` | Update `run_hook()` to create/use Process records | -| `archivebox/workers/worker.py` | Add database Process tracking | -| `archivebox/workers/orchestrator.py` | Add database Process tracking | -| `archivebox/workers/supervisord_util.py` | Add database Process tracking | -| `archivebox/core/models.py` | Update ArchiveResult to pass parent process | -| `archivebox/__main__.py` or CLI entry | Create root CLI Process | +| `archivebox/workers/worker.py` | Simplify: just call `Process.current()` in `on_startup()` | +| `archivebox/workers/orchestrator.py` | Simplify: just call `Process.current()` in `on_startup()` | +| `archivebox/workers/supervisord_util.py` | Add `Process.current()` call when starting supervisord | +| `archivebox/core/models.py` | Update ArchiveResult to use `Process.current()` as parent | +| `archivebox/__main__.py` or CLI entry | Call `Process.current()` at startup, update on exit | | `archivebox/misc/process_utils.py` | Keep as low-level utilities (called by Process methods) | +**Note:** `archivebox/machine/context.py` is NOT needed - `Process.current()` uses PPID lookup instead of thread-local context. + --- ## Testing Plan