# Process Model Integration Plan
## Current Architecture
### Hook Execution Flow
```
Orchestrator
├─> CrawlWorker
│   └─> Crawl.run() [state machine @started.enter]
│       └─> run_hook() for on_Crawl__* hooks
│           └─> subprocess.Popen (NOT using Process model)
└─> SnapshotWorker
    └─> Snapshot.run() [planned - doesn't exist yet]
        └─> ArchiveResult.run() [state machine @started.enter]
            └─> run_hook() for on_Snapshot__* hooks
                └─> subprocess.Popen (NOT using Process model)
```
### Problem
1. **No Process tracking**: `run_hook()` uses `subprocess.Popen` directly, never creates Process records
2. **Orphaned Process model**: Process model has `.launch()`, `.wait()`, `.terminate()` methods that are NEVER used
3. **Manual process management**: SnapshotWorker manually uses psutil for waiting/killing
4. **Duplicate logic**: Process model and run_hook() both do subprocess management independently
## Unified Architecture
### Goal
Make Process model the **single source of truth** for all subprocess operations:
- Hook execution
- PID tracking
- stdout/stderr capture
- Process lifecycle (launch, wait, terminate)
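
To make the goal concrete, here is a minimal, framework-free sketch of a record that owns its subprocess. `TrackedProcess` and its fields are hypothetical stand-ins for the Django `Process` model, not the actual ArchiveBox implementation; the sketch only illustrates PID tracking, stdout/stderr capture, and launch/wait living in one place.

```python
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class TrackedProcess:
    """Hypothetical in-memory stand-in for the Process model."""
    cmd: List[str]
    pwd: Path
    pid: Optional[int] = None
    exit_code: Optional[int] = None
    started_at: Optional[float] = None
    ended_at: Optional[float] = None
    _popen: Optional[subprocess.Popen] = field(default=None, repr=False)

    def launch(self, background: bool = False) -> "TrackedProcess":
        # Output goes to files in the working directory so it can be read later
        with open(self.pwd / "stdout.log", "w") as out, open(self.pwd / "stderr.log", "w") as err:
            self._popen = subprocess.Popen(self.cmd, cwd=self.pwd, stdout=out, stderr=err)
        self.pid = self._popen.pid
        self.started_at = time.time()
        if not background:
            self.wait()  # Foreground: block until the child exits
        return self

    def wait(self, timeout: Optional[float] = None) -> int:
        # Record the exit code and end time once the child has exited
        self.exit_code = self._popen.wait(timeout=timeout)
        self.ended_at = time.time()
        return self.exit_code


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        proc = TrackedProcess(cmd=[sys.executable, "-c", "print('hello from hook')"], pwd=Path(tmp))
        proc.launch()
        print(proc.pid, proc.exit_code, (Path(tmp) / "stdout.log").read_text().strip())
```

Callers hold on to the returned record rather than a raw `Popen` handle, which is the property the rest of this plan relies on.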
### Design
```python
# hooks.py - Thin wrapper
def run_hook(script, output_dir, config, timeout=None, **kwargs) -> Process:
    """
    Run a hook using Process model (THIN WRAPPER).
    Returns Process model instance for tracking and control.
    """
    from archivebox.machine.models import Machine, Process

    # Build command
    cmd = build_hook_cmd(script, kwargs)

    # Create the Process record; Process.launch() handles everything else
    process = Process.objects.create(
        machine=Machine.current(),
        process_type=Process.TypeChoices.HOOK,
        pwd=str(output_dir),
        cmd=cmd,
        env=build_hook_env(config),
        timeout=timeout,
    )

    # Launch subprocess
    process.launch(background=is_background_hook(script.name))
    return process  # Return Process, not dict

# worker.py - Use Process methods
import os
import signal
import time


class SnapshotWorker:
    def _run_hook(self, hook_path, ar) -> Process:
        """Fork hook using Process model."""
        process = run_hook(
            hook_path,
            ar.create_output_dir(),
            self.snapshot.config,
            url=self.snapshot.url,
            snapshot_id=str(self.snapshot.id),
        )
        # Link ArchiveResult to Process
        ar.process = process
        ar.save()
        return process

    def _wait_for_hook(self, process, ar):
        """Wait using Process.wait() method."""
        exit_code = process.wait(timeout=None)
        # Update AR from hook output
        ar.update_from_output()
        ar.status = ar.StatusChoices.SUCCEEDED if exit_code == 0 else ar.StatusChoices.FAILED
        ar.save()

    def on_shutdown(self):
        """
        Terminate all background hooks in parallel with per-plugin timeouts.

        Phase 1: Send SIGTERM to all in parallel (polite request to wrap up)
        Phase 2: Wait for all in parallel, respecting individual plugin timeouts
        Phase 3: SIGKILL any that exceed their timeout

        Each plugin has its own timeout (SCREENSHOT_TIMEOUT=60, YTDLP_TIMEOUT=300, etc.)
        Some hooks (consolelog, responses) exit immediately on SIGTERM.
        Others (ytdlp, wget) need their full timeout to finish actual work.
        """
        # Send SIGTERM to all processes in parallel
        for hook_name, process in self.background_processes.items():
            try:
                os.kill(process.pid, signal.SIGTERM)
            except ProcessLookupError:
                pass  # Hook already exited on its own

        # Build per-process deadlines based on plugin-specific timeouts
        deadlines = {
            name: (proc, time.time() + max(0, proc.timeout - (time.time() - proc.started_at.timestamp())))
            for name, proc in self.background_processes.items()
        }

        # Poll all processes in parallel - no head-of-line blocking
        still_running = set(deadlines.keys())
        while still_running:
            time.sleep(0.1)
            for name in list(still_running):
                proc, deadline = deadlines[name]
                if not proc.is_running():
                    still_running.remove(name)
                elif time.time() >= deadline:
                    try:
                        os.kill(proc.pid, signal.SIGKILL)  # Timeout exceeded
                    except ProcessLookupError:
                        pass  # Exited between the check and the kill
                    still_running.remove(name)

# models.py - Process becomes active
import os
import signal
import subprocess
import time

from django.utils import timezone


class Process:
    def launch(self, background=False):
        """Spawn subprocess and track it."""
        with open(self.stdout_file, 'w') as out, open(self.stderr_file, 'w') as err:
            proc = subprocess.Popen(
                self.cmd,
                cwd=self.pwd,
                stdout=out,
                stderr=err,
                env=self._build_env(),
            )
            self.pid = proc.pid
            self.started_at = timezone.now()
            self.status = self.StatusChoices.RUNNING
            self.save()

            if not background:
                # Foreground - wait inline
                proc.wait()
                self.exit_code = proc.returncode
                self.ended_at = timezone.now()
                self.status = self.StatusChoices.EXITED
                self.save()
        return self

    def wait(self, timeout=None):
        """Wait for process to exit, polling DB. Raises TimeoutError if timeout elapses."""
        deadline = None if timeout is None else time.time() + timeout
        while True:
            self.refresh_from_db()
            if self.status == self.StatusChoices.EXITED:
                return self.exit_code
            # Check via psutil if Process died without updating DB
            if not self.is_running():
                self._reap()  # Update status from OS
                return self.exit_code
            if deadline is not None and time.time() >= deadline:
                raise TimeoutError(f'Process {self.pid} still running after {timeout}s')
            time.sleep(0.1)

    def terminate(self, sig=signal.SIGTERM):
        """Gracefully terminate: SIGTERM → wait → SIGKILL."""
        if not self.is_running():
            return True
        os.kill(self.pid, sig)

        # Wait for graceful shutdown
        for _ in range(50):  # 50 polls x 0.1s = 5 seconds
            if not self.is_running():
                self._reap()
                return True
            time.sleep(0.1)

        # Escalate to SIGKILL
        os.kill(self.pid, signal.SIGKILL)
        self._reap()
        return True
```
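
The three-phase shutdown described in the `on_shutdown` docstring can be tried out in isolation. The following is a runnable sketch using plain `subprocess.Popen` handles instead of `Process` records; `Hook` and `shutdown_all` are hypothetical names introduced here for illustration. It assumes POSIX signal semantics (on Windows, `Popen.terminate()` and `Popen.kill()` behave the same, so the distinction disappears).

```python
import subprocess
import sys
import time
from typing import Dict, NamedTuple


class Hook(NamedTuple):
    """Hypothetical stand-in for a background Process record."""
    proc: subprocess.Popen
    started: float  # time.monotonic() at launch
    timeout: float  # plugin-specific budget in seconds


def shutdown_all(hooks: Dict[str, Hook], poll_interval: float = 0.05) -> Dict[str, str]:
    """Stop every hook; report 'exited' or 'killed' per name."""
    # Phase 1: ask every live hook to wrap up (SIGTERM on POSIX)
    for hook in hooks.values():
        if hook.proc.poll() is None:
            hook.proc.terminate()

    # Each hook keeps whatever is left of its own timeout budget
    now = time.monotonic()
    deadlines = {
        name: now + max(0.0, hook.timeout - (now - hook.started))
        for name, hook in hooks.items()
    }

    # Phase 2: poll all hooks together so a slow one never blocks the rest
    outcome: Dict[str, str] = {}
    still_running = set(hooks)
    while still_running:
        for name in list(still_running):
            proc = hooks[name].proc
            if proc.poll() is not None:
                outcome[name] = "exited"
                still_running.remove(name)
            elif time.monotonic() >= deadlines[name]:
                # Phase 3: budget exhausted, force-kill and reap
                proc.kill()
                proc.wait()
                outcome[name] = "killed"
                still_running.remove(name)
        if still_running:
            time.sleep(poll_interval)
    return outcome
```

Because every deadline is computed up front and checked in a single loop, the total shutdown time is bounded by the longest remaining budget rather than the sum of all budgets.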
## Migration Steps
### Step 1: Update Process.launch() (DONE - already exists)
Process model already has `.launch()`, `.wait()`, `.terminate()` methods implemented in machine/models.py:1295-1593
### Step 2: Refactor run_hook() to use Process.launch()
**File**: `archivebox/hooks.py`
Change signature from:
```python
def run_hook(...) -> HookResult: # Returns dict
```
To:
```python
def run_hook(...) -> Process: # Returns Process model
```
**Implementation**:
```python
def run_hook(script, output_dir, config, timeout=None, **kwargs) -> Process:
    from archivebox.machine.models import Process, Machine

    # Build command
    cmd = build_hook_cmd(script, kwargs)
    env = build_hook_env(config)
    is_bg = is_background_hook(script.name)

    # Create Process record
    process = Process.objects.create(
        machine=Machine.current(),
        process_type=Process.TypeChoices.HOOK,
        pwd=str(output_dir),
        cmd=cmd,
        env=env,
        timeout=timeout or 120,
    )

    # Launch subprocess
    process.launch(background=is_bg)
    return process
```
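
The implementation above relies on `is_background_hook(script.name)` to pick the launch mode. This plan does not show that helper, so the following is only a guess at how such a check could work. It assumes background hooks are marked by a `.bg` suffix before the file extension, inferred from the hook name `on_Snapshot__63_ytdlp.bg.py` used elsewhere in this document. The function name `is_background_hook_sketch` is hypothetical.

```python
from pathlib import Path


def is_background_hook_sketch(script_name: str) -> bool:
    """Guess from the filename whether a hook should run in the background.

    ASSUMPTION: a '.bg' suffix before the final extension marks a background
    hook, e.g. 'on_Snapshot__63_ytdlp.bg.py'. The real helper may differ.
    """
    # Path.suffixes lists every dotted suffix; drop the final file extension
    return ".bg" in Path(script_name).suffixes[:-1]


if __name__ == "__main__":
    for name in ("on_Snapshot__63_ytdlp.bg.py", "on_Snapshot__50_wget.py"):
        print(name, is_background_hook_sketch(name))
```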
### Step 3: Update SnapshotWorker to use Process methods
**File**: `archivebox/workers/worker.py`
Replace manual psutil code with Process model methods (shown above in Design section).
### Step 4: Update ArchiveResult.run() to use new run_hook()
**File**: `archivebox/core/models.py:2559`
Change from:
```python
result = run_hook(...)  # Returns HookResult dict
if result is None:
    is_bg_hook = True
```
To:
```python
process = run_hook(...)  # Returns Process
self.process = process
self.save()
if process.status == Process.StatusChoices.RUNNING:
    # Background hook - still running
    return
else:
    # Foreground hook - completed
    self.update_from_output()
```
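
The branch above hinges on the status that `launch()` leaves behind. A minimal sketch with stand-in types (`Status`, `FakeProcess`, and `handle_hook_result` are hypothetical, not the Django model or an ArchiveBox function) shows the paths a caller has to handle:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Status(str, Enum):
    RUNNING = "running"
    EXITED = "exited"


@dataclass
class FakeProcess:
    """Hypothetical stand-in for a Process record returned by run_hook()."""
    status: Status
    exit_code: Optional[int] = None


def handle_hook_result(process: FakeProcess) -> str:
    """Decide what the caller should do based on the process status."""
    if process.status == Status.RUNNING:
        # Background hook: the result is collected later by the worker
        return "deferred"
    # Foreground hook: it already finished, so the exit code is final
    return "succeeded" if process.exit_code == 0 else "failed"
```

With this shape, the old `result is None` sentinel for background hooks is no longer needed, since the status field carries that information explicitly.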
### Step 5: Update Crawl.run() similarly
**File**: `archivebox/crawls/models.py:374`
Same pattern as ArchiveResult.run()
## Benefits
### 1. Single Source of Truth
- Process model owns ALL subprocess operations
- No duplicate logic between run_hook(), Process, and workers
- Consistent PID tracking, stdout/stderr handling
### 2. Proper Hierarchy
```
Process.parent_id creates tree:

Orchestrator (PID 1000)
├─> CrawlWorker (PID 1001, parent=1000)
│   └─> on_Crawl__01_chrome.js (PID 1010, parent=1001)
└─> SnapshotWorker (PID 1020, parent=1000)
    ├─> on_Snapshot__50_wget.py (PID 1021, parent=1020)
    └─> on_Snapshot__63_ytdlp.bg.py (PID 1022, parent=1020)
```
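
As an illustration of how a parent pointer is enough to rebuild this hierarchy, here is a small sketch that turns flat `(pid, parent_pid, label)` rows into an indented tree. `render_tree` and the tuple layout are assumptions made for this example; real code would read the rows from `Process` records.

```python
from collections import defaultdict
from typing import Iterable, List, Optional, Tuple

Row = Tuple[int, Optional[int], str]  # (pid, parent pid or None, label)


def render_tree(rows: Iterable[Row]) -> List[str]:
    """Return one indented line per process, parents before children."""
    children = defaultdict(list)
    labels = {}
    for pid, parent, label in rows:
        labels[pid] = label
        children[parent].append(pid)

    lines: List[str] = []

    def walk(pid: int, depth: int) -> None:
        lines.append("    " * depth + f"{labels[pid]} (PID {pid})")
        for child in sorted(children[pid]):
            walk(child, depth + 1)

    # Rows without a parent are the roots of the tree
    for root in sorted(children[None]):
        walk(root, 0)
    return lines


if __name__ == "__main__":
    rows = [
        (1000, None, "Orchestrator"),
        (1001, 1000, "CrawlWorker"),
        (1010, 1001, "on_Crawl__01_chrome.js"),
        (1020, 1000, "SnapshotWorker"),
        (1021, 1020, "on_Snapshot__50_wget.py"),
        (1022, 1020, "on_Snapshot__63_ytdlp.bg.py"),
    ]
    print("\n".join(render_tree(rows)))
```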
### 3. Better Observability
- Query all hook processes: `snapshot.process_set.all()`
- Count running: `Process.objects.filter(status='running').count()`
- Track resource usage via Process.get_memory_info()
### 4. Cleaner Code
- SnapshotWorker._wait_for_hook: 25 lines → 8 lines
- SnapshotWorker.on_shutdown: 12 lines → 7 lines
- run_hook(): ~200 lines → ~50 lines
- Total: roughly 170 LoC saved (sum of the estimates above)
## Risks & Mitigation
### Risk 1: Breaking existing run_hook() callers
**Mitigation**: Three-phase rollout
1. Phase 1: Add run_hook_v2() that returns Process
2. Phase 2: Migrate callers to run_hook_v2()
3. Phase 3: Rename run_hook → run_hook_legacy, run_hook_v2 → run_hook
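
One possible way to keep old callers working while they are migrated is a thin compatibility shim: the legacy entry point delegates to the new one, converts the result back to the old shape, and emits a deprecation warning. This is a sketch of that idea only, not something the plan prescribes. `ProcessStub` and the dict keys `returncode` and `stdout` are hypothetical, and the actual `HookResult` fields may differ.

```python
import warnings
from dataclasses import dataclass


@dataclass
class ProcessStub:
    """Hypothetical stand-in for the Process record returned by the new API."""
    pid: int
    exit_code: int
    stdout: str


def run_hook_v2(script: str) -> ProcessStub:
    """New entry point: returns a process record (faked here, nothing is run)."""
    return ProcessStub(pid=12345, exit_code=0, stdout=f"ran {script}")


def run_hook(script: str) -> dict:
    """Legacy entry point: same call, old dict-shaped result, plus a warning."""
    warnings.warn(
        "run_hook() is deprecated; use run_hook_v2()",
        DeprecationWarning,
        stacklevel=2,  # Point the warning at the caller, not at this shim
    )
    process = run_hook_v2(script)
    # ASSUMPTION: these keys only approximate the legacy result shape
    return {"returncode": process.exit_code, "stdout": process.stdout}
```

Running the test suite with `-W error::DeprecationWarning` would then surface any caller that has not been migrated yet.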
### Risk 2: Background hook tracking changes
**Mitigation**:
- Process.launch(background=True) handles async launches
- Process.wait() already polls for completion
- Hooks are still spawned via subprocess.Popen inside Process.launch(), so launch behavior should match the current implementation
### Risk 3: Performance overhead (extra DB writes)
**Mitigation**:
- Process records already being created (just not used)
- Batch updates where possible
- Monitor via metrics
## Timeline
### Immediate (This PR)
- [x] State machine fixes (completed)
- [x] Step advancement optimization (completed)
- [x] Document unified architecture (this file)
### Next PR (Process Integration)
1. Add run_hook_v2() returning Process
2. Update SnapshotWorker to use Process methods
3. Migrate ArchiveResult.run() and Crawl.run()
4. Deprecate old run_hook()
### Future
- Remove run_hook_legacy after migration complete
- Add Process.get_tree() for hierarchy visualization
- Add ProcessMachine state machine for lifecycle management