Add Process.current() to implementation plan

Key addition: Process.current() class method (like Machine.current())
that auto-creates/retrieves the Process record for the current OS process.

Benefits:
- Uses PPID lookup to find parent Process automatically
- Detects process_type from sys.argv
- Cached with validation (like Machine.current())
- Eliminates need for thread-local context management

Simplified Phase 3 (workers) and Phase 4 (CLI) to just call
Process.current() instead of manual Process creation.
This commit is contained in:
Claude
2025-12-31 00:32:05 +00:00
parent f21fb55a2c
commit 4c4c065697

View File

@@ -67,7 +67,139 @@ class Process(ModelWithHealthStats):
)
```
### 1.3 Add Helper Methods for Tree Traversal
### 1.3 Add `Process.current()` Class Method (like `Machine.current()`)
Following the pattern established by `Machine.current()`, add a method to get-or-create the Process record for the current OS process:
```python
_CURRENT_PROCESS = None
PROCESS_RECHECK_INTERVAL = 60 # Re-validate every 60 seconds
class ProcessManager(models.Manager):
def current(self) -> 'Process':
return Process.current()
class Process(ModelWithHealthStats):
# ... existing fields ...
objects: ProcessManager = ProcessManager()
@classmethod
def current(cls) -> 'Process':
"""
Get or create the Process record for the current OS process.
Similar to Machine.current(), this:
1. Checks cache for existing Process with matching PID
2. Validates the cached Process is still valid (PID not reused)
3. Creates new Process if needed
Uses os.getpid() to identify current process and os.getppid() to
find parent Process record.
"""
global _CURRENT_PROCESS
current_pid = os.getpid()
# Check cache validity
if _CURRENT_PROCESS:
# Verify cached process matches current PID and hasn't expired
if (_CURRENT_PROCESS.pid == current_pid and
timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)):
return _CURRENT_PROCESS
_CURRENT_PROCESS = None
machine = Machine.current()
# Try to find existing Process for this PID on this machine
existing = cls.objects.filter(
machine=machine,
pid=current_pid,
status=cls.StatusChoices.RUNNING,
).first()
if existing:
# Validate it's actually our process (check start time matches)
try:
import psutil
proc = psutil.Process(current_pid)
if abs(existing.started_at.timestamp() - proc.create_time()) < 5.0:
_CURRENT_PROCESS = existing
return existing
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Create new Process record
parent = cls._find_parent_process()
process_type = cls._detect_process_type()
_CURRENT_PROCESS = cls.objects.create(
machine=machine,
parent=parent,
process_type=process_type,
cmd=sys.argv,
pwd=os.getcwd(),
pid=current_pid,
started_at=timezone.now(),
status=cls.StatusChoices.RUNNING,
)
return _CURRENT_PROCESS
@classmethod
def _find_parent_process(cls) -> 'Process | None':
"""
Find the parent Process record by looking up PPID.
Returns None if parent is not an ArchiveBox process.
"""
ppid = os.getppid()
machine = Machine.current()
return cls.objects.filter(
machine=machine,
pid=ppid,
status=cls.StatusChoices.RUNNING,
).first()
@classmethod
def _detect_process_type(cls) -> str:
"""
Detect the type of the current process from sys.argv.
"""
argv_str = ' '.join(sys.argv).lower()
if 'supervisord' in argv_str:
return cls.TypeChoices.SUPERVISORD
elif 'orchestrator' in argv_str:
return cls.TypeChoices.ORCHESTRATOR
elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']):
return cls.TypeChoices.WORKER
elif 'archivebox' in argv_str:
return cls.TypeChoices.CLI
else:
return cls.TypeChoices.BINARY
```
**Key Benefits:**
- **Automatic hierarchy**: Calling `Process.current()` from anywhere auto-links to parent
- **Cached**: Like `Machine.current()`, avoids repeated DB queries
- **Validated**: Checks PID hasn't been reused via psutil
- **Self-healing**: Creates missing records on-demand
**Usage pattern:**
```python
# In any ArchiveBox code that spawns a subprocess:
parent = Process.current() # Get/create record for THIS process
child = Process.objects.create(
parent=parent,
cmd=['wget', ...],
...
)
child.launch()
```
### 1.4 Add Helper Methods for Tree Traversal
```python
class Process(ModelWithHealthStats):
@@ -431,55 +563,40 @@ def process_hook_binary_records(
## Phase 3: Worker System Changes
### 3.1 Track Worker Processes in Database
### 3.1 Track Worker Processes in Database (Simplified with Process.current())
**File:** `archivebox/workers/worker.py`
Currently uses `multiprocessing.Process` and PID files. Add database tracking:
With `Process.current()`, tracking becomes trivial:
```python
class Worker:
# ... existing code ...
db_process: 'Process | None' = None # NEW: database Process record
db_process: 'Process | None' = None # Database Process record
def on_startup(self) -> None:
"""Called when worker starts."""
from archivebox.machine.models import Process, Machine
from archivebox.machine.models import Process
self.pid = os.getpid()
self.pid_file = write_pid_file(self.name, self.worker_id)
# NEW: Create database Process record
self.db_process = Process.objects.create(
machine=Machine.current(),
parent=self._get_parent_process(), # Find orchestrator's Process
process_type=Process.TypeChoices.WORKER,
cmd=['archivebox', 'manage', self.name, f'--worker-id={self.worker_id}'],
pwd=str(settings.DATA_DIR),
pid=self.pid,
started_at=timezone.now(),
status=Process.StatusChoices.RUNNING,
)
# Process.current() automatically:
# - Creates record with correct process_type (detected from sys.argv)
# - Finds parent via PPID (orchestrator)
# - Sets machine, pid, started_at, status
self.db_process = Process.current()
# ... existing logging ...
def _get_parent_process(self) -> 'Process | None':
"""Find the orchestrator's Process record."""
from archivebox.machine.models import Process
# Look for running orchestrator process on this machine
return Process.objects.filter(
machine=Machine.current(),
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING,
).first()
# _get_parent_process() NO LONGER NEEDED - Process.current() uses PPID
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when worker shuts down."""
# ... existing code ...
# NEW: Update database Process record
# Update database Process record
if self.db_process:
self.db_process.exit_code = 0 if error is None else 1
self.db_process.ended_at = timezone.now()
@@ -489,7 +606,7 @@ class Worker:
self.db_process.save()
```
### 3.2 Track Orchestrator Process
### 3.2 Track Orchestrator Process (Simplified)
**File:** `archivebox/workers/orchestrator.py`
@@ -501,36 +618,19 @@ class Orchestrator:
def on_startup(self) -> None:
"""Called when orchestrator starts."""
from archivebox.machine.models import Process, Machine
from archivebox.machine.models import Process
self.pid = os.getpid()
self.pid_file = write_pid_file('orchestrator', worker_id=0)
# NEW: Create database Process record
self.db_process = Process.objects.create(
machine=Machine.current(),
parent=self._get_parent_process(), # Find supervisord's Process
process_type=Process.TypeChoices.ORCHESTRATOR,
cmd=['archivebox', 'manage', 'orchestrator'],
pwd=str(settings.DATA_DIR),
pid=self.pid,
started_at=timezone.now(),
status=Process.StatusChoices.RUNNING,
)
# Process.current() handles everything:
# - Detects type as ORCHESTRATOR from "orchestrator" in sys.argv
# - Finds parent (supervisord) via PPID lookup
self.db_process = Process.current()
# ... existing logging ...
def _get_parent_process(self) -> 'Process | None':
"""Find supervisord's Process record (if running under supervisord)."""
from archivebox.machine.models import Process
if os.environ.get('IS_SUPERVISORD_PARENT'):
return Process.objects.filter(
machine=Machine.current(),
process_type=Process.TypeChoices.SUPERVISORD,
status=Process.StatusChoices.RUNNING,
).first()
return None
# _get_parent_process() NO LONGER NEEDED
```
### 3.3 Track Supervisord Process
@@ -568,28 +668,19 @@ def start_new_supervisord_process(daemonize=False):
## Phase 4: CLI Entry Point Changes
### 4.1 Create Root Process on CLI Invocation
### 4.1 Simplified: Just Call `Process.current()`
With `Process.current()` implemented, CLI entry becomes trivial:
**File:** `archivebox/__main__.py` or `archivebox/cli/__init__.py`
```python
def main():
from archivebox.machine.models import Process, Machine
from archivebox.machine.models import Process
# Create root Process record for this CLI invocation
cli_process = Process.objects.create(
machine=Machine.current(),
parent=None, # Root of the tree
process_type=Process.TypeChoices.CLI,
cmd=sys.argv,
pwd=os.getcwd(),
pid=os.getpid(),
started_at=timezone.now(),
status=Process.StatusChoices.RUNNING,
)
# Store in thread-local or context for child processes to find
set_current_cli_process(cli_process)
# Process.current() auto-creates the CLI process record
# It detects process_type from sys.argv, finds parent via PPID
cli_process = Process.current()
try:
# ... existing CLI dispatch ...
@@ -605,24 +696,23 @@ def main():
cli_process.save()
```
### 4.2 Context Management for Parent Process Discovery
**That's it!** No thread-local context needed. `Process.current()` handles:
- Creating the record with correct `process_type`
- Finding parent via PPID lookup
- Caching to avoid repeated queries
- Validating PID hasn't been reused
### 4.2 Context Management (DEPRECATED - Replaced by Process.current())
~~The following is no longer needed since `Process.current()` uses PPID lookup:~~
```python
# archivebox/machine/context.py
# archivebox/machine/context.py - NO LONGER NEEDED
import threading
from typing import Optional
_cli_process_local = threading.local()
def set_current_cli_process(process: 'Process') -> None:
"""Set the current CLI process for this thread."""
_cli_process_local.process = process
def get_current_cli_process() -> Optional['Process']:
"""Get the current CLI process for this thread."""
return getattr(_cli_process_local, 'process', None)
# Process.current() replaces all of this by using os.getppid()
# to find parent Process records automatically.
# OLD approach (don't use):
def get_cli_process() -> Optional['Process']:
"""
Find the CLI process that started this execution.
@@ -838,18 +928,19 @@ class ProcessAdmin(admin.ModelAdmin):
| File | Changes |
|------|---------|
| `archivebox/machine/models.py` | Add `parent` FK, `process_type` field, lifecycle methods |
| `archivebox/machine/models.py` | Add `parent` FK, `process_type` field, `Process.current()`, lifecycle methods |
| `archivebox/machine/migrations/XXXX_*.py` | New migration for schema changes |
| `archivebox/machine/admin.py` | Update admin with tree visualization |
| `archivebox/machine/context.py` | NEW: Thread-local context for process discovery |
| `archivebox/hooks.py` | Update `run_hook()` to create/use Process records |
| `archivebox/workers/worker.py` | Add database Process tracking |
| `archivebox/workers/orchestrator.py` | Add database Process tracking |
| `archivebox/workers/supervisord_util.py` | Add database Process tracking |
| `archivebox/core/models.py` | Update ArchiveResult to pass parent process |
| `archivebox/__main__.py` or CLI entry | Create root CLI Process |
| `archivebox/workers/worker.py` | Simplify: just call `Process.current()` in `on_startup()` |
| `archivebox/workers/orchestrator.py` | Simplify: just call `Process.current()` in `on_startup()` |
| `archivebox/workers/supervisord_util.py` | Add `Process.current()` call when starting supervisord |
| `archivebox/core/models.py` | Update ArchiveResult to use `Process.current()` as parent |
| `archivebox/__main__.py` or CLI entry | Call `Process.current()` at startup, update on exit |
| `archivebox/misc/process_utils.py` | Keep as low-level utilities (called by Process methods) |
**Note:** `archivebox/machine/context.py` is NOT needed - `Process.current()` uses PPID lookup instead of thread-local context.
---
## Testing Plan