mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-03 09:25:42 +10:00
Fix code review issues in process management refactor
- Add pwd validation in Process.launch() to prevent crashes - Fix psutil returncode handling (use wait() return value, not returncode attr) - Add None check for proc.pid in cleanup_stale_running() - Add stale process cleanup in Orchestrator.is_running() - Ensure orchestrator process_type is correctly set to ORCHESTRATOR - Fix KeyboardInterrupt handling (exit code 0 for graceful shutdown) - Throttle cleanup_stale_running() to once per 30 seconds for performance - Fix worker process_type to use TypeChoices.WORKER consistently - Fix get_running_workers() API to return list of dicts (not Process objects) - Only delete PID files after successful kill or confirmed stale - Fix migration index names to match between SQL and Django state - Remove db_index=True from process_type (index created manually) - Update documentation to reflect actual implementation - Add explanatory comments to empty except blocks - Fix exit codes to use Unix convention (128 + signal number) Co-authored-by: Nick Sweeting <pirate@users.noreply.github.com>
This commit is contained in:
@@ -1726,14 +1726,14 @@ The goal is to consolidate all subprocess management into `Process` model method
|
||||
|------------------|-------------|
|
||||
| `write_pid_file(worker_type, worker_id)` | `Process.current()` auto-creates |
|
||||
| `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` |
|
||||
| `remove_pid_file(path)` | Automatic on `Process.status = EXITED` |
|
||||
| `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code |
|
||||
| `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` |
|
||||
| `get_all_pid_files()` | `Process.objects.filter(status='running')` |
|
||||
| `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` |
|
||||
| `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` |
|
||||
| `get_running_worker_count(type)` | `Process.objects.filter(...).count()` |
|
||||
| `get_next_worker_id(type)` | Derive from `Process.objects.filter(...).count()` |
|
||||
| `stop_worker(pid, graceful)` | `Process.kill(signal_num=SIGTERM)` then `Process.kill(SIGKILL)` |
|
||||
| `get_next_worker_id(type)` | Use `Max(worker_id)+1` under transaction or DB sequence to avoid race conditions |
|
||||
| `stop_worker(pid, graceful)` | `Process.terminate(graceful_timeout)` or `Process.kill_tree()` |
|
||||
|
||||
#### `hooks.py` Changes
|
||||
|
||||
@@ -1752,10 +1752,13 @@ with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
|
||||
|
||||
**New `run_hook()` using Process:**
|
||||
```python
|
||||
# Only store env delta or allowlist to avoid leaking secrets
|
||||
env_delta = {k: v for k, v in env.items() if k in ALLOWED_ENV_VARS}
|
||||
|
||||
hook_process = Process.objects.create(
|
||||
parent=parent_process,
|
||||
process_type=Process.TypeChoices.HOOK,
|
||||
cmd=cmd, pwd=str(output_dir), env=env, timeout=timeout,
|
||||
cmd=cmd, pwd=str(output_dir), env=env_delta, timeout=timeout,
|
||||
)
|
||||
hook_process.launch(background=is_background)
|
||||
# stdout/stderr/pid_file all handled internally by Process.launch()
|
||||
|
||||
@@ -424,8 +424,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
|
||||
if self.OUTPUT_DIR.exists():
|
||||
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
|
||||
cmd_file = pid_file.parent / 'cmd.sh'
|
||||
safe_kill_process(pid_file, cmd_file)
|
||||
pid_file.unlink(missing_ok=True)
|
||||
# Only delete PID file if kill succeeded or process is already dead
|
||||
killed = safe_kill_process(pid_file, cmd_file)
|
||||
if killed or not pid_file.exists():
|
||||
pid_file.unlink(missing_ok=True)
|
||||
|
||||
# Run on_CrawlEnd hooks
|
||||
from archivebox.config.configset import get_config
|
||||
|
||||
@@ -70,7 +70,6 @@ class Migration(migrations.Migration):
|
||||
('hook', 'Hook Script'),
|
||||
('binary', 'Binary Execution'),
|
||||
],
|
||||
db_index=True,
|
||||
default='binary',
|
||||
help_text='Type of process in the execution hierarchy',
|
||||
max_length=16,
|
||||
@@ -81,14 +80,14 @@ class Migration(migrations.Migration):
|
||||
model_name='process',
|
||||
index=models.Index(
|
||||
fields=['parent', 'status'],
|
||||
name='machine_pro_parent__status_idx',
|
||||
name='machine_process_parent_status_idx',
|
||||
),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='process',
|
||||
index=models.Index(
|
||||
fields=['machine', 'pid', 'started_at'],
|
||||
name='machine_pro_machine_pid_idx',
|
||||
name='machine_process_machine_pid_started_idx',
|
||||
),
|
||||
),
|
||||
],
|
||||
|
||||
@@ -914,7 +914,7 @@ class Process(ModelWithHealthStats):
|
||||
# Check if too old (PID definitely reused)
|
||||
if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW:
|
||||
is_stale = True
|
||||
elif PSUTIL_AVAILABLE:
|
||||
elif PSUTIL_AVAILABLE and proc.pid is not None:
|
||||
# Check if OS process still exists with matching start time
|
||||
try:
|
||||
os_proc = psutil.Process(proc.pid)
|
||||
@@ -1147,9 +1147,12 @@ class Process(ModelWithHealthStats):
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
# Validate pwd is set (required for output files)
|
||||
if not self.pwd:
|
||||
raise ValueError("Process.pwd must be set before calling launch()")
|
||||
|
||||
# Ensure output directory exists
|
||||
if self.pwd:
|
||||
Path(self.pwd).mkdir(parents=True, exist_ok=True)
|
||||
Path(self.pwd).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write cmd.sh for debugging
|
||||
self._write_cmd_file()
|
||||
@@ -1232,7 +1235,8 @@ class Process(ModelWithHealthStats):
|
||||
proc.send_signal(signal_num)
|
||||
|
||||
# Update our record
|
||||
self.exit_code = -signal_num
|
||||
# Use standard Unix convention: 128 + signal number
|
||||
self.exit_code = 128 + signal_num
|
||||
self.ended_at = timezone.now()
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.save()
|
||||
@@ -1336,9 +1340,10 @@ class Process(ModelWithHealthStats):
|
||||
|
||||
# Step 2: Wait for graceful exit
|
||||
try:
|
||||
proc.wait(timeout=graceful_timeout)
|
||||
exit_status = proc.wait(timeout=graceful_timeout)
|
||||
# Process exited gracefully
|
||||
self.exit_code = proc.returncode if hasattr(proc, 'returncode') else 0
|
||||
# psutil.Process.wait() returns the exit status
|
||||
self.exit_code = exit_status if exit_status is not None else 0
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = timezone.now()
|
||||
self.save()
|
||||
@@ -1350,7 +1355,8 @@ class Process(ModelWithHealthStats):
|
||||
proc.kill()
|
||||
proc.wait(timeout=2)
|
||||
|
||||
self.exit_code = -signal.SIGKILL
|
||||
# Use standard Unix convention: 128 + signal number
|
||||
self.exit_code = 128 + signal.SIGKILL
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = timezone.now()
|
||||
self.save()
|
||||
@@ -1398,6 +1404,7 @@ class Process(ModelWithHealthStats):
|
||||
try:
|
||||
child.terminate()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
# Child already dead or we don't have permission - continue
|
||||
pass
|
||||
|
||||
# Wait briefly for children to exit
|
||||
@@ -1410,6 +1417,7 @@ class Process(ModelWithHealthStats):
|
||||
child.kill()
|
||||
killed_count += 1
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
# Child exited or we don't have permission - continue
|
||||
pass
|
||||
|
||||
# Now kill self
|
||||
|
||||
@@ -72,6 +72,7 @@ class Orchestrator:
|
||||
self.pid: int = os.getpid()
|
||||
self.pid_file = None
|
||||
self.idle_count: int = 0
|
||||
self._last_cleanup_time: float = 0.0 # For throttling cleanup_stale_running()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'
|
||||
@@ -81,15 +82,21 @@ class Orchestrator:
|
||||
"""Check if an orchestrator is already running."""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
return Process.get_running_count(process_type='orchestrator') > 0
|
||||
# Clean up stale processes before counting
|
||||
Process.cleanup_stale_running()
|
||||
return Process.get_running_count(process_type=Process.TypeChoices.ORCHESTRATOR) > 0
|
||||
|
||||
def on_startup(self) -> None:
|
||||
"""Called when orchestrator starts."""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
self.pid = os.getpid()
|
||||
# Register orchestrator process in database
|
||||
# Register orchestrator process in database with explicit type
|
||||
self.db_process = Process.current()
|
||||
# Ensure the process type is correctly set to ORCHESTRATOR
|
||||
if self.db_process.process_type != Process.TypeChoices.ORCHESTRATOR:
|
||||
self.db_process.process_type = Process.TypeChoices.ORCHESTRATOR
|
||||
self.db_process.save(update_fields=['process_type'])
|
||||
|
||||
# Clean up any stale Process records from previous runs
|
||||
stale_count = Process.cleanup_stale_running()
|
||||
@@ -115,7 +122,8 @@ class Orchestrator:
|
||||
"""Called when orchestrator shuts down."""
|
||||
# Update Process record status
|
||||
if hasattr(self, 'db_process') and self.db_process:
|
||||
self.db_process.exit_code = 1 if error else 0
|
||||
# KeyboardInterrupt is a graceful shutdown, not an error
|
||||
self.db_process.exit_code = 1 if error and not isinstance(error, KeyboardInterrupt) else 0
|
||||
self.db_process.status = self.db_process.StatusChoices.EXITED
|
||||
self.db_process.ended_at = timezone.now()
|
||||
self.db_process.save()
|
||||
@@ -131,8 +139,15 @@ class Orchestrator:
|
||||
def get_total_worker_count(self) -> int:
|
||||
"""Get total count of running workers across all types."""
|
||||
from archivebox.machine.models import Process
|
||||
import time
|
||||
|
||||
# Throttle cleanup to once every 30 seconds to avoid performance issues
|
||||
CLEANUP_THROTTLE_SECONDS = 30
|
||||
now = time.time()
|
||||
if now - self._last_cleanup_time > CLEANUP_THROTTLE_SECONDS:
|
||||
Process.cleanup_stale_running()
|
||||
self._last_cleanup_time = now
|
||||
|
||||
Process.cleanup_stale_running()
|
||||
return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES)
|
||||
|
||||
def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
|
||||
|
||||
@@ -290,7 +290,7 @@ class Worker:
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
if worker_id is None:
|
||||
worker_id = Process.get_next_worker_id(process_type=cls.name)
|
||||
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
|
||||
|
||||
# Use module-level function for pickling compatibility
|
||||
proc = MPProcess(
|
||||
@@ -310,14 +310,24 @@ class Worker:
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
Process.cleanup_stale_running()
|
||||
return list(Process.get_running(process_type=cls.name))
|
||||
# Convert Process objects to dicts to match the expected API contract
|
||||
processes = Process.get_running(process_type=Process.TypeChoices.WORKER)
|
||||
return [
|
||||
{
|
||||
'pid': p.pid,
|
||||
'worker_id': p.id,
|
||||
'started_at': p.started_at.isoformat() if p.started_at else None,
|
||||
'status': p.status,
|
||||
}
|
||||
for p in processes
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def get_worker_count(cls) -> int:
|
||||
"""Get count of running workers of this type."""
|
||||
from archivebox.machine.models import Process
|
||||
|
||||
return Process.get_running_count(process_type=cls.name)
|
||||
return Process.get_running_count(process_type=Process.TypeChoices.WORKER)
|
||||
|
||||
|
||||
class CrawlWorker(Worker):
|
||||
|
||||
Reference in New Issue
Block a user