mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-02 17:05:38 +10:00
Add detailed hook/run() changes to Process tracking plan
Phase 2 now includes line-by-line mapping of:
- run_hook(): Create Process record, use Process.launch(), parse
JSONL for child binary Process records
- process_is_alive(): Accept Path or Process, use Process.is_alive()
- kill_process(): Accept Path or Process, use Process.kill()
- ArchiveResult.run(): Pass self.process as parent_process to run_hook()
- ArchiveResult.update_from_output(): Read from Process.stdout/stderr
- Snapshot.cleanup(): Kill via Process model, fallback to PID files
- Snapshot.has_running_background_hooks(): Check via Process model
Hook JSONL contract updated to support {"type": "Process"} records
for tracking binary executions within hooks.
This commit is contained in:
@@ -578,119 +578,445 @@ class Process(ModelWithHealthStats):
|
||||
|
||||
---
|
||||
|
||||
## Phase 2: Hook System Changes
|
||||
## Phase 2: Hook System Changes (Detailed)
|
||||
|
||||
### 2.1 Update `run_hook()` to Create Process Records
|
||||
This section provides a line-by-line mapping of current code to required changes.
|
||||
|
||||
**File:** `archivebox/hooks.py`
|
||||
### 2.1 Current Architecture Overview
|
||||
|
||||
Current implementation creates `subprocess.Popen` directly. Refactor to:
|
||||
**Current Flow:**
|
||||
```
|
||||
ArchiveResult.run() [core/models.py:2463]
|
||||
└── run_hook() [hooks.py:238]
|
||||
└── subprocess.Popen() [hooks.py:381]
|
||||
└── writes: stdout.log, stderr.log, hook.pid, cmd.sh
|
||||
```
|
||||
|
||||
1. Accept an optional `parent_process` parameter
|
||||
2. Create a `Process` record for the hook script
|
||||
3. Create a separate `Process` record for the binary (if hook reports one)
|
||||
**Target Flow:**
|
||||
```
|
||||
ArchiveResult.run()
|
||||
└── run_hook(parent_process=self.process) # Pass existing Process FK
|
||||
└── hook_process = Process.objects.create(parent=parent_process, type=HOOK)
|
||||
└── hook_process.launch(background=is_bg) # Uses Process methods
|
||||
└── writes: stdout.log, stderr.log via Process.stdout_file/stderr_file
|
||||
└── Process handles PID file internally
|
||||
└── parse JSONL for {"type": "Process"} records → create child binary Processes
|
||||
```
|
||||
|
||||
### 2.2 Changes to `hooks.py`
|
||||
|
||||
#### 2.2.1 Update `run_hook()` Signature and Body
|
||||
|
||||
**File:** `archivebox/hooks.py` lines 238-483
|
||||
|
||||
**CURRENT CODE (lines 374-398):**
|
||||
```python
|
||||
# Set up output files for ALL hooks (useful for debugging)
|
||||
stdout_file = output_dir / 'stdout.log'
|
||||
stderr_file = output_dir / 'stderr.log'
|
||||
pid_file = output_dir / 'hook.pid'
|
||||
cmd_file = output_dir / 'cmd.sh'
|
||||
|
||||
try:
|
||||
# Write command script for validation
|
||||
from archivebox.misc.process_utils import write_cmd_file
|
||||
write_cmd_file(cmd_file, cmd)
|
||||
|
||||
# Open log files for writing
|
||||
with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
cwd=str(output_dir),
|
||||
stdout=out,
|
||||
stderr=err,
|
||||
env=env,
|
||||
)
|
||||
|
||||
# Write PID with mtime set to process start time for validation
|
||||
from archivebox.misc.process_utils import write_pid_file_with_mtime
|
||||
process_start_time = time.time()
|
||||
write_pid_file_with_mtime(pid_file, process.pid, process_start_time)
|
||||
|
||||
if is_background:
|
||||
# Background hook - return None immediately, don't wait
|
||||
return None
|
||||
```
|
||||
|
||||
**NEW CODE:**
|
||||
```python
|
||||
def run_hook(
|
||||
script: Path,
|
||||
output_dir: Path,
|
||||
config: Dict[str, Any],
|
||||
timeout: Optional[int] = None,
|
||||
parent_process: Optional['Process'] = None, # NEW
|
||||
parent_process: Optional['Process'] = None, # NEW: from ArchiveResult.process
|
||||
**kwargs: Any
|
||||
) -> HookResult:
|
||||
"""
|
||||
Execute a hook script with the given arguments.
|
||||
|
||||
Now creates Process records for tracking:
|
||||
- One Process for the hook script itself
|
||||
- Child Process records for any binaries the hook reports running
|
||||
"""
|
||||
from archivebox.machine.models import Process, Machine
|
||||
|
||||
# ... existing setup code ...
|
||||
# ... existing setup (lines 270-372) ...
|
||||
|
||||
# Create Process record for this hook
|
||||
# Create Process record for this hook execution
|
||||
# Parent is the ArchiveResult's Process (passed from ArchiveResult.run())
|
||||
hook_process = Process.objects.create(
|
||||
machine=Machine.current(),
|
||||
parent=parent_process,
|
||||
process_type=Process.TypeChoices.HOOK,
|
||||
cmd=cmd,
|
||||
pwd=str(output_dir),
|
||||
env=env, # Store sanitized env
|
||||
env={k: v for k, v in env.items() if k not in os.environ}, # Only store non-default env
|
||||
timeout=timeout,
|
||||
status=Process.StatusChoices.QUEUED,
|
||||
)
|
||||
|
||||
# Launch the hook
|
||||
hook_process.launch(background=is_background_hook)
|
||||
# Use Process.launch() which handles:
|
||||
# - subprocess.Popen
|
||||
# - PID file with mtime validation
|
||||
# - cmd.sh script
|
||||
# - stdout/stderr capture
|
||||
# - status transitions
|
||||
if is_background:
|
||||
hook_process.launch(background=True)
|
||||
# Return None for background hooks (existing behavior)
|
||||
# HookResult not returned - hook_process.id is not returned either, so the caller tracks the hook via its Process record (a HOOK child of parent_process)
|
||||
return None
|
||||
else:
|
||||
hook_process.launch(background=False) # Blocks until completion
|
||||
|
||||
# ... rest of processing ...
|
||||
# Read output from Process (instead of files directly)
|
||||
stdout = hook_process.stdout
|
||||
stderr = hook_process.stderr
|
||||
returncode = hook_process.exit_code
|
||||
|
||||
# ... existing JSONL parsing (lines 427-448) ...
|
||||
|
||||
# NEW: Create child Process records for binaries reported in JSONL
|
||||
for record in records:
|
||||
if record.get('type') == 'Process':
|
||||
Process.objects.create(
|
||||
machine=hook_process.machine,
|
||||
parent=hook_process,
|
||||
process_type=Process.TypeChoices.BINARY,
|
||||
cmd=record.get('cmd', []),
|
||||
pwd=record.get('pwd', str(output_dir)),
|
||||
pid=record.get('pid'),
|
||||
exit_code=record.get('exit_code'),
|
||||
started_at=parse_ts(record.get('started_at')),
|
||||
ended_at=parse_ts(record.get('ended_at')),
|
||||
status=Process.StatusChoices.EXITED,
|
||||
)
|
||||
|
||||
return HookResult(
|
||||
returncode=returncode,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
# ... existing fields ...
|
||||
process_id=str(hook_process.id), # NEW: include process ID
|
||||
process_id=str(hook_process.id), # NEW
|
||||
)
|
||||
```
|
||||
|
||||
### 2.2 Update HookResult TypedDict
|
||||
#### 2.2.2 Update `process_is_alive()` to Use Process Model
|
||||
|
||||
**CURRENT CODE (lines 1238-1256):**
|
||||
```python
|
||||
class HookResult(TypedDict, total=False):
|
||||
"""Raw result from run_hook()."""
|
||||
returncode: int
|
||||
stdout: str
|
||||
stderr: str
|
||||
output_json: Optional[Dict[str, Any]]
|
||||
output_files: List[str]
|
||||
duration_ms: int
|
||||
hook: str
|
||||
plugin: str
|
||||
hook_name: str
|
||||
records: List[Dict[str, Any]]
|
||||
process_id: str # NEW: ID of the hook Process record
|
||||
def process_is_alive(pid_file: Path) -> bool:
|
||||
"""Check if process in PID file is still running."""
|
||||
if not pid_file.exists():
|
||||
return False
|
||||
try:
|
||||
pid = int(pid_file.read_text().strip())
|
||||
os.kill(pid, 0)
|
||||
return True
|
||||
except (OSError, ValueError):
|
||||
return False
|
||||
```
|
||||
|
||||
### 2.3 Handle Binary Process Records from Hook Output
|
||||
|
||||
Hooks can output JSONL records describing binaries they run. Parse these and create child `Process` records:
|
||||
|
||||
**NEW CODE:**
|
||||
```python
|
||||
def process_hook_binary_records(
|
||||
hook_process: 'Process',
|
||||
records: List[Dict[str, Any]]
|
||||
) -> List['Process']:
|
||||
def process_is_alive(pid_file_or_process: 'Path | Process') -> bool:
|
||||
"""
|
||||
Create child Process records for binaries reported by hook.
|
||||
Check if process is still running.
|
||||
|
||||
Hooks output JSONL like:
|
||||
{"type": "Process", "cmd": ["wget", "-p", "..."], "exit_code": 0}
|
||||
Accepts either:
|
||||
- Path to hook.pid file (legacy)
|
||||
- Process model instance (new)
|
||||
"""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
binary_processes = []
|
||||
if isinstance(pid_file_or_process, Process):
|
||||
return pid_file_or_process.is_alive()
|
||||
|
||||
for record in records:
|
||||
if record.get('type') != 'Process':
|
||||
# Legacy path-based check (for backwards compatibility)
|
||||
pid_file = pid_file_or_process
|
||||
if not pid_file.exists():
|
||||
return False
|
||||
|
||||
# Try to find matching Process record
|
||||
try:
|
||||
pid = int(pid_file.read_text().strip())
|
||||
process = Process.objects.get_by_pid(pid)
|
||||
if process:
|
||||
return process.is_alive()
|
||||
except (ValueError, Process.DoesNotExist):
|
||||
pass
|
||||
|
||||
# Fallback to OS check
|
||||
from archivebox.misc.process_utils import validate_pid_file
|
||||
return validate_pid_file(pid_file)
|
||||
```
|
||||
|
||||
#### 2.2.3 Update `kill_process()` to Use Process Model
|
||||
|
||||
**CURRENT CODE (lines 1259-1282):**
|
||||
```python
|
||||
def kill_process(pid_file: Path, sig: int = signal.SIGTERM, validate: bool = True):
|
||||
"""Kill process in PID file with optional validation."""
|
||||
from archivebox.misc.process_utils import safe_kill_process
|
||||
|
||||
if validate:
|
||||
cmd_file = pid_file.parent / 'cmd.sh'
|
||||
safe_kill_process(pid_file, cmd_file, signal_num=sig)
|
||||
else:
|
||||
# Legacy behavior
|
||||
...
|
||||
```
|
||||
|
||||
**NEW CODE:**
|
||||
```python
|
||||
def kill_process(
|
||||
pid_file_or_process: 'Path | Process',
|
||||
sig: int = signal.SIGTERM,
|
||||
validate: bool = True
|
||||
):
|
||||
"""
|
||||
Kill process with optional validation.
|
||||
|
||||
Accepts either:
|
||||
- Path to hook.pid file (legacy)
|
||||
- Process model instance (new)
|
||||
"""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
if isinstance(pid_file_or_process, Process):
|
||||
pid_file_or_process.kill(signal_num=sig)
|
||||
return
|
||||
|
||||
# Legacy path-based kill
|
||||
pid_file = pid_file_or_process
|
||||
|
||||
# Try to find matching Process record first
|
||||
try:
|
||||
pid = int(pid_file.read_text().strip())
|
||||
process = Process.objects.get_by_pid(pid)
|
||||
if process:
|
||||
process.kill(signal_num=sig)
|
||||
return
|
||||
except (ValueError, Process.DoesNotExist, FileNotFoundError):
|
||||
pass
|
||||
|
||||
# Fallback to file-based kill
|
||||
if validate:
|
||||
from archivebox.misc.process_utils import safe_kill_process
|
||||
cmd_file = pid_file.parent / 'cmd.sh'
|
||||
safe_kill_process(pid_file, cmd_file, signal_num=sig)
|
||||
```
|
||||
|
||||
### 2.3 Changes to `core/models.py` - ArchiveResult
|
||||
|
||||
#### 2.3.1 Update `ArchiveResult.run()` to Pass Parent Process
|
||||
|
||||
**File:** `archivebox/core/models.py` lines 2463-2565
|
||||
|
||||
**CURRENT CODE (lines 2527-2535):**
|
||||
```python
|
||||
result = run_hook(
|
||||
hook,
|
||||
output_dir=plugin_dir,
|
||||
config=config,
|
||||
url=self.snapshot.url,
|
||||
snapshot_id=str(self.snapshot.id),
|
||||
crawl_id=str(self.snapshot.crawl.id),
|
||||
depth=self.snapshot.depth,
|
||||
)
|
||||
```
|
||||
|
||||
**NEW CODE:**
|
||||
```python
|
||||
result = run_hook(
|
||||
hook,
|
||||
output_dir=plugin_dir,
|
||||
config=config,
|
||||
parent_process=self.process, # NEW: Pass our Process as parent for hook's Process
|
||||
url=self.snapshot.url,
|
||||
snapshot_id=str(self.snapshot.id),
|
||||
crawl_id=str(self.snapshot.crawl.id),
|
||||
depth=self.snapshot.depth,
|
||||
)
|
||||
```
|
||||
|
||||
#### 2.3.2 Update `ArchiveResult.update_from_output()` to Use Process
|
||||
|
||||
**File:** `archivebox/core/models.py` lines 2568-2700
|
||||
|
||||
**CURRENT CODE (lines 2598-2600):**
|
||||
```python
|
||||
# Read and parse JSONL output from stdout.log
|
||||
stdout_file = plugin_dir / 'stdout.log'
|
||||
stdout = stdout_file.read_text() if stdout_file.exists() else ''
|
||||
```
|
||||
|
||||
**NEW CODE:**
|
||||
```python
|
||||
# Read output from Process record (populated by Process.launch())
|
||||
if self.process_id:
|
||||
# Process already has stdout/stderr from launch()
|
||||
stdout = self.process.stdout
|
||||
stderr = self.process.stderr
|
||||
else:
|
||||
# Fallback to file-based read (legacy)
|
||||
stdout_file = plugin_dir / 'stdout.log'
|
||||
stdout = stdout_file.read_text() if stdout_file.exists() else ''
|
||||
```
|
||||
|
||||
### 2.4 Changes to `core/models.py` - Snapshot
|
||||
|
||||
#### 2.4.1 Update `Snapshot.cleanup()` to Use Process Model
|
||||
|
||||
**File:** `archivebox/core/models.py` lines 1381-1401
|
||||
|
||||
**CURRENT CODE:**
|
||||
```python
|
||||
def cleanup(self):
|
||||
from archivebox.hooks import kill_process
|
||||
|
||||
if not self.OUTPUT_DIR.exists():
|
||||
return
|
||||
|
||||
# Find all .pid files in this snapshot's output directory
|
||||
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
|
||||
kill_process(pid_file, validate=True)
|
||||
|
||||
# Update all STARTED ArchiveResults from filesystem
|
||||
results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
|
||||
for ar in results:
|
||||
ar.update_from_output()
|
||||
```
|
||||
|
||||
**NEW CODE:**
|
||||
```python
|
||||
def cleanup(self):
|
||||
"""
|
||||
Clean up background ArchiveResult hooks.
|
||||
|
||||
Uses Process model to find and kill running hooks.
|
||||
Falls back to PID file scanning for legacy compatibility.
|
||||
"""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
# Kill running hook Processes for this snapshot's ArchiveResults
|
||||
for ar in self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED):
|
||||
if ar.process_id:
|
||||
# Get hook Processes that are children of this AR's Process
|
||||
hook_processes = Process.objects.filter(
|
||||
parent=ar.process,
|
||||
process_type=Process.TypeChoices.HOOK,
|
||||
status=Process.StatusChoices.RUNNING,
|
||||
)
|
||||
for hook_proc in hook_processes:
|
||||
hook_proc.kill()
|
||||
|
||||
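# Also kill any other running direct children of this AR's Process (note: binary Processes are created with parent=hook_process, not ar.process)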
|
||||
if ar.process_id:
|
||||
for child in ar.process.children.filter(status=Process.StatusChoices.RUNNING):
|
||||
child.kill()
|
||||
|
||||
# Legacy fallback: scan for .pid files not tracked in DB
|
||||
if self.OUTPUT_DIR.exists():
|
||||
from archivebox.hooks import kill_process
|
||||
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
|
||||
kill_process(pid_file, validate=True)
|
||||
|
||||
# Update all STARTED ArchiveResults from filesystem/Process
|
||||
for ar in self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED):
|
||||
ar.update_from_output()
|
||||
```
|
||||
|
||||
#### 2.4.2 Update `Snapshot.has_running_background_hooks()` to Use Process Model
|
||||
|
||||
**CURRENT CODE (lines 1403-1420):**
|
||||
```python
|
||||
def has_running_background_hooks(self) -> bool:
|
||||
from archivebox.hooks import process_is_alive
|
||||
|
||||
if not self.OUTPUT_DIR.exists():
|
||||
return False
|
||||
|
||||
for plugin_dir in self.OUTPUT_DIR.iterdir():
|
||||
if not plugin_dir.is_dir():
|
||||
continue
|
||||
pid_file = plugin_dir / 'hook.pid'
|
||||
if process_is_alive(pid_file):
|
||||
return True
|
||||
|
||||
binary_process = Process.objects.create(
|
||||
machine=hook_process.machine,
|
||||
parent=hook_process,
|
||||
process_type=Process.TypeChoices.BINARY,
|
||||
cmd=record.get('cmd', []),
|
||||
pwd=record.get('pwd', hook_process.pwd),
|
||||
pid=record.get('pid'),
|
||||
exit_code=record.get('exit_code'),
|
||||
stdout=record.get('stdout', ''),
|
||||
stderr=record.get('stderr', ''),
|
||||
started_at=parse_datetime(record.get('started_at')),
|
||||
ended_at=parse_datetime(record.get('ended_at')),
|
||||
status=Process.StatusChoices.EXITED,
|
||||
)
|
||||
binary_processes.append(binary_process)
|
||||
return False
|
||||
```
|
||||
|
||||
return binary_processes
|
||||
**NEW CODE:**
|
||||
```python
|
||||
def has_running_background_hooks(self) -> bool:
|
||||
"""
|
||||
Check if any ArchiveResult background hooks are still running.
|
||||
|
||||
Uses Process model for tracking, falls back to PID file check.
|
||||
"""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
# Check via Process model (preferred)
|
||||
for ar in self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED):
|
||||
if ar.process_id:
|
||||
# Check if hook Process children are running
|
||||
running_hooks = Process.objects.filter(
|
||||
parent=ar.process,
|
||||
process_type=Process.TypeChoices.HOOK,
|
||||
status=Process.StatusChoices.RUNNING,
|
||||
).exists()
|
||||
if running_hooks:
|
||||
return True
|
||||
|
||||
# Also check the AR's own process
|
||||
if ar.process.is_alive():
|
||||
return True
|
||||
|
||||
# Legacy fallback: check PID files
|
||||
if self.OUTPUT_DIR.exists():
|
||||
from archivebox.hooks import process_is_alive
|
||||
for plugin_dir in self.OUTPUT_DIR.iterdir():
|
||||
if plugin_dir.is_dir():
|
||||
pid_file = plugin_dir / 'hook.pid'
|
||||
if process_is_alive(pid_file):
|
||||
return True
|
||||
|
||||
return False
|
||||
```
|
||||
|
||||
### 2.5 Hook JSONL Output Contract Update
|
||||
|
||||
Hooks should now output `{"type": "Process", ...}` records for any binaries they run:
|
||||
|
||||
```jsonl
|
||||
{"type": "ArchiveResult", "status": "succeeded", "output_str": "Downloaded page"}
|
||||
{"type": "Process", "cmd": ["/usr/bin/wget", "-p", "https://example.com"], "pid": 12345, "exit_code": 0, "started_at": "2024-01-15T10:30:00Z", "ended_at": "2024-01-15T10:30:05Z"}
|
||||
{"type": "Process", "cmd": ["/usr/bin/curl", "-O", "image.png"], "pid": 12346, "exit_code": 0}
|
||||
```
|
||||
|
||||
This allows full tracking of the process hierarchy:
|
||||
```
|
||||
Process(archivebox add, type=CLI)
|
||||
└── Process(orchestrator, type=ORCHESTRATOR)
|
||||
└── Process(archiveresult_worker, type=WORKER)
|
||||
└── Process(on_Snapshot__50_wget.py, type=HOOK) # ArchiveResult.process
|
||||
└── Process(wget -p ..., type=BINARY) # from JSONL
|
||||
└── Process(curl -O ..., type=BINARY) # from JSONL
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
Reference in New Issue
Block a user