mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-03 09:25:42 +10:00
Add terminate, kill_tree, and query methods to Process model
This consolidates scattered subprocess management logic into the Process model: - terminate(): Graceful SIGTERM → wait → SIGKILL (replaces stop_worker, etc.) - kill_tree(): Kill process and all OS children (replaces os.killpg logic) - kill_children_db(): Kill DB-tracked child processes - get_running(): Query running processes by type (replaces get_all_worker_pids) - get_running_count(): Count running processes (replaces get_running_worker_count) - stop_all(): Stop all processes of a type - get_next_worker_id(): Get next worker ID for spawning Added Phase 8 to TODO documenting ~390 lines that can be deleted after consolidation, including workers/pid_utils.py which becomes obsolete. Also includes migration 0002 for parent FK and process_type fields.
This commit is contained in:
@@ -1702,6 +1702,227 @@ class ProcessAdmin(admin.ModelAdmin):
|
||||
|
||||
---
|
||||
|
||||
## Phase 8: Code Consolidation (Delete Redundant Logic)
|
||||
|
||||
The goal is to consolidate all subprocess management into `Process` model methods, eliminating duplicate logic scattered across the codebase.
|
||||
|
||||
### 8.1 Files to Simplify/Delete
|
||||
|
||||
| File | Current Lines | After Consolidation | Savings |
|
||||
|------|--------------|---------------------|---------|
|
||||
| `workers/pid_utils.py` | ~192 lines | DELETE entirely | -192 |
|
||||
| `misc/process_utils.py` | ~85 lines | Keep as low-level utils | 0 |
|
||||
| `hooks.py` (run_hook) | ~100 lines | -50 lines (use Process.launch) | -50 |
|
||||
| `hooks.py` (kill/alive) | ~50 lines | DELETE (use Process.kill/is_running) | -50 |
|
||||
| `crawls/models.py` (cleanup) | ~100 lines | -70 lines (use Process.kill) | -70 |
|
||||
| `supervisord_util.py` | ~50 lines process mgmt | -30 lines | -30 |
|
||||
| **TOTAL** | | | **~-390 lines** |
|
||||
|
||||
### 8.2 Detailed Consolidation Map
|
||||
|
||||
#### `workers/pid_utils.py` → DELETE ENTIRELY
|
||||
|
||||
| Current Function | Replacement |
|
||||
|------------------|-------------|
|
||||
| `write_pid_file(worker_type, worker_id)` | `Process.current()` auto-creates |
|
||||
| `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` |
|
||||
| `remove_pid_file(path)` | Automatic on `Process.status = EXITED` |
|
||||
| `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` |
|
||||
| `get_all_pid_files()` | `Process.objects.filter(status='running')` |
|
||||
| `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` |
|
||||
| `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` |
|
||||
| `get_running_worker_count(type)` | `Process.objects.filter(...).count()` |
|
||||
| `get_next_worker_id(type)` | Derive from `Process.objects.filter(...).count()` |
|
||||
| `stop_worker(pid, graceful)` | `Process.kill(signal_num=SIGTERM)` then `Process.kill(SIGKILL)` |
|
||||
|
||||
#### `hooks.py` Changes
|
||||
|
||||
**Current `run_hook()` lines 374-398:**
|
||||
```python
|
||||
# DELETE these lines - replaced by Process.launch()
|
||||
stdout_file = output_dir / 'stdout.log'
|
||||
stderr_file = output_dir / 'stderr.log'
|
||||
pid_file = output_dir / 'hook.pid'
|
||||
cmd_file = output_dir / 'cmd.sh'
|
||||
write_cmd_file(cmd_file, cmd)
|
||||
with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
|
||||
process = subprocess.Popen(cmd, ...)
|
||||
write_pid_file_with_mtime(pid_file, process.pid, time.time())
|
||||
```
|
||||
|
||||
**New `run_hook()` using Process:**
|
||||
```python
|
||||
hook_process = Process.objects.create(
|
||||
parent=parent_process,
|
||||
process_type=Process.TypeChoices.HOOK,
|
||||
cmd=cmd, pwd=str(output_dir), env=env, timeout=timeout,
|
||||
)
|
||||
hook_process.launch(background=is_background)
|
||||
# stdout/stderr/pid_file all handled internally by Process.launch()
|
||||
```
|
||||
|
||||
**DELETE these functions entirely:**
|
||||
```python
|
||||
def process_is_alive(pid_file: Path) -> bool: # lines 1238-1256
|
||||
def kill_process(pid_file: Path, sig, validate): # lines 1259-1282
|
||||
```
|
||||
|
||||
**Replace with:**
|
||||
```python
|
||||
# Use Process methods directly:
|
||||
process.is_running # replaces process_is_alive()
|
||||
process.kill() # replaces kill_process()
|
||||
```
|
||||
|
||||
#### `crawls/models.py` Changes
|
||||
|
||||
**Current `Crawl.cleanup()` lines 418-493:**
|
||||
```python
|
||||
# DELETE all this inline process logic:
|
||||
def is_process_alive(pid):
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
return True
|
||||
except (OSError, ProcessLookupError):
|
||||
return False
|
||||
|
||||
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
|
||||
if not validate_pid_file(pid_file, cmd_file):
|
||||
pid_file.unlink(missing_ok=True)
|
||||
continue
|
||||
pid = int(pid_file.read_text().strip())
|
||||
os.killpg(pid, signal.SIGTERM)
|
||||
time.sleep(2)
|
||||
if not is_process_alive(pid):
|
||||
pid_file.unlink(missing_ok=True)
|
||||
continue
|
||||
os.killpg(pid, signal.SIGKILL)
|
||||
# ... more cleanup logic
|
||||
```
|
||||
|
||||
**New `Crawl.cleanup()` using Process:**
|
||||
```python
|
||||
def cleanup(self):
|
||||
# Kill all running child processes for this crawl
|
||||
for snapshot in self.snapshot_set.all():
|
||||
for ar in snapshot.archiveresult_set.filter(status='started'):
|
||||
if ar.process_id:
|
||||
# Kill hook process and all its children
|
||||
ar.process.kill()
|
||||
for child in ar.process.children.filter(status='running'):
|
||||
child.kill()
|
||||
|
||||
# Run on_CrawlEnd hooks (foreground)
|
||||
# ... existing hook running logic ...
|
||||
```
|
||||
|
||||
#### `supervisord_util.py` Changes
|
||||
|
||||
**Current global tracking:**
|
||||
```python
|
||||
_supervisord_proc = None # subprocess.Popen reference
|
||||
|
||||
def stop_existing_supervisord_process():
|
||||
global _supervisord_proc
|
||||
if _supervisord_proc and _supervisord_proc.poll() is None:
|
||||
_supervisord_proc.terminate()
|
||||
_supervisord_proc.wait(timeout=5)
|
||||
# ... fallback to PID file ...
|
||||
```
|
||||
|
||||
**New using Process model:**
|
||||
```python
|
||||
_supervisord_db_process = None # Process model instance
|
||||
|
||||
def start_new_supervisord_process():
|
||||
# ... existing subprocess.Popen ...
|
||||
global _supervisord_db_process
|
||||
_supervisord_db_process = Process.objects.create(
|
||||
parent=Process.current(),
|
||||
process_type=Process.TypeChoices.SUPERVISORD,
|
||||
pid=proc.pid,
|
||||
cmd=['supervisord', f'--configuration={CONFIG_FILE}'],
|
||||
started_at=timezone.now(),
|
||||
status=Process.StatusChoices.RUNNING,
|
||||
)
|
||||
|
||||
def stop_existing_supervisord_process():
|
||||
global _supervisord_db_process
|
||||
if _supervisord_db_process:
|
||||
_supervisord_db_process.kill() # Handles children, PID validation, etc.
|
||||
_supervisord_db_process = None
|
||||
```
|
||||
|
||||
#### `workers/worker.py` Changes
|
||||
|
||||
**Current:**
|
||||
```python
|
||||
from .pid_utils import write_pid_file, remove_pid_file, ...
|
||||
|
||||
def on_startup(self):
|
||||
self.pid = os.getpid()
|
||||
self.pid_file = write_pid_file(self.name, self.worker_id)
|
||||
|
||||
def on_shutdown(self, error=None):
|
||||
if self.pid_file:
|
||||
remove_pid_file(self.pid_file)
|
||||
```
|
||||
|
||||
**New:**
|
||||
```python
|
||||
# No import needed - Process.current() handles everything
|
||||
|
||||
def on_startup(self):
|
||||
self.db_process = Process.current()
|
||||
# Process.current() auto-detects type, finds parent via PPID, creates record
|
||||
|
||||
def on_shutdown(self, error=None):
|
||||
if self.db_process:
|
||||
self.db_process.exit_code = 0 if error is None else 1
|
||||
self.db_process.status = Process.StatusChoices.EXITED
|
||||
self.db_process.ended_at = timezone.now()
|
||||
self.db_process.save()
|
||||
```
|
||||
|
||||
### 8.3 New Process Model Methods Summary
|
||||
|
||||
All process operations now go through `Process`:
|
||||
|
||||
```python
|
||||
# Getting current process
|
||||
Process.current() # Creates/retrieves Process for os.getpid()
|
||||
|
||||
# Spawning new process
|
||||
proc = Process.objects.create(parent=Process.current(), cmd=[...], ...)
|
||||
proc.launch(background=False) # Handles Popen, PID file, stdout/stderr
|
||||
|
||||
# Checking process status
|
||||
proc.is_running # True if OS process exists and matches
|
||||
proc.proc # psutil.Process or None (validated)
|
||||
proc.poll() # Returns exit_code or None
|
||||
|
||||
# Terminating process
|
||||
proc.kill() # Safe kill with PID validation
|
||||
proc.kill(SIGKILL) # Force kill
|
||||
|
||||
# Waiting for completion
|
||||
proc.wait(timeout=30) # Blocks until exit or timeout
|
||||
|
||||
# Cleanup
|
||||
Process.cleanup_stale_running() # Mark orphaned processes as EXITED
|
||||
```
|
||||
|
||||
### 8.4 Benefits
|
||||
|
||||
1. **Single Source of Truth**: All process state in database, queryable
|
||||
2. **PID Reuse Protection**: `Process.proc` validates via psutil.create_time()
|
||||
3. **Hierarchy Tracking**: `Process.parent` / `Process.children` for tree traversal
|
||||
4. **Machine-Scoped**: All queries filter by `machine=Machine.current()`
|
||||
5. **Audit Trail**: Every subprocess is logged with timestamps, exit codes
|
||||
6. **No Stale PID Files**: Process records update status automatically
|
||||
|
||||
---
|
||||
|
||||
## Open Questions
|
||||
|
||||
1. **Performance**: Deep hierarchies with many children could slow queries. Consider:
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
# Generated on 2025-12-31
|
||||
# Adds parent FK and process_type field to Process model
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('machine', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.SeparateDatabaseAndState(
|
||||
database_operations=[
|
||||
migrations.RunSQL(
|
||||
sql="""
|
||||
-- Add parent_id FK column to machine_process
|
||||
ALTER TABLE machine_process ADD COLUMN parent_id TEXT REFERENCES machine_process(id) ON DELETE SET NULL;
|
||||
CREATE INDEX IF NOT EXISTS machine_process_parent_id_idx ON machine_process(parent_id);
|
||||
|
||||
-- Add process_type column with default 'binary'
|
||||
ALTER TABLE machine_process ADD COLUMN process_type VARCHAR(16) NOT NULL DEFAULT 'binary';
|
||||
CREATE INDEX IF NOT EXISTS machine_process_process_type_idx ON machine_process(process_type);
|
||||
|
||||
-- Add composite index for parent + status queries
|
||||
CREATE INDEX IF NOT EXISTS machine_process_parent_status_idx ON machine_process(parent_id, status);
|
||||
|
||||
-- Add composite index for machine + pid + started_at (for PID reuse protection)
|
||||
CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at);
|
||||
""",
|
||||
reverse_sql="""
|
||||
DROP INDEX IF EXISTS machine_process_machine_pid_started_idx;
|
||||
DROP INDEX IF EXISTS machine_process_parent_status_idx;
|
||||
DROP INDEX IF EXISTS machine_process_process_type_idx;
|
||||
DROP INDEX IF EXISTS machine_process_parent_id_idx;
|
||||
|
||||
-- SQLite doesn't support DROP COLUMN directly, but we record the intent
|
||||
-- In practice, this migration is forward-only for SQLite
|
||||
-- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN process_type;
|
||||
-- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN parent_id;
|
||||
"""
|
||||
),
|
||||
],
|
||||
state_operations=[
|
||||
# Add parent FK
|
||||
migrations.AddField(
|
||||
model_name='process',
|
||||
name='parent',
|
||||
field=models.ForeignKey(
|
||||
blank=True,
|
||||
help_text='Parent process that spawned this one',
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
related_name='children',
|
||||
to='machine.process',
|
||||
),
|
||||
),
|
||||
# Add process_type field
|
||||
migrations.AddField(
|
||||
model_name='process',
|
||||
name='process_type',
|
||||
field=models.CharField(
|
||||
choices=[
|
||||
('cli', 'CLI Command'),
|
||||
('supervisord', 'Supervisord Daemon'),
|
||||
('orchestrator', 'Orchestrator'),
|
||||
('worker', 'Worker Process'),
|
||||
('hook', 'Hook Script'),
|
||||
('binary', 'Binary Execution'),
|
||||
],
|
||||
db_index=True,
|
||||
default='binary',
|
||||
help_text='Type of process in the execution hierarchy',
|
||||
max_length=16,
|
||||
),
|
||||
),
|
||||
# Add indexes
|
||||
migrations.AddIndex(
|
||||
model_name='process',
|
||||
index=models.Index(
|
||||
fields=['parent', 'status'],
|
||||
name='machine_pro_parent__status_idx',
|
||||
),
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name='process',
|
||||
index=models.Index(
|
||||
fields=['machine', 'pid', 'started_at'],
|
||||
name='machine_pro_machine_pid_idx',
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
]
|
||||
@@ -1,8 +1,11 @@
|
||||
__package__ = 'archivebox.machine'
|
||||
|
||||
import os
|
||||
import sys
|
||||
import socket
|
||||
from pathlib import Path
|
||||
from archivebox.uuid_compat import uuid7
|
||||
from datetime import timedelta
|
||||
from datetime import timedelta, datetime
|
||||
|
||||
from statemachine import State, registry
|
||||
|
||||
@@ -14,13 +17,23 @@ from archivebox.base_models.models import ModelWithHealthStats
|
||||
from archivebox.workers.models import BaseStateMachine
|
||||
from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
|
||||
|
||||
try:
|
||||
import psutil
|
||||
PSUTIL_AVAILABLE = True
|
||||
except ImportError:
|
||||
PSUTIL_AVAILABLE = False
|
||||
|
||||
_CURRENT_MACHINE = None
|
||||
_CURRENT_INTERFACE = None
|
||||
_CURRENT_BINARIES = {}
|
||||
_CURRENT_PROCESS = None
|
||||
|
||||
MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60
|
||||
NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60
|
||||
BINARY_RECHECK_INTERVAL = 1 * 30 * 60
|
||||
PROCESS_RECHECK_INTERVAL = 60 # Re-validate every 60 seconds
|
||||
PID_REUSE_WINDOW = timedelta(hours=24) # Max age for considering a PID match valid
|
||||
START_TIME_TOLERANCE = 5.0 # Seconds tolerance for start time matching
|
||||
|
||||
|
||||
class MachineManager(models.Manager):
|
||||
@@ -458,6 +471,56 @@ class Binary(ModelWithHealthStats):
|
||||
class ProcessManager(models.Manager):
|
||||
"""Manager for Process model."""
|
||||
|
||||
def current(self) -> 'Process':
|
||||
"""Get the Process record for the current OS process."""
|
||||
return Process.current()
|
||||
|
||||
def get_by_pid(self, pid: int, machine: 'Machine' = None) -> 'Process | None':
|
||||
"""
|
||||
Find a Process by PID with proper validation against PID reuse.
|
||||
|
||||
IMPORTANT: PIDs are reused by the OS! This method:
|
||||
1. Filters by machine (required - PIDs are only unique per machine)
|
||||
2. Filters by time window (processes older than 24h are stale)
|
||||
3. Validates via psutil that start times match
|
||||
|
||||
Args:
|
||||
pid: OS process ID
|
||||
machine: Machine instance (defaults to current machine)
|
||||
|
||||
Returns:
|
||||
Process if found and validated, None otherwise
|
||||
"""
|
||||
if not PSUTIL_AVAILABLE:
|
||||
return None
|
||||
|
||||
machine = machine or Machine.current()
|
||||
|
||||
# Get the actual process start time from OS
|
||||
try:
|
||||
os_proc = psutil.Process(pid)
|
||||
os_start_time = os_proc.create_time()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
# Process doesn't exist - any DB record with this PID is stale
|
||||
return None
|
||||
|
||||
# Query candidates: same machine, same PID, recent, still RUNNING
|
||||
candidates = self.filter(
|
||||
machine=machine,
|
||||
pid=pid,
|
||||
status=Process.StatusChoices.RUNNING,
|
||||
started_at__gte=timezone.now() - PID_REUSE_WINDOW,
|
||||
).order_by('-started_at')
|
||||
|
||||
for candidate in candidates:
|
||||
# Validate start time matches (within tolerance)
|
||||
if candidate.started_at:
|
||||
db_start_time = candidate.started_at.timestamp()
|
||||
if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
|
||||
return candidate
|
||||
|
||||
return None
|
||||
|
||||
def create_for_archiveresult(self, archiveresult, **kwargs):
|
||||
"""
|
||||
Create a Process record for an ArchiveResult.
|
||||
@@ -500,11 +563,38 @@ class Process(ModelWithHealthStats):
|
||||
RUNNING = 'running', 'Running'
|
||||
EXITED = 'exited', 'Exited'
|
||||
|
||||
class TypeChoices(models.TextChoices):
|
||||
CLI = 'cli', 'CLI Command'
|
||||
SUPERVISORD = 'supervisord', 'Supervisord Daemon'
|
||||
ORCHESTRATOR = 'orchestrator', 'Orchestrator'
|
||||
WORKER = 'worker', 'Worker Process'
|
||||
HOOK = 'hook', 'Hook Script'
|
||||
BINARY = 'binary', 'Binary Execution'
|
||||
|
||||
# Primary fields
|
||||
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
|
||||
created_at = models.DateTimeField(default=timezone.now, db_index=True)
|
||||
modified_at = models.DateTimeField(auto_now=True)
|
||||
|
||||
# Parent process FK for hierarchy tracking
|
||||
parent = models.ForeignKey(
|
||||
'self',
|
||||
on_delete=models.SET_NULL,
|
||||
null=True,
|
||||
blank=True,
|
||||
related_name='children',
|
||||
help_text='Parent process that spawned this one'
|
||||
)
|
||||
|
||||
# Process type for distinguishing in hierarchy
|
||||
process_type = models.CharField(
|
||||
max_length=16,
|
||||
choices=TypeChoices.choices,
|
||||
default=TypeChoices.BINARY,
|
||||
db_index=True,
|
||||
help_text='Type of process in the execution hierarchy'
|
||||
)
|
||||
|
||||
# Machine FK - required (every process runs on a machine)
|
||||
machine = models.ForeignKey(
|
||||
Machine,
|
||||
@@ -592,6 +682,8 @@ class Process(ModelWithHealthStats):
|
||||
indexes = [
|
||||
models.Index(fields=['machine', 'status', 'retry_at']),
|
||||
models.Index(fields=['binary', 'exit_code']),
|
||||
models.Index(fields=['parent', 'status']),
|
||||
models.Index(fields=['machine', 'pid', 'started_at']),
|
||||
]
|
||||
|
||||
def __str__(self) -> str:
|
||||
@@ -660,6 +752,774 @@ class Process(ModelWithHealthStats):
|
||||
self.modified_at = timezone.now()
|
||||
self.save()
|
||||
|
||||
# =========================================================================
|
||||
# Process.current() and hierarchy methods
|
||||
# =========================================================================
|
||||
|
||||
@classmethod
|
||||
def current(cls) -> 'Process':
|
||||
"""
|
||||
Get or create the Process record for the current OS process.
|
||||
|
||||
Similar to Machine.current(), this:
|
||||
1. Checks cache for existing Process with matching PID
|
||||
2. Validates the cached Process is still valid (PID not reused)
|
||||
3. Creates new Process if needed
|
||||
|
||||
IMPORTANT: Uses psutil to validate PID hasn't been reused.
|
||||
PIDs are recycled by OS, so we compare start times.
|
||||
"""
|
||||
global _CURRENT_PROCESS
|
||||
|
||||
current_pid = os.getpid()
|
||||
machine = Machine.current()
|
||||
|
||||
# Check cache validity
|
||||
if _CURRENT_PROCESS:
|
||||
# Verify: same PID, same machine, cache not expired
|
||||
if (_CURRENT_PROCESS.pid == current_pid and
|
||||
_CURRENT_PROCESS.machine_id == machine.id and
|
||||
timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)):
|
||||
return _CURRENT_PROCESS
|
||||
_CURRENT_PROCESS = None
|
||||
|
||||
# Get actual process start time from OS for validation
|
||||
os_start_time = None
|
||||
if PSUTIL_AVAILABLE:
|
||||
try:
|
||||
os_proc = psutil.Process(current_pid)
|
||||
os_start_time = os_proc.create_time()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
|
||||
# Try to find existing Process for this PID on this machine
|
||||
# Filter by: machine + PID + RUNNING + recent + start time matches
|
||||
if os_start_time:
|
||||
existing = cls.objects.filter(
|
||||
machine=machine,
|
||||
pid=current_pid,
|
||||
status=cls.StatusChoices.RUNNING,
|
||||
started_at__gte=timezone.now() - PID_REUSE_WINDOW,
|
||||
).order_by('-started_at').first()
|
||||
|
||||
if existing and existing.started_at:
|
||||
db_start_time = existing.started_at.timestamp()
|
||||
if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
|
||||
_CURRENT_PROCESS = existing
|
||||
return existing
|
||||
|
||||
# No valid existing record - create new one
|
||||
parent = cls._find_parent_process(machine)
|
||||
process_type = cls._detect_process_type()
|
||||
|
||||
# Use psutil start time if available (more accurate than timezone.now())
|
||||
if os_start_time:
|
||||
started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
|
||||
else:
|
||||
started_at = timezone.now()
|
||||
|
||||
_CURRENT_PROCESS = cls.objects.create(
|
||||
machine=machine,
|
||||
parent=parent,
|
||||
process_type=process_type,
|
||||
cmd=sys.argv,
|
||||
pwd=os.getcwd(),
|
||||
pid=current_pid,
|
||||
started_at=started_at,
|
||||
status=cls.StatusChoices.RUNNING,
|
||||
)
|
||||
return _CURRENT_PROCESS
|
||||
|
||||
@classmethod
|
||||
def _find_parent_process(cls, machine: 'Machine' = None) -> 'Process | None':
|
||||
"""
|
||||
Find the parent Process record by looking up PPID.
|
||||
|
||||
IMPORTANT: Validates against PID reuse by checking:
|
||||
1. Same machine (PIDs are only unique per machine)
|
||||
2. Start time matches OS process start time
|
||||
3. Process is still RUNNING and recent
|
||||
|
||||
Returns None if parent is not an ArchiveBox process.
|
||||
"""
|
||||
if not PSUTIL_AVAILABLE:
|
||||
return None
|
||||
|
||||
ppid = os.getppid()
|
||||
machine = machine or Machine.current()
|
||||
|
||||
# Get parent process start time from OS
|
||||
try:
|
||||
os_parent = psutil.Process(ppid)
|
||||
os_parent_start = os_parent.create_time()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
return None # Parent process doesn't exist
|
||||
|
||||
# Find matching Process record
|
||||
candidates = cls.objects.filter(
|
||||
machine=machine,
|
||||
pid=ppid,
|
||||
status=cls.StatusChoices.RUNNING,
|
||||
started_at__gte=timezone.now() - PID_REUSE_WINDOW,
|
||||
).order_by('-started_at')
|
||||
|
||||
for candidate in candidates:
|
||||
if candidate.started_at:
|
||||
db_start_time = candidate.started_at.timestamp()
|
||||
if abs(db_start_time - os_parent_start) < START_TIME_TOLERANCE:
|
||||
return candidate
|
||||
|
||||
return None # No matching ArchiveBox parent process
|
||||
|
||||
@classmethod
|
||||
def _detect_process_type(cls) -> str:
|
||||
"""
|
||||
Detect the type of the current process from sys.argv.
|
||||
"""
|
||||
argv_str = ' '.join(sys.argv).lower()
|
||||
|
||||
if 'supervisord' in argv_str:
|
||||
return cls.TypeChoices.SUPERVISORD
|
||||
elif 'orchestrator' in argv_str:
|
||||
return cls.TypeChoices.ORCHESTRATOR
|
||||
elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']):
|
||||
return cls.TypeChoices.WORKER
|
||||
elif 'archivebox' in argv_str:
|
||||
return cls.TypeChoices.CLI
|
||||
else:
|
||||
return cls.TypeChoices.BINARY
|
||||
|
||||
@classmethod
|
||||
def cleanup_stale_running(cls, machine: 'Machine' = None) -> int:
|
||||
"""
|
||||
Mark stale RUNNING processes as EXITED.
|
||||
|
||||
Processes are stale if:
|
||||
- Status is RUNNING but OS process no longer exists
|
||||
- Status is RUNNING but started_at is older than PID_REUSE_WINDOW
|
||||
|
||||
Returns count of processes cleaned up.
|
||||
"""
|
||||
machine = machine or Machine.current()
|
||||
cleaned = 0
|
||||
|
||||
stale = cls.objects.filter(
|
||||
machine=machine,
|
||||
status=cls.StatusChoices.RUNNING,
|
||||
)
|
||||
|
||||
for proc in stale:
|
||||
is_stale = False
|
||||
|
||||
# Check if too old (PID definitely reused)
|
||||
if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW:
|
||||
is_stale = True
|
||||
elif PSUTIL_AVAILABLE:
|
||||
# Check if OS process still exists with matching start time
|
||||
try:
|
||||
os_proc = psutil.Process(proc.pid)
|
||||
if proc.started_at:
|
||||
db_start = proc.started_at.timestamp()
|
||||
os_start = os_proc.create_time()
|
||||
if abs(db_start - os_start) > START_TIME_TOLERANCE:
|
||||
is_stale = True # PID reused by different process
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
is_stale = True # Process no longer exists
|
||||
|
||||
if is_stale:
|
||||
proc.status = cls.StatusChoices.EXITED
|
||||
proc.ended_at = proc.ended_at or timezone.now()
|
||||
proc.exit_code = proc.exit_code if proc.exit_code is not None else -1
|
||||
proc.save(update_fields=['status', 'ended_at', 'exit_code'])
|
||||
cleaned += 1
|
||||
|
||||
return cleaned
|
||||
|
||||
# =========================================================================
|
||||
# Tree traversal properties
|
||||
# =========================================================================
|
||||
|
||||
@property
|
||||
def root(self) -> 'Process':
|
||||
"""Get the root process (CLI command) of this hierarchy."""
|
||||
proc = self
|
||||
while proc.parent_id:
|
||||
proc = proc.parent
|
||||
return proc
|
||||
|
||||
@property
|
||||
def ancestors(self) -> list['Process']:
|
||||
"""Get all ancestor processes from parent to root."""
|
||||
ancestors = []
|
||||
proc = self.parent
|
||||
while proc:
|
||||
ancestors.append(proc)
|
||||
proc = proc.parent
|
||||
return ancestors
|
||||
|
||||
@property
|
||||
def depth(self) -> int:
|
||||
"""Get depth in the process tree (0 = root)."""
|
||||
return len(self.ancestors)
|
||||
|
||||
def get_descendants(self, include_self: bool = False):
|
||||
"""Get all descendant processes recursively."""
|
||||
if include_self:
|
||||
pks = [self.pk]
|
||||
else:
|
||||
pks = []
|
||||
|
||||
children = list(self.children.values_list('pk', flat=True))
|
||||
while children:
|
||||
pks.extend(children)
|
||||
children = list(Process.objects.filter(parent_id__in=children).values_list('pk', flat=True))
|
||||
|
||||
return Process.objects.filter(pk__in=pks)
|
||||
|
||||
# =========================================================================
|
||||
# Validated psutil access via .proc property
|
||||
# =========================================================================
|
||||
|
||||
@property
|
||||
def proc(self) -> 'psutil.Process | None':
|
||||
"""
|
||||
Get validated psutil.Process for this record.
|
||||
|
||||
Returns psutil.Process ONLY if:
|
||||
1. Process with this PID exists in OS
|
||||
2. OS process start time matches our started_at (within tolerance)
|
||||
3. Process is on current machine
|
||||
|
||||
Returns None if:
|
||||
- PID doesn't exist (process exited)
|
||||
- PID was reused by a different process (start times don't match)
|
||||
- We're on a different machine than where process ran
|
||||
- psutil is not available
|
||||
|
||||
This prevents accidentally matching a stale/recycled PID.
|
||||
"""
|
||||
if not PSUTIL_AVAILABLE:
|
||||
return None
|
||||
|
||||
# Can't get psutil.Process if we don't have a PID
|
||||
if not self.pid:
|
||||
return None
|
||||
|
||||
# Can't validate processes on other machines
|
||||
if self.machine_id != Machine.current().id:
|
||||
return None
|
||||
|
||||
try:
|
||||
os_proc = psutil.Process(self.pid)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
return None # Process no longer exists
|
||||
|
||||
# Validate start time matches to prevent PID reuse confusion
|
||||
if self.started_at:
|
||||
os_start_time = os_proc.create_time()
|
||||
db_start_time = self.started_at.timestamp()
|
||||
|
||||
if abs(os_start_time - db_start_time) > START_TIME_TOLERANCE:
|
||||
# PID has been reused by a different process!
|
||||
return None
|
||||
|
||||
# Optionally validate command matches (extra safety)
|
||||
if self.cmd:
|
||||
try:
|
||||
os_cmdline = os_proc.cmdline()
|
||||
# Check if first arg (binary) matches
|
||||
if os_cmdline and self.cmd:
|
||||
os_binary = os_cmdline[0] if os_cmdline else ''
|
||||
db_binary = self.cmd[0] if self.cmd else ''
|
||||
# Match by basename (handles /usr/bin/python3 vs python3)
|
||||
if os_binary and db_binary:
|
||||
if Path(os_binary).name != Path(db_binary).name:
|
||||
return None # Different binary, PID reused
|
||||
except (psutil.AccessDenied, psutil.ZombieProcess):
|
||||
pass # Can't check cmdline, trust start time match
|
||||
|
||||
return os_proc
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""
|
||||
Check if process is currently running via psutil.
|
||||
|
||||
More reliable than checking status field since it validates
|
||||
the actual OS process exists and matches our record.
|
||||
"""
|
||||
proc = self.proc
|
||||
return proc is not None and proc.is_running()
|
||||
|
||||
def is_alive(self) -> bool:
|
||||
"""
|
||||
Alias for is_running, for compatibility with subprocess.Popen API.
|
||||
"""
|
||||
return self.is_running
|
||||
|
||||
def get_memory_info(self) -> dict | None:
|
||||
"""Get memory usage if process is running."""
|
||||
proc = self.proc
|
||||
if proc:
|
||||
try:
|
||||
mem = proc.memory_info()
|
||||
return {'rss': mem.rss, 'vms': mem.vms}
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
return None
|
||||
|
||||
def get_cpu_percent(self) -> float | None:
|
||||
"""Get CPU usage percentage if process is running."""
|
||||
proc = self.proc
|
||||
if proc:
|
||||
try:
|
||||
return proc.cpu_percent(interval=0.1)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
return None
|
||||
|
||||
def get_children_pids(self) -> list[int]:
|
||||
"""Get PIDs of child processes from OS (not DB)."""
|
||||
proc = self.proc
|
||||
if proc:
|
||||
try:
|
||||
return [child.pid for child in proc.children(recursive=True)]
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
return []
|
||||
|
||||
# =========================================================================
|
||||
# Lifecycle methods (launch, kill, poll, wait)
|
||||
# =========================================================================
|
||||
|
||||
@property
|
||||
def pid_file(self) -> Path:
|
||||
"""Path to PID file for this process."""
|
||||
return Path(self.pwd) / 'process.pid' if self.pwd else None
|
||||
|
||||
@property
|
||||
def cmd_file(self) -> Path:
|
||||
"""Path to cmd.sh script for this process."""
|
||||
return Path(self.pwd) / 'cmd.sh' if self.pwd else None
|
||||
|
||||
@property
|
||||
def stdout_file(self) -> Path:
|
||||
"""Path to stdout log."""
|
||||
return Path(self.pwd) / 'stdout.log' if self.pwd else None
|
||||
|
||||
@property
|
||||
def stderr_file(self) -> Path:
|
||||
"""Path to stderr log."""
|
||||
return Path(self.pwd) / 'stderr.log' if self.pwd else None
|
||||
|
||||
def _write_pid_file(self) -> None:
|
||||
"""Write PID file with mtime set to process start time."""
|
||||
from archivebox.misc.process_utils import write_pid_file_with_mtime
|
||||
if self.pid and self.started_at and self.pid_file:
|
||||
write_pid_file_with_mtime(
|
||||
self.pid_file,
|
||||
self.pid,
|
||||
self.started_at.timestamp()
|
||||
)
|
||||
|
||||
def _write_cmd_file(self) -> None:
|
||||
"""Write cmd.sh script for debugging/validation."""
|
||||
from archivebox.misc.process_utils import write_cmd_file
|
||||
if self.cmd and self.cmd_file:
|
||||
write_cmd_file(self.cmd_file, self.cmd)
|
||||
|
||||
def _build_env(self) -> dict:
|
||||
"""Build environment dict for subprocess, merging stored env with system."""
|
||||
env = os.environ.copy()
|
||||
env.update(self.env or {})
|
||||
return env
|
||||
|
||||
def launch(self, background: bool = False) -> 'Process':
|
||||
"""
|
||||
Spawn the subprocess and update this Process record.
|
||||
|
||||
Args:
|
||||
background: If True, don't wait for completion (for daemons/bg hooks)
|
||||
|
||||
Returns:
|
||||
self (updated with pid, started_at, etc.)
|
||||
"""
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
# Ensure output directory exists
|
||||
if self.pwd:
|
||||
Path(self.pwd).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write cmd.sh for debugging
|
||||
self._write_cmd_file()
|
||||
|
||||
stdout_path = self.stdout_file
|
||||
stderr_path = self.stderr_file
|
||||
|
||||
with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
|
||||
proc = subprocess.Popen(
|
||||
self.cmd,
|
||||
cwd=self.pwd,
|
||||
stdout=out,
|
||||
stderr=err,
|
||||
env=self._build_env(),
|
||||
)
|
||||
|
||||
# Get accurate start time from psutil if available
|
||||
if PSUTIL_AVAILABLE:
|
||||
try:
|
||||
ps_proc = psutil.Process(proc.pid)
|
||||
self.started_at = datetime.fromtimestamp(
|
||||
ps_proc.create_time(),
|
||||
tz=timezone.get_current_timezone()
|
||||
)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
self.started_at = timezone.now()
|
||||
else:
|
||||
self.started_at = timezone.now()
|
||||
|
||||
self.pid = proc.pid
|
||||
self.status = self.StatusChoices.RUNNING
|
||||
self.save()
|
||||
|
||||
self._write_pid_file()
|
||||
|
||||
if not background:
|
||||
try:
|
||||
proc.wait(timeout=self.timeout)
|
||||
self.exit_code = proc.returncode
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
self.exit_code = -1
|
||||
|
||||
self.ended_at = timezone.now()
|
||||
if stdout_path.exists():
|
||||
self.stdout = stdout_path.read_text()
|
||||
if stderr_path.exists():
|
||||
self.stderr = stderr_path.read_text()
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.save()
|
||||
|
||||
return self
|
||||
|
||||
def kill(self, signal_num: int = 15) -> bool:
|
||||
"""
|
||||
Kill this process and update status.
|
||||
|
||||
Uses self.proc for safe killing - only kills if PID matches
|
||||
our recorded process (prevents killing recycled PIDs).
|
||||
|
||||
Args:
|
||||
signal_num: Signal to send (default SIGTERM=15)
|
||||
|
||||
Returns:
|
||||
True if killed successfully, False otherwise
|
||||
"""
|
||||
# Use validated psutil.Process to ensure we're killing the right process
|
||||
proc = self.proc
|
||||
if proc is None:
|
||||
# Process doesn't exist or PID was recycled - just update status
|
||||
if self.status != self.StatusChoices.EXITED:
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = self.ended_at or timezone.now()
|
||||
self.save()
|
||||
return False
|
||||
|
||||
try:
|
||||
# Safe to kill - we validated it's our process via start time match
|
||||
proc.send_signal(signal_num)
|
||||
|
||||
# Update our record
|
||||
self.exit_code = -signal_num
|
||||
self.ended_at = timezone.now()
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.save()
|
||||
|
||||
# Clean up PID file
|
||||
if self.pid_file and self.pid_file.exists():
|
||||
self.pid_file.unlink(missing_ok=True)
|
||||
|
||||
return True
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError):
|
||||
# Process already exited between proc check and kill
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = self.ended_at or timezone.now()
|
||||
self.save()
|
||||
return False
|
||||
|
||||
def poll(self) -> int | None:
|
||||
"""
|
||||
Check if process has exited and update status if so.
|
||||
|
||||
Returns:
|
||||
exit_code if exited, None if still running
|
||||
"""
|
||||
if self.status == self.StatusChoices.EXITED:
|
||||
return self.exit_code
|
||||
|
||||
if not self.is_running:
|
||||
# Process exited - read output and update status
|
||||
if self.stdout_file and self.stdout_file.exists():
|
||||
self.stdout = self.stdout_file.read_text()
|
||||
if self.stderr_file and self.stderr_file.exists():
|
||||
self.stderr = self.stderr_file.read_text()
|
||||
|
||||
# Try to get exit code from proc or default to unknown
|
||||
self.exit_code = self.exit_code if self.exit_code is not None else -1
|
||||
self.ended_at = timezone.now()
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.save()
|
||||
return self.exit_code
|
||||
|
||||
return None # Still running
|
||||
|
||||
def wait(self, timeout: int | None = None) -> int:
|
||||
"""
|
||||
Wait for process to exit, polling periodically.
|
||||
|
||||
Args:
|
||||
timeout: Max seconds to wait (None = use self.timeout)
|
||||
|
||||
Returns:
|
||||
exit_code
|
||||
|
||||
Raises:
|
||||
TimeoutError if process doesn't exit in time
|
||||
"""
|
||||
import time
|
||||
|
||||
timeout = timeout or self.timeout
|
||||
start = time.time()
|
||||
|
||||
while True:
|
||||
exit_code = self.poll()
|
||||
if exit_code is not None:
|
||||
return exit_code
|
||||
|
||||
if time.time() - start > timeout:
|
||||
raise TimeoutError(f"Process {self.id} did not exit within {timeout}s")
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
def terminate(self, graceful_timeout: float = 5.0) -> bool:
|
||||
"""
|
||||
Gracefully terminate process: SIGTERM → wait → SIGKILL.
|
||||
|
||||
This consolidates the scattered SIGTERM/SIGKILL logic from:
|
||||
- crawls/models.py Crawl.cleanup()
|
||||
- workers/pid_utils.py stop_worker()
|
||||
- supervisord_util.py stop_existing_supervisord_process()
|
||||
|
||||
Args:
|
||||
graceful_timeout: Seconds to wait after SIGTERM before SIGKILL
|
||||
|
||||
Returns:
|
||||
True if process was terminated, False if already dead
|
||||
"""
|
||||
import time
|
||||
import signal
|
||||
|
||||
proc = self.proc
|
||||
if proc is None:
|
||||
# Already dead - just update status
|
||||
if self.status != self.StatusChoices.EXITED:
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = self.ended_at or timezone.now()
|
||||
self.save()
|
||||
return False
|
||||
|
||||
try:
|
||||
# Step 1: Send SIGTERM for graceful shutdown
|
||||
proc.terminate()
|
||||
|
||||
# Step 2: Wait for graceful exit
|
||||
try:
|
||||
proc.wait(timeout=graceful_timeout)
|
||||
# Process exited gracefully
|
||||
self.exit_code = proc.returncode if hasattr(proc, 'returncode') else 0
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = timezone.now()
|
||||
self.save()
|
||||
return True
|
||||
except psutil.TimeoutExpired:
|
||||
pass # Still running, need to force kill
|
||||
|
||||
# Step 3: Force kill with SIGKILL
|
||||
proc.kill()
|
||||
proc.wait(timeout=2)
|
||||
|
||||
self.exit_code = -signal.SIGKILL
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = timezone.now()
|
||||
self.save()
|
||||
return True
|
||||
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
# Process already dead
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = self.ended_at or timezone.now()
|
||||
self.save()
|
||||
return False
|
||||
|
||||
def kill_tree(self, graceful_timeout: float = 2.0) -> int:
|
||||
"""
|
||||
Kill this process and all its children (OS children, not DB children).
|
||||
|
||||
This consolidates the scattered child-killing logic from:
|
||||
- crawls/models.py Crawl.cleanup() os.killpg()
|
||||
- supervisord_util.py stop_existing_supervisord_process()
|
||||
|
||||
Args:
|
||||
graceful_timeout: Seconds to wait after SIGTERM before SIGKILL
|
||||
|
||||
Returns:
|
||||
Number of processes killed (including self)
|
||||
"""
|
||||
import signal
|
||||
|
||||
killed_count = 0
|
||||
proc = self.proc
|
||||
if proc is None:
|
||||
# Already dead
|
||||
if self.status != self.StatusChoices.EXITED:
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = self.ended_at or timezone.now()
|
||||
self.save()
|
||||
return 0
|
||||
|
||||
try:
|
||||
# Get all children before killing parent
|
||||
children = proc.children(recursive=True)
|
||||
|
||||
# Kill children first (reverse order - deepest first)
|
||||
for child in reversed(children):
|
||||
try:
|
||||
child.terminate()
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
|
||||
# Wait briefly for children to exit
|
||||
gone, alive = psutil.wait_procs(children, timeout=graceful_timeout)
|
||||
killed_count += len(gone)
|
||||
|
||||
# Force kill remaining children
|
||||
for child in alive:
|
||||
try:
|
||||
child.kill()
|
||||
killed_count += 1
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
pass
|
||||
|
||||
# Now kill self
|
||||
if self.terminate(graceful_timeout=graceful_timeout):
|
||||
killed_count += 1
|
||||
|
||||
return killed_count
|
||||
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
# Process tree already dead
|
||||
self.status = self.StatusChoices.EXITED
|
||||
self.ended_at = self.ended_at or timezone.now()
|
||||
self.save()
|
||||
return killed_count
|
||||
|
||||
def kill_children_db(self) -> int:
|
||||
"""
|
||||
Kill all DB-tracked child processes (via parent FK).
|
||||
|
||||
Different from kill_tree() which uses OS children.
|
||||
This kills processes created via Process.create(parent=self).
|
||||
|
||||
Returns:
|
||||
Number of child Process records killed
|
||||
"""
|
||||
killed = 0
|
||||
for child in self.children.filter(status=self.StatusChoices.RUNNING):
|
||||
if child.terminate():
|
||||
killed += 1
|
||||
return killed
|
||||
|
||||
# =========================================================================
|
||||
# Class methods for querying processes
|
||||
# =========================================================================
|
||||
|
||||
@classmethod
|
||||
def get_running(cls, process_type: str = None, machine: 'Machine' = None) -> 'QuerySet[Process]':
|
||||
"""
|
||||
Get all running processes, optionally filtered by type.
|
||||
|
||||
Replaces:
|
||||
- workers/pid_utils.py get_all_worker_pids()
|
||||
- workers/orchestrator.py get_total_worker_count()
|
||||
|
||||
Args:
|
||||
process_type: Filter by TypeChoices (e.g., 'worker', 'hook')
|
||||
machine: Filter by machine (defaults to current)
|
||||
|
||||
Returns:
|
||||
QuerySet of running Process records
|
||||
"""
|
||||
machine = machine or Machine.current()
|
||||
qs = cls.objects.filter(
|
||||
machine=machine,
|
||||
status=cls.StatusChoices.RUNNING,
|
||||
)
|
||||
if process_type:
|
||||
qs = qs.filter(process_type=process_type)
|
||||
return qs
|
||||
|
||||
@classmethod
|
||||
def get_running_count(cls, process_type: str = None, machine: 'Machine' = None) -> int:
|
||||
"""
|
||||
Get count of running processes.
|
||||
|
||||
Replaces:
|
||||
- workers/pid_utils.py get_running_worker_count()
|
||||
"""
|
||||
return cls.get_running(process_type=process_type, machine=machine).count()
|
||||
|
||||
@classmethod
|
||||
def stop_all(cls, process_type: str = None, machine: 'Machine' = None, graceful: bool = True) -> int:
|
||||
"""
|
||||
Stop all running processes of a given type.
|
||||
|
||||
Args:
|
||||
process_type: Filter by TypeChoices
|
||||
machine: Filter by machine
|
||||
graceful: If True, use terminate() (SIGTERM→SIGKILL), else kill()
|
||||
|
||||
Returns:
|
||||
Number of processes stopped
|
||||
"""
|
||||
stopped = 0
|
||||
for proc in cls.get_running(process_type=process_type, machine=machine):
|
||||
if graceful:
|
||||
if proc.terminate():
|
||||
stopped += 1
|
||||
else:
|
||||
if proc.kill():
|
||||
stopped += 1
|
||||
return stopped
|
||||
|
||||
@classmethod
|
||||
def get_next_worker_id(cls, process_type: str = 'worker', machine: 'Machine' = None) -> int:
|
||||
"""
|
||||
Get the next available worker ID for spawning new workers.
|
||||
|
||||
Replaces workers/pid_utils.py get_next_worker_id().
|
||||
Simply returns count of running workers of this type.
|
||||
|
||||
Args:
|
||||
process_type: Worker type to count
|
||||
machine: Machine to scope query
|
||||
|
||||
Returns:
|
||||
Next available worker ID (0-indexed)
|
||||
"""
|
||||
return cls.get_running_count(process_type=process_type, machine=machine)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Binary State Machine
|
||||
|
||||
Reference in New Issue
Block a user