__package__ = 'archivebox.machine'

import os
import sys
import socket
from pathlib import Path
from archivebox.uuid_compat import uuid7
from datetime import timedelta, datetime

from statemachine import State, registry

from django.db import models
from django.utils import timezone
from django.utils.functional import cached_property

from archivebox.base_models.models import ModelWithHealthStats
from archivebox.workers.models import BaseStateMachine
from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats

try:
    import psutil
    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

_CURRENT_MACHINE = None
_CURRENT_INTERFACE = None
_CURRENT_BINARIES = {}
_CURRENT_PROCESS = None

MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60        # Re-detect machine hardware/OS info weekly (seconds)
NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60   # Re-detect network interface info hourly (seconds)
BINARY_RECHECK_INTERVAL = 1 * 30 * 60              # Re-validate cached binaries every 30 minutes (seconds)
PROCESS_RECHECK_INTERVAL = 60                      # Re-validate every 60 seconds
PID_REUSE_WINDOW = timedelta(hours=24)             # Max age for considering a PID match valid
START_TIME_TOLERANCE = 5.0                         # Seconds tolerance for start time matching


class MachineManager(models.Manager):
    def current(self) -> 'Machine':
        return Machine.current()


class Machine(ModelWithHealthStats):
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)
    hostname = models.CharField(max_length=63, default=None, null=False)
    hw_in_docker = models.BooleanField(default=False, null=False)
    hw_in_vm = models.BooleanField(default=False, null=False)
    hw_manufacturer = models.CharField(max_length=63, default=None, null=False)
    hw_product = models.CharField(max_length=63, default=None, null=False)
    hw_uuid = models.CharField(max_length=255, default=None, null=False)
    os_arch = models.CharField(max_length=15, default=None, null=False)
    os_family = models.CharField(max_length=15, default=None, null=False)
    os_platform = models.CharField(max_length=63, default=None, null=False)
    os_release = models.CharField(max_length=63, default=None, null=False)
    os_kernel = models.CharField(max_length=255, default=None, null=False)
    stats = models.JSONField(default=dict, null=True, blank=True)
    config = models.JSONField(default=dict, null=True, blank=True,
                              help_text="Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)")
    num_uses_failed = models.PositiveIntegerField(default=0)
    num_uses_succeeded = models.PositiveIntegerField(default=0)

    objects: MachineManager = MachineManager()
    networkinterface_set: models.Manager['NetworkInterface']

    class Meta:
        app_label = 'machine'

    @classmethod
    def current(cls) -> 'Machine':
        global _CURRENT_MACHINE
        if _CURRENT_MACHINE:
            if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
                return _CURRENT_MACHINE
            _CURRENT_MACHINE = None
        _CURRENT_MACHINE, _ = cls.objects.update_or_create(
            guid=get_host_guid(),
            defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
        )
        return _CURRENT_MACHINE

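    # Usage sketch (illustrative, not executed on import): Machine.current()
    # memoizes its result in the module-level _CURRENT_MACHINE and only
    # re-detects host info after MACHINE_RECHECK_INTERVAL has elapsed:
    #
    #   m1 = Machine.objects.current()
    #   m2 = Machine.objects.current()   # served from the cache, no re-detection
    #   assert m1.pk == m2.pk
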
    def to_json(self) -> dict:
        """
        Convert Machine model instance to a JSON-serializable dict.
        """
        from archivebox.config import VERSION
        return {
            'type': 'Machine',
            'schema_version': VERSION,
            'id': str(self.id),
            'guid': self.guid,
            'hostname': self.hostname,
            'hw_in_docker': self.hw_in_docker,
            'hw_in_vm': self.hw_in_vm,
            'hw_manufacturer': self.hw_manufacturer,
            'hw_product': self.hw_product,
            'hw_uuid': self.hw_uuid,
            'os_arch': self.os_arch,
            'os_family': self.os_family,
            'os_platform': self.os_platform,
            'os_kernel': self.os_kernel,
            'os_release': self.os_release,
            'stats': self.stats,
            'config': self.config or {},
        }

    @staticmethod
    def from_json(record: dict, overrides: dict | None = None):
        """
        Update Machine config from a JSON dict.

        Args:
            record: JSON dict with '_method': 'update', 'key': '...', 'value': '...'
            overrides: Not used

        Returns:
            Machine instance or None
        """
        method = record.get('_method')
        if method == 'update':
            key = record.get('key')
            value = record.get('value')
            if key and value:
                machine = Machine.current()
                if not machine.config:
                    machine.config = {}
                machine.config[key] = value
                machine.save(update_fields=['config'])
                return machine
        return None


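# Usage sketch (hypothetical record, for illustration): persisting a resolved
# binary path as a machine-level config override via Machine.from_json():
#
#   Machine.from_json({'_method': 'update', 'key': 'WGET_BINARY', 'value': '/usr/bin/wget'})
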
class NetworkInterfaceManager(models.Manager):
    def current(self) -> 'NetworkInterface':
        return NetworkInterface.current()


class NetworkInterface(ModelWithHealthStats):
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)
    mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)
    ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)
    ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)
    dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)
    hostname = models.CharField(max_length=63, default=None, null=False)
    iface = models.CharField(max_length=15, default=None, null=False)
    isp = models.CharField(max_length=63, default=None, null=False)
    city = models.CharField(max_length=63, default=None, null=False)
    region = models.CharField(max_length=63, default=None, null=False)
    country = models.CharField(max_length=63, default=None, null=False)
    # num_uses_failed = models.PositiveIntegerField(default=0)    # from ModelWithHealthStats
    # num_uses_succeeded = models.PositiveIntegerField(default=0) # from ModelWithHealthStats

    objects: NetworkInterfaceManager = NetworkInterfaceManager()

    class Meta:
        app_label = 'machine'
        unique_together = (('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),)

    @classmethod
    def current(cls) -> 'NetworkInterface':
        global _CURRENT_INTERFACE
        if _CURRENT_INTERFACE:
            if timezone.now() < _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL):
                return _CURRENT_INTERFACE
            _CURRENT_INTERFACE = None
        machine = Machine.objects.current()
        net_info = get_host_network()
        _CURRENT_INTERFACE, _ = cls.objects.update_or_create(
            machine=machine, ip_public=net_info.pop('ip_public'), ip_local=net_info.pop('ip_local'),
            mac_address=net_info.pop('mac_address'), dns_server=net_info.pop('dns_server'), defaults=net_info,
        )
        return _CURRENT_INTERFACE


class BinaryManager(models.Manager):
    def get_from_db_or_cache(self, name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> 'Binary':
        """Get or create a Binary record from the database or cache."""
        global _CURRENT_BINARIES
        cached = _CURRENT_BINARIES.get(name)
        if cached and timezone.now() < cached.modified_at + timedelta(seconds=BINARY_RECHECK_INTERVAL):
            return cached
        _CURRENT_BINARIES[name], _ = self.update_or_create(
            machine=Machine.objects.current(), name=name, binprovider=binprovider,
            version=version, abspath=abspath, sha256=sha256,
        )
        return _CURRENT_BINARIES[name]

    def get_valid_binary(self, name: str, machine: 'Machine | None' = None) -> 'Binary | None':
        """Get a valid Binary for the given name on the current machine, or None if not found."""
        machine = machine or Machine.current()
        return self.filter(
            machine=machine,
            name__iexact=name,
        ).exclude(abspath='').exclude(abspath__isnull=True).order_by('-modified_at').first()


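# Usage sketch (illustrative): resolving the most recently validated wget binary
# on the current machine; returns None if it was never found/installed here:
#
#   wget = Binary.objects.get_valid_binary('wget')
#   if wget and wget.is_valid:
#       print(wget.abspath, wget.version)
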
class Binary(ModelWithHealthStats):
    """
    Tracks a binary installed on a specific machine.

    Follows the unified state machine pattern:
    - queued: Binary needs to be installed
    - started: Installation in progress
    - succeeded: Binary installed successfully (abspath, version, sha256 populated)
    - failed: Installation failed

    State machine calls run() which executes on_Binary__install_* hooks
    to install the binary using the specified providers.
    """

    class StatusChoices(models.TextChoices):
        QUEUED = 'queued', 'Queued'
        STARTED = 'started', 'Started'
        SUCCEEDED = 'succeeded', 'Succeeded'
        FAILED = 'failed', 'Failed'

    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    machine = models.ForeignKey(Machine, on_delete=models.CASCADE, null=False)

    # Binary metadata
    name = models.CharField(max_length=63, default='', null=False, blank=True, db_index=True)
    binproviders = models.CharField(max_length=127, default='env', null=False, blank=True,
                                    help_text="Comma-separated list of allowed providers: apt,brew,pip,npm,env")
    overrides = models.JSONField(default=dict, blank=True,
                                 help_text="Provider-specific overrides: {'apt': {'packages': ['pkg']}, ...}")

    # Installation results (populated after installation)
    binprovider = models.CharField(max_length=31, default='', null=False, blank=True,
                                   help_text="Provider that successfully installed this binary")
    abspath = models.CharField(max_length=255, default='', null=False, blank=True)
    version = models.CharField(max_length=32, default='', null=False, blank=True)
    sha256 = models.CharField(max_length=64, default='', null=False, blank=True)

    # State machine fields
    status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED, db_index=True)
    retry_at = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True,
                                    help_text="When to retry this binary installation")
    output_dir = models.CharField(max_length=255, default='', null=False, blank=True,
                                  help_text="Directory where installation hook logs are stored")

    # Health stats
    num_uses_failed = models.PositiveIntegerField(default=0)
    num_uses_succeeded = models.PositiveIntegerField(default=0)

    state_machine_name: str = 'archivebox.machine.models.BinaryMachine'

    objects: BinaryManager = BinaryManager()

    class Meta:
        app_label = 'machine'
        verbose_name = 'Binary'
        verbose_name_plural = 'Binaries'
        unique_together = (('machine', 'name', 'abspath', 'version', 'sha256'),)

    def __str__(self) -> str:
        return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'

    @property
    def is_valid(self) -> bool:
        """A binary is valid if it has both abspath and version set."""
        return bool(self.abspath) and bool(self.version)

    @cached_property
    def binary_info(self) -> dict:
        """Return info about the binary."""
        return {
            'name': self.name,
            'abspath': self.abspath,
            'version': self.version,
            'binprovider': self.binprovider,
            'is_valid': self.is_valid,
        }

    def to_json(self) -> dict:
        """
        Convert Binary model instance to a JSON-serializable dict.
        """
        from archivebox.config import VERSION
        return {
            'type': 'Binary',
            'schema_version': VERSION,
            'id': str(self.id),
            'machine_id': str(self.machine_id),
            'name': self.name,
            'binprovider': self.binprovider,
            'abspath': self.abspath,
            'version': self.version,
            'sha256': self.sha256,
            'status': self.status,
        }

    @staticmethod
    def from_json(record: dict, overrides: dict | None = None):
        """
        Create/update Binary from a JSON dict.

        Handles two cases:
        1. From binaries.jsonl: creates a queued binary with name, binproviders, overrides
        2. From hook output: updates the binary with abspath, version, sha256, binprovider

        Args:
            record: JSON dict with 'name' and either:
                - 'binproviders', 'overrides' (from binaries.jsonl)
                - 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
            overrides: Not used

        Returns:
            Binary instance or None
        """
        name = record.get('name')
        if not name:
            return None

        machine = Machine.current()
        overrides = overrides or {}

        # Case 1: From binaries.jsonl - create a queued binary
        if 'binproviders' in record or ('overrides' in record and not record.get('abspath')):
            binary, _ = Binary.objects.get_or_create(
                machine=machine,
                name=name,
                defaults={
                    'binproviders': record.get('binproviders', 'env'),
                    'overrides': record.get('overrides', {}),
                    'status': Binary.StatusChoices.QUEUED,
                    'retry_at': timezone.now(),
                }
            )
            return binary

        # Case 2: From hook output - update with installation results
        abspath = record.get('abspath')
        version = record.get('version')
        if not abspath or not version:
            return None

        binary, _ = Binary.objects.update_or_create(
            machine=machine,
            name=name,
            defaults={
                'abspath': abspath,
                'version': version,
                'sha256': record.get('sha256', ''),
                'binprovider': record.get('binprovider', 'env'),
                'status': Binary.StatusChoices.SUCCEEDED,
                'retry_at': None,
            }
        )
        return binary

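    # Example hook-output record (shape per from_json() Case 2 above; the
    # values are illustrative, not real output):
    #
    #   {"type": "Binary", "name": "wget", "binprovider": "apt",
    #    "abspath": "/usr/bin/wget", "version": "1.21.4", "sha256": "<hex digest>"}
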
    @property
    def OUTPUT_DIR(self):
        """Return the output directory for this binary installation."""
        from django.conf import settings

        DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
        return Path(DATA_DIR) / 'machines' / str(self.machine_id) / 'binaries' / self.name / str(self.id)

    def update_and_requeue(self, **kwargs):
        """
        Update binary fields and requeue for the worker state machine.

        Sets modified_at to ensure workers pick up changes.
        Always saves the model after updating.
        """
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.modified_at = timezone.now()
        self.save()

    def run(self):
        """
        Execute binary installation by running on_Binary__install_* hooks.

        Called by BinaryMachine when entering 'started' state.
        Runs ALL on_Binary__install_* hooks - each hook checks binproviders
        and decides if it can handle this binary. First hook to succeed wins.
        Updates status to SUCCEEDED or FAILED based on hook output.
        """
        import json
        from archivebox.hooks import discover_hooks, run_hook
        from archivebox.config.configset import get_config

        # Get merged config (Binary doesn't have crawl/snapshot context)
        config = get_config(scope='global')

        # Create output directory
        output_dir = self.OUTPUT_DIR
        output_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir = str(output_dir)
        self.save()

        # Discover ALL on_Binary__install_* hooks
        hooks = discover_hooks('Binary', config=config)
        if not hooks:
            self.status = self.StatusChoices.FAILED
            self.save()
            return

        # Run each hook - they decide if they can handle this binary
        for hook in hooks:
            plugin_name = hook.parent.name
            plugin_output_dir = output_dir / plugin_name
            plugin_output_dir.mkdir(parents=True, exist_ok=True)

            # Build kwargs for the hook
            hook_kwargs = {
                'binary_id': str(self.id),
                'machine_id': str(self.machine_id),
                'name': self.name,
                'binproviders': self.binproviders,
            }

            # Add overrides as a JSON string if present
            if self.overrides:
                hook_kwargs['overrides'] = json.dumps(self.overrides)

            # Run the hook
            result = run_hook(
                hook,
                output_dir=plugin_output_dir,
                config=config,
                timeout=600,  # 10 min timeout for binary installation
                **hook_kwargs
            )

            # Background hook (unlikely for binary installation, but handle it)
            if result is None:
                continue

            # Failed or skipped hook - try the next one
            if result['returncode'] != 0:
                continue

            # Parse JSONL output to check for a successful installation
            stdout_file = plugin_output_dir / 'stdout.log'
            if stdout_file.exists():
                stdout = stdout_file.read_text()
                for line in stdout.splitlines():
                    if line.strip() and line.strip().startswith('{'):
                        try:
                            record = json.loads(line)
                            if record.get('type') == 'Binary' and record.get('abspath'):
                                # Update self from the successful installation
                                self.abspath = record['abspath']
                                self.version = record.get('version', '')
                                self.sha256 = record.get('sha256', '')
                                self.binprovider = record.get('binprovider', 'env')
                                self.status = self.StatusChoices.SUCCEEDED
                                self.save()
                                return
                        except json.JSONDecodeError:
                            continue

        # No hook succeeded
        self.status = self.StatusChoices.FAILED
        self.save()

    def cleanup(self):
        """
        Clean up background binary installation hooks.

        Called by the state machine if needed (not typically used for binaries
        since installations are foreground, but included for consistency).
        """
        from archivebox.misc.process_utils import safe_kill_process

        output_dir = self.OUTPUT_DIR
        if not output_dir.exists():
            return

        # Kill any background hooks
        for plugin_dir in output_dir.iterdir():
            if not plugin_dir.is_dir():
                continue
            pid_file = plugin_dir / 'hook.pid'
            cmd_file = plugin_dir / 'cmd.sh'
            safe_kill_process(pid_file, cmd_file)


# =============================================================================
# Process Model
# =============================================================================

class ProcessManager(models.Manager):
    """Manager for the Process model."""

    def current(self) -> 'Process':
        """Get the Process record for the current OS process."""
        return Process.current()

    def get_by_pid(self, pid: int, machine: 'Machine | None' = None) -> 'Process | None':
        """
        Find a Process by PID with proper validation against PID reuse.

        IMPORTANT: PIDs are reused by the OS! This method:
        1. Filters by machine (required - PIDs are only unique per machine)
        2. Filters by time window (processes older than 24h are stale)
        3. Validates via psutil that start times match

        Args:
            pid: OS process ID
            machine: Machine instance (defaults to current machine)

        Returns:
            Process if found and validated, None otherwise
        """
        if not PSUTIL_AVAILABLE:
            return None

        machine = machine or Machine.current()

        # Get the actual process start time from the OS
        try:
            os_proc = psutil.Process(pid)
            os_start_time = os_proc.create_time()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process doesn't exist - any DB record with this PID is stale
            return None

        # Query candidates: same machine, same PID, recent, still RUNNING
        candidates = self.filter(
            machine=machine,
            pid=pid,
            status=Process.StatusChoices.RUNNING,
            started_at__gte=timezone.now() - PID_REUSE_WINDOW,
        ).order_by('-started_at')

        for candidate in candidates:
            # Validate start time matches (within tolerance)
            if candidate.started_at:
                db_start_time = candidate.started_at.timestamp()
                if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
                    return candidate

        return None

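    # Usage sketch (illustrative, 12345 is a made-up PID): resolving a DB record
    # for a live PID; returns None when the PID was recycled by the OS:
    #
    #   proc = Process.objects.get_by_pid(12345)
    #   if proc is None:
    #       ...  # no trustworthy match: process exited or PID was reused
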
    def create_for_archiveresult(self, archiveresult, **kwargs):
        """
        Create a Process record for an ArchiveResult.

        Called during migration and when creating new ArchiveResults.
        """
        # Defaults from the ArchiveResult if not provided
        defaults = {
            'machine': Machine.current(),
            'pwd': kwargs.get('pwd') or str(archiveresult.snapshot.output_dir / archiveresult.plugin),
            'cmd': kwargs.get('cmd') or [],
            'status': 'queued',
            'timeout': kwargs.get('timeout', 120),
            'env': kwargs.get('env', {}),
        }
        defaults.update(kwargs)

        process = self.create(**defaults)
        return process


class Process(models.Model):
    """
    Tracks a single OS process execution.

    Process represents the actual subprocess spawned to execute a hook.
    One Process can optionally be associated with an ArchiveResult (via OneToOne),
    but Process can also exist standalone for internal operations.

    Follows the unified state machine pattern:
    - queued: Process ready to launch
    - running: Process actively executing
    - exited: Process completed (check exit_code for success/failure)

    State machine calls launch() to spawn the process and monitors its lifecycle.
    """

    class StatusChoices(models.TextChoices):
        QUEUED = 'queued', 'Queued'
        RUNNING = 'running', 'Running'
        EXITED = 'exited', 'Exited'

    class TypeChoices(models.TextChoices):
        SUPERVISORD = 'supervisord', 'Supervisord'
        ORCHESTRATOR = 'orchestrator', 'Orchestrator'
        WORKER = 'worker', 'Worker'
        CLI = 'cli', 'CLI'
        BINARY = 'binary', 'Binary'

    # Primary fields
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    # Machine FK - required (every process runs on a machine)
    machine = models.ForeignKey(
        Machine,
        on_delete=models.CASCADE,
        null=False,
        related_name='process_set',
        help_text='Machine where this process executed'
    )

    # Parent process (optional)
    parent = models.ForeignKey(
        'self',
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='children',
        help_text='Parent process that spawned this process'
    )

    # Process type (cli, worker, orchestrator, binary, supervisord)
    process_type = models.CharField(
        max_length=16,
        choices=TypeChoices.choices,
        default=TypeChoices.CLI,
        db_index=True,
        help_text='Type of process (cli, worker, orchestrator, binary, supervisord)'
    )

    # Execution metadata
    pwd = models.CharField(max_length=512, default='', null=False, blank=True,
                           help_text='Working directory for process execution')
    cmd = models.JSONField(default=list, null=False, blank=True,
                           help_text='Command as array of arguments')
    env = models.JSONField(default=dict, null=False, blank=True,
                           help_text='Environment variables for process')
    timeout = models.IntegerField(default=120, null=False,
                                  help_text='Timeout in seconds')

    # Process results
    pid = models.IntegerField(default=None, null=True, blank=True,
                              help_text='OS process ID')
    exit_code = models.IntegerField(default=None, null=True, blank=True,
                                    help_text='Process exit code (0 = success)')
    stdout = models.TextField(default='', null=False, blank=True,
                              help_text='Standard output from process')
    stderr = models.TextField(default='', null=False, blank=True,
                              help_text='Standard error from process')

    # Timing
    started_at = models.DateTimeField(default=None, null=True, blank=True,
                                      help_text='When process was launched')
    ended_at = models.DateTimeField(default=None, null=True, blank=True,
                                    help_text='When process completed/terminated')

    # Optional FKs
    binary = models.ForeignKey(
        Binary,
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='process_set',
        help_text='Binary used by this process'
    )
    iface = models.ForeignKey(
        NetworkInterface,
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='process_set',
        help_text='Network interface used by this process'
    )

    # Optional connection URL (for CDP, sonic, etc.)
    url = models.URLField(max_length=2048, default=None, null=True, blank=True,
                          help_text='Connection URL (CDP endpoint, sonic server, etc.)')

    # Reverse relation to ArchiveResult (OneToOne from the AR side)
    # archiveresult: OneToOneField defined on the ArchiveResult model

    # State machine fields
    status = models.CharField(
        max_length=16,
        choices=StatusChoices.choices,
        default=StatusChoices.QUEUED,
        db_index=True
    )
    retry_at = models.DateTimeField(
        default=timezone.now,
        null=True, blank=True,
        db_index=True,
        help_text='When to retry this process'
    )

    state_machine_name: str = 'archivebox.machine.models.ProcessMachine'

    objects: ProcessManager = ProcessManager()

    class Meta:
        app_label = 'machine'
        verbose_name = 'Process'
        verbose_name_plural = 'Processes'
        indexes = [
            models.Index(fields=['machine', 'status', 'retry_at']),
            models.Index(fields=['binary', 'exit_code']),
        ]

    def __str__(self) -> str:
        cmd_str = ' '.join(self.cmd[:3]) if self.cmd else '(no cmd)'
        return f'Process[{self.id}] {cmd_str} ({self.status})'

    # Properties that delegate to related objects
    @property
    def cmd_version(self) -> str:
        """Get version from the associated binary."""
        return self.binary.version if self.binary else ''

    @property
    def bin_abspath(self) -> str:
        """Get absolute path from the associated binary."""
        return self.binary.abspath if self.binary else ''

    @property
    def plugin(self) -> str:
        """Get plugin name from the associated ArchiveResult (if any)."""
        if hasattr(self, 'archiveresult'):
            return self.archiveresult.plugin
        return ''

    @property
    def hook_name(self) -> str:
        """Get hook name from the associated ArchiveResult (if any)."""
        if hasattr(self, 'archiveresult'):
            return self.archiveresult.hook_name
        return ''

    def to_json(self) -> dict:
        """
        Convert Process model instance to a JSON-serializable dict.
        """
        from archivebox.config import VERSION
        record = {
            'type': 'Process',
            'schema_version': VERSION,
            'id': str(self.id),
            'machine_id': str(self.machine_id),
            'cmd': self.cmd,
            'pwd': self.pwd,
            'status': self.status,
            'exit_code': self.exit_code,
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'ended_at': self.ended_at.isoformat() if self.ended_at else None,
        }
        # Include optional fields if set
        if self.binary_id:
            record['binary_id'] = str(self.binary_id)
        if self.pid:
            record['pid'] = self.pid
        if self.timeout:
            record['timeout'] = self.timeout
        return record

    @staticmethod
    def from_json(record: dict, overrides: dict | None = None):
        """
        Look up an existing Process from a JSON dict (by 'id').

        Args:
            record: JSON dict with 'id' or process details
            overrides: Optional dict of field overrides

        Returns:
            Process instance or None
        """
        process_id = record.get('id')
        if process_id:
            try:
                return Process.objects.get(id=process_id)
            except Process.DoesNotExist:
                pass
        return None

    def update_and_requeue(self, **kwargs):
        """
        Update process fields and requeue for the worker state machine.
        Sets modified_at to ensure workers pick up changes.
        """
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.modified_at = timezone.now()
        self.save()

    # =========================================================================
    # Process.current() and hierarchy methods
    # =========================================================================

    @classmethod
    def current(cls) -> 'Process':
        """
        Get or create the Process record for the current OS process.

        Similar to Machine.current(), this:
        1. Checks the cache for an existing Process with matching PID
        2. Validates the cached Process is still valid (PID not reused)
        3. Creates a new Process if needed

        IMPORTANT: Uses psutil to validate the PID hasn't been reused.
        PIDs are recycled by the OS, so we compare start times.
        """
        global _CURRENT_PROCESS

        current_pid = os.getpid()
        machine = Machine.current()

        # Check cache validity
        if _CURRENT_PROCESS:
            # Verify: same PID, same machine, cache not expired
            if (_CURRENT_PROCESS.pid == current_pid and
                    _CURRENT_PROCESS.machine_id == machine.id and
                    timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)):
                return _CURRENT_PROCESS
            _CURRENT_PROCESS = None

        # Get the actual process start time from the OS for validation
        os_start_time = None
        if PSUTIL_AVAILABLE:
            try:
                os_proc = psutil.Process(current_pid)
                os_start_time = os_proc.create_time()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        # Try to find an existing Process for this PID on this machine
        # Filter by: machine + PID + RUNNING + recent + start time matches
        if os_start_time:
            existing = cls.objects.filter(
                machine=machine,
                pid=current_pid,
                status=cls.StatusChoices.RUNNING,
                started_at__gte=timezone.now() - PID_REUSE_WINDOW,
            ).order_by('-started_at').first()

            if existing and existing.started_at:
                db_start_time = existing.started_at.timestamp()
                if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
                    _CURRENT_PROCESS = existing
                    return existing

        # No valid existing record - create a new one
        parent = cls._find_parent_process(machine)
        process_type = cls._detect_process_type()

        # Use the psutil cmdline if available (matches what proc() will validate against)
        # Otherwise fall back to sys.argv
        cmd = sys.argv
        if PSUTIL_AVAILABLE:
            try:
                os_proc = psutil.Process(current_pid)
                cmd = os_proc.cmdline()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        # Use the psutil start time if available (more accurate than timezone.now())
        if os_start_time:
            started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
        else:
            started_at = timezone.now()

        _CURRENT_PROCESS = cls.objects.create(
            machine=machine,
            parent=parent,
            process_type=process_type,
            cmd=cmd,
            pwd=os.getcwd(),
            pid=current_pid,
            started_at=started_at,
            status=cls.StatusChoices.RUNNING,
        )
        return _CURRENT_PROCESS

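    # Usage sketch (illustrative output values): every entrypoint can register
    # itself and participate in the parent/child process tree:
    #
    #   me = Process.current()            # creates/fetches the record for os.getpid()
    #   print(me.process_type, me.pid)    # e.g. 'cli' 41234
    #   print(me.parent)                  # Process of the spawning ArchiveBox proc, or None
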
    @classmethod
    def _find_parent_process(cls, machine: 'Machine | None' = None) -> 'Process | None':
        """
        Find the parent Process record by looking up the PPID.

        IMPORTANT: Validates against PID reuse by checking:
        1. Same machine (PIDs are only unique per machine)
        2. Start time matches the OS process start time
        3. Process is still RUNNING and recent

        Returns None if the parent is not an ArchiveBox process.
        """
        if not PSUTIL_AVAILABLE:
            return None

        ppid = os.getppid()
        machine = machine or Machine.current()

        # Get the parent process start time from the OS
        try:
            os_parent = psutil.Process(ppid)
            os_parent_start = os_parent.create_time()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return None  # Parent process doesn't exist

        # Find a matching Process record
        candidates = cls.objects.filter(
            machine=machine,
            pid=ppid,
            status=cls.StatusChoices.RUNNING,
            started_at__gte=timezone.now() - PID_REUSE_WINDOW,
        ).order_by('-started_at')

        for candidate in candidates:
            if candidate.started_at:
                db_start_time = candidate.started_at.timestamp()
                if abs(db_start_time - os_parent_start) < START_TIME_TOLERANCE:
                    return candidate

        return None  # No matching ArchiveBox parent process

    @classmethod
    def _detect_process_type(cls) -> str:
        """
        Detect the type of the current process from sys.argv.
        """
        argv_str = ' '.join(sys.argv).lower()

        if 'supervisord' in argv_str:
            return cls.TypeChoices.SUPERVISORD
        elif 'orchestrator' in argv_str:
            return cls.TypeChoices.ORCHESTRATOR
        elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']):
            return cls.TypeChoices.WORKER
        elif 'archivebox' in argv_str:
            return cls.TypeChoices.CLI
        else:
            return cls.TypeChoices.BINARY

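    # Examples (illustrative argv values → detected type, per the checks above):
    #   ['archivebox', 'add', 'https://example.com']    → CLI
    #   ['python', '-m', 'archivebox', 'orchestrator']  → ORCHESTRATOR ('orchestrator' is checked before 'archivebox')
    #   ['.../bin/crawl_worker', '--id=0']              → WORKER
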
    @classmethod
    def cleanup_stale_running(cls, machine: 'Machine | None' = None) -> int:
        """
        Mark stale RUNNING processes as EXITED.

        Processes are stale if:
        - Status is RUNNING but the OS process no longer exists
        - Status is RUNNING but started_at is older than PID_REUSE_WINDOW

        Returns the count of processes cleaned up.
        """
        machine = machine or Machine.current()
        cleaned = 0

        stale = cls.objects.filter(
            machine=machine,
            status=cls.StatusChoices.RUNNING,
        )

        for proc in stale:
            is_stale = False

            # Check if too old (PID definitely reused)
            if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW:
                is_stale = True
            elif PSUTIL_AVAILABLE and proc.pid is not None:
                # Check if the OS process still exists with a matching start time
                try:
                    os_proc = psutil.Process(proc.pid)
                    if proc.started_at:
                        db_start = proc.started_at.timestamp()
                        os_start = os_proc.create_time()
                        if abs(db_start - os_start) > START_TIME_TOLERANCE:
                            is_stale = True  # PID reused by a different process
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    is_stale = True  # Process no longer exists

            if is_stale:
                proc.status = cls.StatusChoices.EXITED
                proc.ended_at = proc.ended_at or timezone.now()
                proc.exit_code = proc.exit_code if proc.exit_code is not None else -1
                proc.save(update_fields=['status', 'ended_at', 'exit_code'])
                cleaned += 1

        return cleaned

    # =========================================================================
    # Tree traversal properties
    # =========================================================================

    @property
    def root(self) -> 'Process':
        """Get the root process (CLI command) of this hierarchy."""
        proc = self
        while proc.parent_id:
            proc = proc.parent
        return proc

    @property
    def ancestors(self) -> list['Process']:
        """Get all ancestor processes from parent to root."""
        ancestors = []
        proc = self.parent
        while proc:
            ancestors.append(proc)
            proc = proc.parent
        return ancestors

    @property
    def depth(self) -> int:
        """Get depth in the process tree (0 = root)."""
        return len(self.ancestors)

    def get_descendants(self, include_self: bool = False):
        """Get all descendant processes (breadth-first over the parent FK)."""
        if include_self:
            pks = [self.pk]
        else:
            pks = []

        children = list(self.children.values_list('pk', flat=True))
        while children:
            pks.extend(children)
            children = list(Process.objects.filter(parent_id__in=children).values_list('pk', flat=True))

        return Process.objects.filter(pk__in=pks)

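    # Usage sketch (illustrative): counting every process a CLI invocation
    # spawned, including itself:
    #
    #   root = Process.current().root
    #   total = root.get_descendants(include_self=True).count()
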
    # =========================================================================
    # Validated psutil access via the .proc property
    # =========================================================================

    @property
    def proc(self) -> 'psutil.Process | None':
        """
        Get a validated psutil.Process for this record.

        Returns a psutil.Process ONLY if:
        1. A process with this PID exists in the OS
        2. The OS process start time matches our started_at (within tolerance)
        3. The process is on the current machine

        Returns None if:
        - The PID doesn't exist (process exited)
        - The PID was reused by a different process (start times don't match)
        - We're on a different machine than where the process ran
        - psutil is not available

        This prevents accidentally matching a stale/recycled PID.
        """
        if not PSUTIL_AVAILABLE:
            return None

        # Can't get a psutil.Process if we don't have a PID
        if not self.pid:
            return None

        # Can't validate processes on other machines
        if self.machine_id != Machine.current().id:
            return None

        try:
            os_proc = psutil.Process(self.pid)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return None  # Process no longer exists

        # Validate start time matches to prevent PID reuse confusion
        if self.started_at:
            os_start_time = os_proc.create_time()
            db_start_time = self.started_at.timestamp()

            if abs(os_start_time - db_start_time) > START_TIME_TOLERANCE:
                # PID has been reused by a different process!
                return None

        # Optionally validate the command matches (extra safety)
        if self.cmd:
            try:
                os_cmdline = os_proc.cmdline()
                # Check if the first arg (binary) matches
                if os_cmdline and self.cmd:
                    os_binary = os_cmdline[0] if os_cmdline else ''
                    db_binary = self.cmd[0] if self.cmd else ''
                    # Match by basename (handles /usr/bin/python3 vs python3)
                    if os_binary and db_binary:
                        if Path(os_binary).name != Path(db_binary).name:
                            return None  # Different binary, PID reused
            except (psutil.AccessDenied, psutil.ZombieProcess):
                pass  # Can't check cmdline, trust the start time match

        return os_proc

    @property
    def is_running(self) -> bool:
        """
        Check if the process is currently running via psutil.

        More reliable than checking the status field since it validates
        that the actual OS process exists and matches our record.
        """
        proc = self.proc
        return proc is not None and proc.is_running()

    def is_alive(self) -> bool:
        """
        Alias for is_running, for compatibility with the subprocess.Popen API.
        """
        return self.is_running

    def get_memory_info(self) -> dict | None:
        """Get memory usage if the process is running."""
        proc = self.proc
        if proc:
            try:
                mem = proc.memory_info()
                return {'rss': mem.rss, 'vms': mem.vms}
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return None

    def get_cpu_percent(self) -> float | None:
        """Get CPU usage percentage if the process is running."""
        proc = self.proc
        if proc:
            try:
                return proc.cpu_percent(interval=0.1)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return None

    def get_children_pids(self) -> list[int]:
        """Get PIDs of child processes from the OS (not the DB)."""
        proc = self.proc
        if proc:
            try:
                return [child.pid for child in proc.children(recursive=True)]
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return []

    # =========================================================================
    # Lifecycle methods (launch, kill, poll, wait)
    # =========================================================================

    @property
    def pid_file(self) -> 'Path | None':
        """Path to the PID file for this process."""
        return Path(self.pwd) / 'process.pid' if self.pwd else None

    @property
    def cmd_file(self) -> 'Path | None':
        """Path to the cmd.sh script for this process."""
        return Path(self.pwd) / 'cmd.sh' if self.pwd else None

    @property
    def stdout_file(self) -> 'Path | None':
        """Path to the stdout log."""
        return Path(self.pwd) / 'stdout.log' if self.pwd else None

    @property
    def stderr_file(self) -> 'Path | None':
        """Path to the stderr log."""
        return Path(self.pwd) / 'stderr.log' if self.pwd else None

    def _write_pid_file(self) -> None:
        """Write the PID file with mtime set to the process start time."""
        from archivebox.misc.process_utils import write_pid_file_with_mtime
        if self.pid and self.started_at and self.pid_file:
            write_pid_file_with_mtime(
                self.pid_file,
                self.pid,
                self.started_at.timestamp()
            )

    def _write_cmd_file(self) -> None:
        """Write the cmd.sh script for debugging/validation."""
        from archivebox.misc.process_utils import write_cmd_file
        if self.cmd and self.cmd_file:
            write_cmd_file(self.cmd_file, self.cmd)

    def _build_env(self) -> dict:
        """Build the environment dict for the subprocess, merging stored env with the system env."""
        env = os.environ.copy()
        env.update(self.env or {})
        return env

    def launch(self, background: bool = False) -> 'Process':
        """
        Spawn the subprocess and update this Process record.

        Args:
            background: If True, don't wait for completion (for daemons/bg hooks)

        Returns:
            self (updated with pid, started_at, etc.)
        """
        import subprocess

        # Validate pwd is set (required for output files)
        if not self.pwd:
            raise ValueError("Process.pwd must be set before calling launch()")

        # Ensure the output directory exists
        Path(self.pwd).mkdir(parents=True, exist_ok=True)

        # Write cmd.sh for debugging
        self._write_cmd_file()

        stdout_path = self.stdout_file
        stderr_path = self.stderr_file

        with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
            proc = subprocess.Popen(
                self.cmd,
                cwd=self.pwd,
                stdout=out,
                stderr=err,
                env=self._build_env(),
            )

        # Get an accurate start time from psutil if available
        if PSUTIL_AVAILABLE:
            try:
                ps_proc = psutil.Process(proc.pid)
                self.started_at = datetime.fromtimestamp(
                    ps_proc.create_time(),
                    tz=timezone.get_current_timezone()
                )
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                self.started_at = timezone.now()
        else:
            self.started_at = timezone.now()

        self.pid = proc.pid
        self.status = self.StatusChoices.RUNNING
        self.save()

        self._write_pid_file()

        if not background:
            try:
                proc.wait(timeout=self.timeout)
                self.exit_code = proc.returncode
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
                self.exit_code = -1

            self.ended_at = timezone.now()
            if stdout_path.exists():
                self.stdout = stdout_path.read_text()
            if stderr_path.exists():
                self.stderr = stderr_path.read_text()
            self.status = self.StatusChoices.EXITED
            self.save()

        return self

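    # Usage sketch (illustrative values): running a short foreground command
    # and capturing its output into the Process record:
    #
    #   p = Process.objects.create(machine=Machine.current(),
    #                              cmd=['echo', 'hello'], pwd='/tmp/demo', timeout=10)
    #   p.launch()                              # blocks until exit (background=False)
    #   assert p.exit_code == 0 and 'hello' in p.stdout
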
    def kill(self, signal_num: int = 15) -> bool:
        """
        Kill this process and update its status.

        Uses self.proc for safe killing - only kills if the PID matches
        our recorded process (prevents killing recycled PIDs).

        Args:
            signal_num: Signal to send (default SIGTERM=15)

        Returns:
            True if killed successfully, False otherwise
        """
        # Use the validated psutil.Process to ensure we're killing the right process
        proc = self.proc
        if proc is None:
            # Process doesn't exist or PID was recycled - just update status
            if self.status != self.StatusChoices.EXITED:
                self.status = self.StatusChoices.EXITED
                self.ended_at = self.ended_at or timezone.now()
                self.save()
            return False

        try:
            # Safe to kill - we validated it's our process via the start time match
            proc.send_signal(signal_num)

            # Update our record
            # Use the standard Unix convention: 128 + signal number
            self.exit_code = 128 + signal_num
            self.ended_at = timezone.now()
            self.status = self.StatusChoices.EXITED
            self.save()

            # Clean up the PID file
            if self.pid_file and self.pid_file.exists():
                self.pid_file.unlink(missing_ok=True)

            return True
        except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError):
            # Process already exited between the proc check and the kill
            self.status = self.StatusChoices.EXITED
            self.ended_at = self.ended_at or timezone.now()
            self.save()
            return False

    def poll(self) -> int | None:
        """
        Check if the process has exited and update its status if so.

        Returns:
            exit_code if exited, None if still running
        """
        if self.status == self.StatusChoices.EXITED:
            return self.exit_code

        if not self.is_running:
            # Process exited - read output and update status
            if self.stdout_file and self.stdout_file.exists():
                self.stdout = self.stdout_file.read_text()
            if self.stderr_file and self.stderr_file.exists():
                self.stderr = self.stderr_file.read_text()

            # Try to get the exit code from proc or default to unknown
            self.exit_code = self.exit_code if self.exit_code is not None else -1
            self.ended_at = timezone.now()
            self.status = self.StatusChoices.EXITED
            self.save()
            return self.exit_code

        return None  # Still running

    def wait(self, timeout: int | None = None) -> int:
        """
        Wait for the process to exit, polling periodically.

        Args:
            timeout: Max seconds to wait (None = use self.timeout)

        Returns:
            exit_code

        Raises:
            TimeoutError if the process doesn't exit in time
        """
        import time

        timeout = timeout or self.timeout
        start = time.time()

        while True:
            exit_code = self.poll()
            if exit_code is not None:
                return exit_code

            if time.time() - start > timeout:
                raise TimeoutError(f"Process {self.id} did not exit within {timeout}s")

            time.sleep(0.1)

    def terminate(self, graceful_timeout: float = 5.0) -> bool:
        """
        Gracefully terminate the process: SIGTERM → wait → SIGKILL.

        This consolidates the scattered SIGTERM/SIGKILL logic from:
        - crawls/models.py Crawl.cleanup()
        - workers/pid_utils.py stop_worker()
        - supervisord_util.py stop_existing_supervisord_process()

        Args:
            graceful_timeout: Seconds to wait after SIGTERM before SIGKILL

        Returns:
            True if the process was terminated, False if already dead
        """
        import signal

        proc = self.proc
        if proc is None:
            # Already dead - just update status
            if self.status != self.StatusChoices.EXITED:
                self.status = self.StatusChoices.EXITED
                self.ended_at = self.ended_at or timezone.now()
                self.save()
            return False

        try:
            # Step 1: Send SIGTERM for graceful shutdown
            proc.terminate()

            # Step 2: Wait for graceful exit
            try:
                exit_status = proc.wait(timeout=graceful_timeout)
                # Process exited gracefully
                # psutil.Process.wait() returns the exit status
                self.exit_code = exit_status if exit_status is not None else 0
                self.status = self.StatusChoices.EXITED
                self.ended_at = timezone.now()
                self.save()
                return True
            except psutil.TimeoutExpired:
                pass  # Still running, need to force kill

            # Step 3: Force kill with SIGKILL
            proc.kill()
            proc.wait(timeout=2)

            # Use the standard Unix convention: 128 + signal number
            self.exit_code = 128 + signal.SIGKILL
            self.status = self.StatusChoices.EXITED
            self.ended_at = timezone.now()
            self.save()
            return True

        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process already dead
            self.status = self.StatusChoices.EXITED
            self.ended_at = self.ended_at or timezone.now()
            self.save()
            return False

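    # Escalation implemented above: SIGTERM → wait(graceful_timeout) → SIGKILL.
    # Usage sketch (illustrative):
    #
    #   for worker in Process.get_running(process_type='worker'):
    #       worker.terminate(graceful_timeout=10)
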
    def kill_tree(self, graceful_timeout: float = 2.0) -> int:
        """
        Kill this process and all its children (OS children, not DB children).

        This consolidates the scattered child-killing logic from:
        - crawls/models.py Crawl.cleanup() os.killpg()
        - supervisord_util.py stop_existing_supervisord_process()

        Args:
            graceful_timeout: Seconds to wait after SIGTERM before SIGKILL

        Returns:
            Number of processes killed (including self)
        """
        killed_count = 0
        proc = self.proc
        if proc is None:
            # Already dead
            if self.status != self.StatusChoices.EXITED:
                self.status = self.StatusChoices.EXITED
                self.ended_at = self.ended_at or timezone.now()
                self.save()
            return 0

        try:
            # Get all children before killing the parent
            children = proc.children(recursive=True)

            # Kill children first (reverse order - deepest first)
            for child in reversed(children):
                try:
                    child.terminate()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Child already dead or we don't have permission - continue
                    pass

            # Wait briefly for children to exit
            gone, alive = psutil.wait_procs(children, timeout=graceful_timeout)
            killed_count += len(gone)

            # Force kill the remaining children
            for child in alive:
                try:
                    child.kill()
                    killed_count += 1
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Child exited or we don't have permission - continue
                    pass

            # Now kill self
            if self.terminate(graceful_timeout=graceful_timeout):
                killed_count += 1

            return killed_count

        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process tree already dead
            self.status = self.StatusChoices.EXITED
            self.ended_at = self.ended_at or timezone.now()
            self.save()
            return killed_count

    def kill_children_db(self) -> int:
        """
        Kill all DB-tracked child processes (via the parent FK).

        Different from kill_tree(), which uses OS children.
        This kills processes created via Process.objects.create(parent=self).

        Returns:
            Number of child Process records killed
        """
        killed = 0
        for child in self.children.filter(status=self.StatusChoices.RUNNING):
            if child.terminate():
                killed += 1
        return killed

    # =========================================================================
    # Class methods for querying processes
    # =========================================================================

    @classmethod
    def get_running(cls, process_type: str | None = None, machine: 'Machine | None' = None) -> 'QuerySet[Process]':
        """
        Get all running processes, optionally filtered by type.

        Replaces:
        - workers/pid_utils.py get_all_worker_pids()
        - workers/orchestrator.py get_total_worker_count()

        Args:
            process_type: Filter by TypeChoices (e.g., 'worker', 'cli')
            machine: Filter by machine (defaults to current)

        Returns:
            QuerySet of running Process records
        """
        machine = machine or Machine.current()
        qs = cls.objects.filter(
            machine=machine,
            status=cls.StatusChoices.RUNNING,
        )
        if process_type:
            qs = qs.filter(process_type=process_type)
        return qs

    @classmethod
    def get_running_count(cls, process_type: str | None = None, machine: 'Machine | None' = None) -> int:
        """
        Get the count of running processes.

        Replaces:
        - workers/pid_utils.py get_running_worker_count()
        """
        return cls.get_running(process_type=process_type, machine=machine).count()

    @classmethod
    def stop_all(cls, process_type: str | None = None, machine: 'Machine | None' = None, graceful: bool = True) -> int:
        """
        Stop all running processes of a given type.

        Args:
            process_type: Filter by TypeChoices
            machine: Filter by machine
            graceful: If True, use terminate() (SIGTERM→SIGKILL), else kill()

        Returns:
            Number of processes stopped
        """
        stopped = 0
        for proc in cls.get_running(process_type=process_type, machine=machine):
            if graceful:
                if proc.terminate():
                    stopped += 1
            else:
                if proc.kill():
                    stopped += 1
        return stopped

    @classmethod
    def get_next_worker_id(cls, process_type: str = 'worker', machine: 'Machine | None' = None) -> int:
        """
        Get the next available worker ID for spawning new workers.

        Replaces workers/pid_utils.py get_next_worker_id().
        Simply returns the count of running workers of this type.

        Args:
            process_type: Worker type to count
            machine: Machine to scope the query

        Returns:
            Next available worker ID (0-indexed)
        """
        return cls.get_running_count(process_type=process_type, machine=machine)


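# Usage sketch (illustrative): gracefully draining all workers on this machine
# before shutdown:
#
#   stopped = Process.stop_all(process_type='worker', graceful=True)
#   print(f'stopped {stopped} workers')
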
# =============================================================================
# Binary State Machine
# =============================================================================

class BinaryMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing the Binary installation lifecycle.

    Hook Lifecycle:
    ┌─────────────────────────────────────────────────────────────┐
    │ QUEUED State                                                │
    │ • Binary needs to be installed                              │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() when can_start()
    ┌─────────────────────────────────────────────────────────────┐
    │ STARTED State → enter_started()                             │
    │ 1. binary.run()                                             │
    │    • discover_hooks('Binary') → all on_Binary__install_*    │
    │    • Try each provider hook in sequence:                    │
    │      - run_hook(script, output_dir, ...)                    │
    │      - If returncode == 0:                                  │
    │        * Read stdout.log                                    │
    │        * Parse JSONL for 'Binary' record with abspath       │
    │        * Update self: abspath, version, sha256, provider    │
    │        * Set status=SUCCEEDED, RETURN                       │
    │    • If no hook succeeds: set status=FAILED                 │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() checks status
    ┌─────────────────────────────────────────────────────────────┐
    │ SUCCEEDED / FAILED                                          │
    │ • Set by binary.run() based on hook results                 │
    │ • Health stats incremented (num_uses_succeeded/failed)      │
    └─────────────────────────────────────────────────────────────┘
    """

    model_attr_name = 'binary'

    # States
    queued = State(value=Binary.StatusChoices.QUEUED, initial=True)
    started = State(value=Binary.StatusChoices.STARTED)
    succeeded = State(value=Binary.StatusChoices.SUCCEEDED, final=True)
    failed = State(value=Binary.StatusChoices.FAILED, final=True)

    # Tick Event - transitions based on conditions
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(succeeded, cond='is_succeeded') |
        started.to(failed, cond='is_failed')
    )

    def can_start(self) -> bool:
        """Check if binary installation can start."""
        return bool(self.binary.name and self.binary.binproviders)

    def is_succeeded(self) -> bool:
        """Check if installation succeeded (status was set by run())."""
        return self.binary.status == Binary.StatusChoices.SUCCEEDED

    def is_failed(self) -> bool:
        """Check if installation failed (status was set by run())."""
        return self.binary.status == Binary.StatusChoices.FAILED

    def is_finished(self) -> bool:
        """Check if installation has completed (success or failure)."""
        return self.binary.status in (
            Binary.StatusChoices.SUCCEEDED,
            Binary.StatusChoices.FAILED,
        )

    @queued.enter
    def enter_queued(self):
        """Binary is queued for installation."""
        self.binary.update_and_requeue(
            retry_at=timezone.now(),
            status=Binary.StatusChoices.QUEUED,
        )

    @started.enter
    def enter_started(self):
        """Start binary installation."""
        # Lock the binary while installation runs
        self.binary.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=300),  # 5 min timeout for installation
            status=Binary.StatusChoices.STARTED,
        )

        # Run installation hooks
        self.binary.run()

        # Save the updated status (run() sets status to succeeded/failed)
        self.binary.save()

    @succeeded.enter
    def enter_succeeded(self):
        """Binary installed successfully."""
        self.binary.update_and_requeue(
            retry_at=None,
            status=Binary.StatusChoices.SUCCEEDED,
        )

        # Increment health stats
        self.binary.increment_health_stats(success=True)

    @failed.enter
    def enter_failed(self):
        """Binary installation failed."""
        self.binary.update_and_requeue(
            retry_at=None,
            status=Binary.StatusChoices.FAILED,
        )

        # Increment health stats
        self.binary.increment_health_stats(success=False)


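# Usage sketch (illustrative; in practice workers construct the machine via the
# model's state_machine_name and drive it with tick events - the direct
# construction below is an assumption for demonstration):
#
#   sm = BinaryMachine(binary)   # binary: a queued Binary instance
#   sm.send('tick')              # queued → started, runs install hooks
#   sm.send('tick')              # started → succeeded / failed
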
# =============================================================================
# Process State Machine
# =============================================================================

class ProcessMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing the Process (OS subprocess) lifecycle.

    Process Lifecycle:
    ┌─────────────────────────────────────────────────────────────┐
    │ QUEUED State                                                │
    │ • Process ready to launch, waiting for resources            │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() when can_start()
    ┌─────────────────────────────────────────────────────────────┐
    │ RUNNING State → enter_running()                             │
    │ 1. process.launch()                                         │
    │    • Spawn subprocess with cmd, pwd, env, timeout           │
    │    • Set pid, started_at                                    │
    │    • Process runs in background or foreground               │
    │ 2. Monitor process completion                               │
    │    • Check exit code when process completes                 │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() checks is_exited()
    ┌─────────────────────────────────────────────────────────────┐
    │ EXITED State                                                │
    │ • Process completed (exit_code set)                         │
    │ • Health stats incremented                                  │
    │ • stdout/stderr captured                                    │
    └─────────────────────────────────────────────────────────────┘

    Note: This is a simpler state machine than ArchiveResult's.
    Process is just about execution lifecycle. ArchiveResult handles
    the archival-specific logic (status, output parsing, etc.).
    """

    model_attr_name = 'process'

    # States
    queued = State(value=Process.StatusChoices.QUEUED, initial=True)
    running = State(value=Process.StatusChoices.RUNNING)
    exited = State(value=Process.StatusChoices.EXITED, final=True)

    # Tick Event - transitions based on conditions
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(running, cond='can_start') |
        running.to.itself(unless='is_exited') |
        running.to(exited, cond='is_exited')
    )

    # Additional events (for explicit control)
    launch = queued.to(running)
    kill = running.to(exited)

    def can_start(self) -> bool:
        """Check if the process can start (has cmd and machine)."""
        return bool(self.process.cmd and self.process.machine)

    def is_exited(self) -> bool:
        """Check if the process has exited (exit_code is set)."""
        return self.process.exit_code is not None

    @queued.enter
    def enter_queued(self):
        """Process is queued for execution."""
        self.process.update_and_requeue(
            retry_at=timezone.now(),
            status=Process.StatusChoices.QUEUED,
        )

    @running.enter
    def enter_running(self):
        """Start process execution."""
        # Lock the process while it runs
        self.process.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=self.process.timeout),
            status=Process.StatusChoices.RUNNING,
            started_at=timezone.now(),
        )

        # Launch the subprocess
        # NOTE: This is a placeholder - actual launch logic would
        # be implemented based on how hooks currently spawn processes.
        # For now, Process is a data model that tracks execution metadata;
        # the actual subprocess spawning is still handled by run_hook().

        # Mark as immediately exited for now (until we refactor run_hook)
        # In the future, this would actually spawn the subprocess
        self.process.exit_code = 0  # Placeholder
        self.process.save()

    @exited.enter
    def enter_exited(self):
        """Process has exited."""
        self.process.update_and_requeue(
            retry_at=None,
            status=Process.StatusChoices.EXITED,
            ended_at=timezone.now(),
        )


# =============================================================================
# State Machine Registration
# =============================================================================

# Manually register state machines with the python-statemachine registry
registry.register(BinaryMachine)
registry.register(ProcessMachine)