# ArchiveBox/archivebox/machine/models.py
__package__ = 'archivebox.machine'
import os
import sys
import socket
from pathlib import Path
from archivebox.uuid_compat import uuid7
from datetime import timedelta, datetime
from statemachine import State, registry
from django.db import models
from django.utils import timezone
from django.utils.functional import cached_property
from archivebox.base_models.models import ModelWithHealthStats
from archivebox.workers.models import BaseStateMachine
from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
try:
import psutil
PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
_CURRENT_MACHINE = None
_CURRENT_INTERFACE = None
_CURRENT_BINARIES = {}
_CURRENT_PROCESS = None
MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60  # Re-detect machine info weekly
NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60  # Re-detect network info hourly
BINARY_RECHECK_INTERVAL = 30 * 60  # Re-validate binaries every 30 minutes
PROCESS_RECHECK_INTERVAL = 60 # Re-validate every 60 seconds
PID_REUSE_WINDOW = timedelta(hours=24) # Max age for considering a PID match valid
START_TIME_TOLERANCE = 5.0 # Seconds tolerance for start time matching
class MachineManager(models.Manager):
def current(self) -> 'Machine':
return Machine.current()
class Machine(ModelWithHealthStats):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)
hostname = models.CharField(max_length=63, default=None, null=False)
hw_in_docker = models.BooleanField(default=False, null=False)
hw_in_vm = models.BooleanField(default=False, null=False)
hw_manufacturer = models.CharField(max_length=63, default=None, null=False)
hw_product = models.CharField(max_length=63, default=None, null=False)
hw_uuid = models.CharField(max_length=255, default=None, null=False)
os_arch = models.CharField(max_length=15, default=None, null=False)
os_family = models.CharField(max_length=15, default=None, null=False)
os_platform = models.CharField(max_length=63, default=None, null=False)
os_release = models.CharField(max_length=63, default=None, null=False)
os_kernel = models.CharField(max_length=255, default=None, null=False)
stats = models.JSONField(default=dict, null=True, blank=True)
config = models.JSONField(default=dict, null=True, blank=True,
help_text="Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)")
num_uses_failed = models.PositiveIntegerField(default=0)
num_uses_succeeded = models.PositiveIntegerField(default=0)
objects: MachineManager = MachineManager()
networkinterface_set: models.Manager['NetworkInterface']
class Meta:
app_label = 'machine'
@classmethod
def current(cls) -> 'Machine':
global _CURRENT_MACHINE
if _CURRENT_MACHINE:
if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
return _CURRENT_MACHINE
_CURRENT_MACHINE = None
_CURRENT_MACHINE, _ = cls.objects.update_or_create(
guid=get_host_guid(),
defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
)
return _CURRENT_MACHINE
def to_json(self) -> dict:
"""
Convert Machine model instance to a JSON-serializable dict.
"""
from archivebox.config import VERSION
return {
'type': 'Machine',
'schema_version': VERSION,
'id': str(self.id),
'guid': self.guid,
'hostname': self.hostname,
'hw_in_docker': self.hw_in_docker,
'hw_in_vm': self.hw_in_vm,
'hw_manufacturer': self.hw_manufacturer,
'hw_product': self.hw_product,
'hw_uuid': self.hw_uuid,
'os_arch': self.os_arch,
'os_family': self.os_family,
'os_platform': self.os_platform,
'os_kernel': self.os_kernel,
'os_release': self.os_release,
'stats': self.stats,
'config': self.config or {},
}
@staticmethod
    def from_json(record: dict, overrides: dict | None = None):
"""
Update Machine config from JSON dict.
Args:
record: JSON dict with '_method': 'update', 'key': '...', 'value': '...'
overrides: Not used
Returns:
Machine instance or None
"""
method = record.get('_method')
if method == 'update':
key = record.get('key')
value = record.get('value')
if key and value:
machine = Machine.current()
if not machine.config:
machine.config = {}
machine.config[key] = value
machine.save(update_fields=['config'])
return machine
return None
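# Example usage (illustrative sketch, assuming a configured Django environment):
#
#   machine = Machine.current()   # cached in-process for MACHINE_RECHECK_INTERVAL
#   Machine.from_json({'_method': 'update', 'key': 'WGET_BINARY', 'value': '/usr/bin/wget'})
#   assert Machine.current().config['WGET_BINARY'] == '/usr/bin/wget'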
class NetworkInterfaceManager(models.Manager):
def current(self) -> 'NetworkInterface':
return NetworkInterface.current()
class NetworkInterface(ModelWithHealthStats):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)
mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)
ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)
ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)
dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)
hostname = models.CharField(max_length=63, default=None, null=False)
iface = models.CharField(max_length=15, default=None, null=False)
isp = models.CharField(max_length=63, default=None, null=False)
city = models.CharField(max_length=63, default=None, null=False)
region = models.CharField(max_length=63, default=None, null=False)
country = models.CharField(max_length=63, default=None, null=False)
# num_uses_failed = models.PositiveIntegerField(default=0) # from ModelWithHealthStats
# num_uses_succeeded = models.PositiveIntegerField(default=0) # from ModelWithHealthStats
objects: NetworkInterfaceManager = NetworkInterfaceManager()
class Meta:
app_label = 'machine'
unique_together = (('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),)
@classmethod
def current(cls) -> 'NetworkInterface':
global _CURRENT_INTERFACE
if _CURRENT_INTERFACE:
if timezone.now() < _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL):
return _CURRENT_INTERFACE
_CURRENT_INTERFACE = None
machine = Machine.objects.current()
net_info = get_host_network()
_CURRENT_INTERFACE, _ = cls.objects.update_or_create(
machine=machine, ip_public=net_info.pop('ip_public'), ip_local=net_info.pop('ip_local'),
mac_address=net_info.pop('mac_address'), dns_server=net_info.pop('dns_server'), defaults=net_info,
)
return _CURRENT_INTERFACE
class BinaryManager(models.Manager):
def get_from_db_or_cache(self, name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> 'Binary':
"""Get or create an Binary record from the database or cache."""
global _CURRENT_BINARIES
cached = _CURRENT_BINARIES.get(name)
if cached and timezone.now() < cached.modified_at + timedelta(seconds=BINARY_RECHECK_INTERVAL):
return cached
_CURRENT_BINARIES[name], _ = self.update_or_create(
machine=Machine.objects.current(), name=name, binprovider=binprovider,
version=version, abspath=abspath, sha256=sha256,
)
return _CURRENT_BINARIES[name]
def get_valid_binary(self, name: str, machine: 'Machine | None' = None) -> 'Binary | None':
"""Get a valid Binary for the given name on the current machine, or None if not found."""
machine = machine or Machine.current()
return self.filter(
machine=machine,
name__iexact=name,
).exclude(abspath='').exclude(abspath__isnull=True).order_by('-modified_at').first()
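# Example usage (illustrative sketch; paths and versions are hypothetical):
#
#   wget = Binary.objects.get_from_db_or_cache(
#       'wget', abspath='/usr/bin/wget', version='1.21.4', binprovider='apt')
#   best = Binary.objects.get_valid_binary('wget')   # newest record with an abspath, or None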
class Binary(ModelWithHealthStats):
"""
    Tracks a binary installed on a specific machine.
Follows the unified state machine pattern:
- queued: Binary needs to be installed
- started: Installation in progress
- succeeded: Binary installed successfully (abspath, version, sha256 populated)
- failed: Installation failed
State machine calls run() which executes on_Binary__install_* hooks
to install the binary using the specified providers.
"""
class StatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
STARTED = 'started', 'Started'
SUCCEEDED = 'succeeded', 'Succeeded'
FAILED = 'failed', 'Failed'
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
machine = models.ForeignKey(Machine, on_delete=models.CASCADE, null=False)
# Binary metadata
name = models.CharField(max_length=63, default='', null=False, blank=True, db_index=True)
binproviders = models.CharField(max_length=127, default='env', null=False, blank=True,
help_text="Comma-separated list of allowed providers: apt,brew,pip,npm,env")
overrides = models.JSONField(default=dict, blank=True,
help_text="Provider-specific overrides: {'apt': {'packages': ['pkg']}, ...}")
# Installation results (populated after installation)
binprovider = models.CharField(max_length=31, default='', null=False, blank=True,
help_text="Provider that successfully installed this binary")
abspath = models.CharField(max_length=255, default='', null=False, blank=True)
version = models.CharField(max_length=32, default='', null=False, blank=True)
sha256 = models.CharField(max_length=64, default='', null=False, blank=True)
# State machine fields
status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED, db_index=True)
retry_at = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True,
help_text="When to retry this binary installation")
output_dir = models.CharField(max_length=255, default='', null=False, blank=True,
help_text="Directory where installation hook logs are stored")
# Health stats
num_uses_failed = models.PositiveIntegerField(default=0)
num_uses_succeeded = models.PositiveIntegerField(default=0)
state_machine_name: str = 'archivebox.machine.models.BinaryMachine'
objects: BinaryManager = BinaryManager()
class Meta:
app_label = 'machine'
verbose_name = 'Binary'
verbose_name_plural = 'Binaries'
unique_together = (('machine', 'name', 'abspath', 'version', 'sha256'),)
def __str__(self) -> str:
return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'
@property
def is_valid(self) -> bool:
"""A binary is valid if it has both abspath and version set."""
return bool(self.abspath) and bool(self.version)
@cached_property
def binary_info(self) -> dict:
"""Return info about the binary."""
return {
'name': self.name,
'abspath': self.abspath,
'version': self.version,
'binprovider': self.binprovider,
'is_valid': self.is_valid,
}
def to_json(self) -> dict:
"""
Convert Binary model instance to a JSON-serializable dict.
"""
from archivebox.config import VERSION
return {
'type': 'Binary',
'schema_version': VERSION,
'id': str(self.id),
'machine_id': str(self.machine_id),
'name': self.name,
'binprovider': self.binprovider,
'abspath': self.abspath,
'version': self.version,
'sha256': self.sha256,
'status': self.status,
}
@staticmethod
    def from_json(record: dict, overrides: dict | None = None):
"""
Create/update Binary from JSON dict.
Handles two cases:
1. From binaries.json: creates queued binary with name, binproviders, overrides
2. From hook output: updates binary with abspath, version, sha256, binprovider
Args:
record: JSON dict with 'name' and either:
- 'binproviders', 'overrides' (from binaries.json)
- 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
overrides: Not used
Returns:
Binary instance or None
"""
name = record.get('name')
if not name:
return None
machine = Machine.current()
overrides = overrides or {}
        # Case 1: From binaries.json - create queued binary
if 'binproviders' in record or ('overrides' in record and not record.get('abspath')):
binary, created = Binary.objects.get_or_create(
machine=machine,
name=name,
defaults={
'binproviders': record.get('binproviders', 'env'),
'overrides': record.get('overrides', {}),
'status': Binary.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
return binary
# Case 2: From hook output - update with installation results
abspath = record.get('abspath')
version = record.get('version')
if not abspath or not version:
return None
binary, _ = Binary.objects.update_or_create(
machine=machine,
name=name,
defaults={
'abspath': abspath,
'version': version,
'sha256': record.get('sha256', ''),
'binprovider': record.get('binprovider', 'env'),
'status': Binary.StatusChoices.SUCCEEDED,
'retry_at': None,
}
)
return binary
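    # Example inputs handled by from_json() above (field values are hypothetical):
    #
    #   {"name": "wget", "binproviders": "apt,brew,env"}            -> Case 1: queued Binary
    #   {"name": "wget", "abspath": "/usr/bin/wget",
    #    "version": "1.21.4", "binprovider": "apt"}                 -> Case 2: succeeded Binary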
@property
def OUTPUT_DIR(self):
"""Return the output directory for this binary installation."""
        from django.conf import settings
DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
return Path(DATA_DIR) / 'machines' / str(self.machine_id) / 'binaries' / self.name / str(self.id)
def update_and_requeue(self, **kwargs):
"""
Update binary fields and requeue for worker state machine.
Sets modified_at to ensure workers pick up changes.
Always saves the model after updating.
"""
for key, value in kwargs.items():
setattr(self, key, value)
self.modified_at = timezone.now()
self.save()
def run(self):
"""
Execute binary installation by running on_Binary__install_* hooks.
Called by BinaryMachine when entering 'started' state.
Runs ALL on_Binary__install_* hooks - each hook checks binproviders
and decides if it can handle this binary. First hook to succeed wins.
Updates status to SUCCEEDED or FAILED based on hook output.
"""
import json
from archivebox.hooks import discover_hooks, run_hook
from archivebox.config.configset import get_config
# Get merged config (Binary doesn't have crawl/snapshot context)
config = get_config(scope='global')
# Create output directory
output_dir = self.OUTPUT_DIR
output_dir.mkdir(parents=True, exist_ok=True)
self.output_dir = str(output_dir)
self.save()
# Discover ALL on_Binary__install_* hooks
hooks = discover_hooks('Binary', config=config)
if not hooks:
self.status = self.StatusChoices.FAILED
self.save()
return
# Run each hook - they decide if they can handle this binary
for hook in hooks:
plugin_name = hook.parent.name
plugin_output_dir = output_dir / plugin_name
plugin_output_dir.mkdir(parents=True, exist_ok=True)
# Build kwargs for hook
hook_kwargs = {
'binary_id': str(self.id),
'machine_id': str(self.machine_id),
'name': self.name,
'binproviders': self.binproviders,
}
# Add overrides as JSON string if present
if self.overrides:
hook_kwargs['overrides'] = json.dumps(self.overrides)
# Run the hook
result = run_hook(
hook,
output_dir=plugin_output_dir,
config=config,
timeout=600, # 10 min timeout for binary installation
**hook_kwargs
)
# Background hook (unlikely for binary installation, but handle it)
if result is None:
continue
# Failed or skipped hook - try next one
if result['returncode'] != 0:
continue
# Parse JSONL output to check for successful installation
stdout_file = plugin_output_dir / 'stdout.log'
if stdout_file.exists():
stdout = stdout_file.read_text()
for line in stdout.splitlines():
if line.strip() and line.strip().startswith('{'):
try:
record = json.loads(line)
if record.get('type') == 'Binary' and record.get('abspath'):
# Update self from successful installation
self.abspath = record['abspath']
self.version = record.get('version', '')
self.sha256 = record.get('sha256', '')
self.binprovider = record.get('binprovider', 'env')
self.status = self.StatusChoices.SUCCEEDED
self.save()
return
except json.JSONDecodeError:
continue
# No hook succeeded
self.status = self.StatusChoices.FAILED
self.save()
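    # Example of the JSONL line run() looks for in a hook's stdout.log
    # (field values are hypothetical):
    #
    #   {"type": "Binary", "name": "wget", "abspath": "/usr/bin/wget",
    #    "version": "1.21.4", "sha256": "<hex digest>", "binprovider": "apt"}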
def cleanup(self):
"""
Clean up background binary installation hooks.
Called by state machine if needed (not typically used for binaries
since installations are foreground, but included for consistency).
"""
        from archivebox.misc.process_utils import safe_kill_process
output_dir = self.OUTPUT_DIR
if not output_dir.exists():
return
# Kill any background hooks
for plugin_dir in output_dir.iterdir():
if not plugin_dir.is_dir():
continue
pid_file = plugin_dir / 'hook.pid'
cmd_file = plugin_dir / 'cmd.sh'
safe_kill_process(pid_file, cmd_file)
# =============================================================================
# Process Model
# =============================================================================
class ProcessManager(models.Manager):
"""Manager for Process model."""
def current(self) -> 'Process':
"""Get the Process record for the current OS process."""
return Process.current()
    def get_by_pid(self, pid: int, machine: 'Machine | None' = None) -> 'Process | None':
"""
Find a Process by PID with proper validation against PID reuse.
IMPORTANT: PIDs are reused by the OS! This method:
1. Filters by machine (required - PIDs are only unique per machine)
2. Filters by time window (processes older than 24h are stale)
3. Validates via psutil that start times match
Args:
pid: OS process ID
machine: Machine instance (defaults to current machine)
Returns:
Process if found and validated, None otherwise
"""
if not PSUTIL_AVAILABLE:
return None
machine = machine or Machine.current()
# Get the actual process start time from OS
try:
os_proc = psutil.Process(pid)
os_start_time = os_proc.create_time()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# Process doesn't exist - any DB record with this PID is stale
return None
# Query candidates: same machine, same PID, recent, still RUNNING
candidates = self.filter(
machine=machine,
pid=pid,
status=Process.StatusChoices.RUNNING,
started_at__gte=timezone.now() - PID_REUSE_WINDOW,
).order_by('-started_at')
for candidate in candidates:
# Validate start time matches (within tolerance)
if candidate.started_at:
db_start_time = candidate.started_at.timestamp()
if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
return candidate
return None
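    # Example usage (sketch): resolve a PID to a Process record without trusting
    # a possibly-recycled PID (the PID 12345 below is hypothetical):
    #
    #   proc = Process.objects.get_by_pid(12345)
    #   if proc is None:
    #       pass  # no live, validated ArchiveBox process with that PID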
def create_for_archiveresult(self, archiveresult, **kwargs):
"""
Create a Process record for an ArchiveResult.
Called during migration and when creating new ArchiveResults.
"""
# Defaults from ArchiveResult if not provided
defaults = {
'machine': Machine.current(),
'pwd': kwargs.get('pwd') or str(archiveresult.snapshot.output_dir / archiveresult.plugin),
'cmd': kwargs.get('cmd') or [],
'status': 'queued',
'timeout': kwargs.get('timeout', 120),
'env': kwargs.get('env', {}),
}
defaults.update(kwargs)
process = self.create(**defaults)
return process
class Process(models.Model):
"""
Tracks a single OS process execution.
Process represents the actual subprocess spawned to execute a hook.
One Process can optionally be associated with an ArchiveResult (via OneToOne),
but Process can also exist standalone for internal operations.
Follows the unified state machine pattern:
- queued: Process ready to launch
- running: Process actively executing
- exited: Process completed (check exit_code for success/failure)
State machine calls launch() to spawn the process and monitors its lifecycle.
"""
class StatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
RUNNING = 'running', 'Running'
EXITED = 'exited', 'Exited'
class TypeChoices(models.TextChoices):
SUPERVISORD = 'supervisord', 'Supervisord'
ORCHESTRATOR = 'orchestrator', 'Orchestrator'
WORKER = 'worker', 'Worker'
CLI = 'cli', 'CLI'
BINARY = 'binary', 'Binary'
# Primary fields
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
# Machine FK - required (every process runs on a machine)
machine = models.ForeignKey(
Machine,
on_delete=models.CASCADE,
null=False,
related_name='process_set',
help_text='Machine where this process executed'
)
# Parent process (optional)
parent = models.ForeignKey(
'self',
on_delete=models.SET_NULL,
null=True, blank=True,
related_name='children',
help_text='Parent process that spawned this process'
)
# Process type (cli, worker, orchestrator, binary, supervisord)
process_type = models.CharField(
max_length=16,
choices=TypeChoices.choices,
default=TypeChoices.CLI,
db_index=True,
help_text='Type of process (cli, worker, orchestrator, binary, supervisord)'
)
# Execution metadata
pwd = models.CharField(max_length=512, default='', null=False, blank=True,
help_text='Working directory for process execution')
cmd = models.JSONField(default=list, null=False, blank=True,
help_text='Command as array of arguments')
env = models.JSONField(default=dict, null=False, blank=True,
help_text='Environment variables for process')
timeout = models.IntegerField(default=120, null=False,
help_text='Timeout in seconds')
# Process results
pid = models.IntegerField(default=None, null=True, blank=True,
help_text='OS process ID')
exit_code = models.IntegerField(default=None, null=True, blank=True,
help_text='Process exit code (0 = success)')
stdout = models.TextField(default='', null=False, blank=True,
help_text='Standard output from process')
stderr = models.TextField(default='', null=False, blank=True,
help_text='Standard error from process')
# Timing
started_at = models.DateTimeField(default=None, null=True, blank=True,
help_text='When process was launched')
ended_at = models.DateTimeField(default=None, null=True, blank=True,
help_text='When process completed/terminated')
# Optional FKs
binary = models.ForeignKey(
Binary,
on_delete=models.SET_NULL,
null=True, blank=True,
related_name='process_set',
help_text='Binary used by this process'
)
iface = models.ForeignKey(
NetworkInterface,
on_delete=models.SET_NULL,
null=True, blank=True,
related_name='process_set',
help_text='Network interface used by this process'
)
# Optional connection URL (for CDP, sonic, etc.)
url = models.URLField(max_length=2048, default=None, null=True, blank=True,
help_text='Connection URL (CDP endpoint, sonic server, etc.)')
# Reverse relation to ArchiveResult (OneToOne from AR side)
# archiveresult: OneToOneField defined on ArchiveResult model
# State machine fields
status = models.CharField(
max_length=16,
choices=StatusChoices.choices,
default=StatusChoices.QUEUED,
db_index=True
)
retry_at = models.DateTimeField(
default=timezone.now,
null=True, blank=True,
db_index=True,
help_text='When to retry this process'
)
state_machine_name: str = 'archivebox.machine.models.ProcessMachine'
objects: ProcessManager = ProcessManager()
class Meta:
app_label = 'machine'
verbose_name = 'Process'
verbose_name_plural = 'Processes'
indexes = [
models.Index(fields=['machine', 'status', 'retry_at']),
models.Index(fields=['binary', 'exit_code']),
]
def __str__(self) -> str:
cmd_str = ' '.join(self.cmd[:3]) if self.cmd else '(no cmd)'
return f'Process[{self.id}] {cmd_str} ({self.status})'
# Properties that delegate to related objects
@property
def cmd_version(self) -> str:
"""Get version from associated binary."""
return self.binary.version if self.binary else ''
@property
def bin_abspath(self) -> str:
"""Get absolute path from associated binary."""
return self.binary.abspath if self.binary else ''
@property
def plugin(self) -> str:
"""Get plugin name from associated ArchiveResult (if any)."""
        if hasattr(self, 'archiveresult'):
            return self.archiveresult.plugin
return ''
@property
def hook_name(self) -> str:
"""Get hook name from associated ArchiveResult (if any)."""
if hasattr(self, 'archiveresult'):
return self.archiveresult.hook_name
return ''
def to_json(self) -> dict:
"""
Convert Process model instance to a JSON-serializable dict.
"""
from archivebox.config import VERSION
record = {
'type': 'Process',
'schema_version': VERSION,
'id': str(self.id),
'machine_id': str(self.machine_id),
'cmd': self.cmd,
'pwd': self.pwd,
'status': self.status,
'exit_code': self.exit_code,
'started_at': self.started_at.isoformat() if self.started_at else None,
'ended_at': self.ended_at.isoformat() if self.ended_at else None,
}
# Include optional fields if set
if self.binary_id:
record['binary_id'] = str(self.binary_id)
if self.pid:
record['pid'] = self.pid
if self.timeout:
record['timeout'] = self.timeout
return record
@staticmethod
    def from_json(record: dict, overrides: dict | None = None):
"""
Create/update Process from JSON dict.
Args:
record: JSON dict with 'id' or process details
overrides: Optional dict of field overrides
Returns:
Process instance or None
"""
process_id = record.get('id')
if process_id:
try:
return Process.objects.get(id=process_id)
except Process.DoesNotExist:
pass
return None
def update_and_requeue(self, **kwargs):
"""
Update process fields and requeue for worker state machine.
Sets modified_at to ensure workers pick up changes.
"""
for key, value in kwargs.items():
setattr(self, key, value)
self.modified_at = timezone.now()
self.save()
# =========================================================================
# Process.current() and hierarchy methods
# =========================================================================
@classmethod
def current(cls) -> 'Process':
"""
Get or create the Process record for the current OS process.
Similar to Machine.current(), this:
1. Checks cache for existing Process with matching PID
2. Validates the cached Process is still valid (PID not reused)
3. Creates new Process if needed
IMPORTANT: Uses psutil to validate PID hasn't been reused.
PIDs are recycled by OS, so we compare start times.
"""
global _CURRENT_PROCESS
current_pid = os.getpid()
machine = Machine.current()
# Check cache validity
if _CURRENT_PROCESS:
# Verify: same PID, same machine, cache not expired
if (_CURRENT_PROCESS.pid == current_pid and
_CURRENT_PROCESS.machine_id == machine.id and
timezone.now() < _CURRENT_PROCESS.modified_at + timedelta(seconds=PROCESS_RECHECK_INTERVAL)):
return _CURRENT_PROCESS
_CURRENT_PROCESS = None
# Get actual process start time from OS for validation
os_start_time = None
if PSUTIL_AVAILABLE:
try:
os_proc = psutil.Process(current_pid)
os_start_time = os_proc.create_time()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Try to find existing Process for this PID on this machine
# Filter by: machine + PID + RUNNING + recent + start time matches
if os_start_time:
existing = cls.objects.filter(
machine=machine,
pid=current_pid,
status=cls.StatusChoices.RUNNING,
started_at__gte=timezone.now() - PID_REUSE_WINDOW,
).order_by('-started_at').first()
if existing and existing.started_at:
db_start_time = existing.started_at.timestamp()
if abs(db_start_time - os_start_time) < START_TIME_TOLERANCE:
_CURRENT_PROCESS = existing
return existing
# No valid existing record - create new one
parent = cls._find_parent_process(machine)
process_type = cls._detect_process_type()
# Use psutil cmdline if available (matches what proc() will validate against)
# Otherwise fall back to sys.argv
cmd = sys.argv
if PSUTIL_AVAILABLE:
try:
os_proc = psutil.Process(current_pid)
cmd = os_proc.cmdline()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Use psutil start time if available (more accurate than timezone.now())
if os_start_time:
started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
else:
started_at = timezone.now()
_CURRENT_PROCESS = cls.objects.create(
machine=machine,
parent=parent,
process_type=process_type,
cmd=cmd,
pwd=os.getcwd(),
pid=current_pid,
started_at=started_at,
status=cls.StatusChoices.RUNNING,
)
return _CURRENT_PROCESS
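    # Example usage (sketch, assuming Django is configured):
    #
    #   me = Process.current()            # one Process row per live OS process
    #   assert me.pid == os.getpid()
    #   assert me.machine_id == Machine.current().id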
@classmethod
    def _find_parent_process(cls, machine: 'Machine | None' = None) -> 'Process | None':
"""
Find the parent Process record by looking up PPID.
IMPORTANT: Validates against PID reuse by checking:
1. Same machine (PIDs are only unique per machine)
2. Start time matches OS process start time
3. Process is still RUNNING and recent
Returns None if parent is not an ArchiveBox process.
"""
if not PSUTIL_AVAILABLE:
return None
ppid = os.getppid()
machine = machine or Machine.current()
# Get parent process start time from OS
try:
os_parent = psutil.Process(ppid)
os_parent_start = os_parent.create_time()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return None # Parent process doesn't exist
# Find matching Process record
candidates = cls.objects.filter(
machine=machine,
pid=ppid,
status=cls.StatusChoices.RUNNING,
started_at__gte=timezone.now() - PID_REUSE_WINDOW,
).order_by('-started_at')
for candidate in candidates:
if candidate.started_at:
db_start_time = candidate.started_at.timestamp()
if abs(db_start_time - os_parent_start) < START_TIME_TOLERANCE:
return candidate
return None # No matching ArchiveBox parent process
@classmethod
def _detect_process_type(cls) -> str:
"""
Detect the type of the current process from sys.argv.
"""
argv_str = ' '.join(sys.argv).lower()
if 'supervisord' in argv_str:
return cls.TypeChoices.SUPERVISORD
elif 'orchestrator' in argv_str:
return cls.TypeChoices.ORCHESTRATOR
elif any(w in argv_str for w in ['crawl_worker', 'snapshot_worker', 'archiveresult_worker']):
return cls.TypeChoices.WORKER
elif 'archivebox' in argv_str:
return cls.TypeChoices.CLI
else:
return cls.TypeChoices.BINARY
@classmethod
    def cleanup_stale_running(cls, machine: 'Machine | None' = None) -> int:
"""
Mark stale RUNNING processes as EXITED.
Processes are stale if:
- Status is RUNNING but OS process no longer exists
- Status is RUNNING but started_at is older than PID_REUSE_WINDOW
Returns count of processes cleaned up.
"""
machine = machine or Machine.current()
cleaned = 0
stale = cls.objects.filter(
machine=machine,
status=cls.StatusChoices.RUNNING,
)
for proc in stale:
is_stale = False
# Check if too old (PID definitely reused)
if proc.started_at and proc.started_at < timezone.now() - PID_REUSE_WINDOW:
is_stale = True
elif PSUTIL_AVAILABLE and proc.pid is not None:
# Check if OS process still exists with matching start time
try:
os_proc = psutil.Process(proc.pid)
if proc.started_at:
db_start = proc.started_at.timestamp()
os_start = os_proc.create_time()
if abs(db_start - os_start) > START_TIME_TOLERANCE:
is_stale = True # PID reused by different process
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
is_stale = True # Process no longer exists
if is_stale:
proc.status = cls.StatusChoices.EXITED
proc.ended_at = proc.ended_at or timezone.now()
proc.exit_code = proc.exit_code if proc.exit_code is not None else -1
proc.save(update_fields=['status', 'ended_at', 'exit_code'])
cleaned += 1
return cleaned
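    # Example usage (sketch): a periodic janitor pass that reaps dead rows:
    #
    #   n = Process.cleanup_stale_running()
    #   # RUNNING rows whose OS process is gone (or whose PID was reused) are now EXITED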
# =========================================================================
# Tree traversal properties
# =========================================================================
@property
def root(self) -> 'Process':
"""Get the root process (CLI command) of this hierarchy."""
proc = self
while proc.parent_id:
proc = proc.parent
return proc
@property
def ancestors(self) -> list['Process']:
"""Get all ancestor processes from parent to root."""
ancestors = []
proc = self.parent
while proc:
ancestors.append(proc)
proc = proc.parent
return ancestors
@property
def depth(self) -> int:
"""Get depth in the process tree (0 = root)."""
return len(self.ancestors)
def get_descendants(self, include_self: bool = False):
"""Get all descendant processes recursively."""
if include_self:
pks = [self.pk]
else:
pks = []
children = list(self.children.values_list('pk', flat=True))
while children:
pks.extend(children)
children = list(Process.objects.filter(parent_id__in=children).values_list('pk', flat=True))
return Process.objects.filter(pk__in=pks)
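    # Example usage (sketch): walking the DB-tracked process tree:
    #
    #   root = Process.current().root
    #   subtree = root.get_descendants(include_self=True)   # QuerySet incl. root
    #   depths = {p.id: p.depth for p in subtree}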
# =========================================================================
# Validated psutil access via .proc property
# =========================================================================
@property
def proc(self) -> 'psutil.Process | None':
"""
Get validated psutil.Process for this record.
Returns psutil.Process ONLY if:
1. Process with this PID exists in OS
2. OS process start time matches our started_at (within tolerance)
3. Process is on current machine
Returns None if:
- PID doesn't exist (process exited)
- PID was reused by a different process (start times don't match)
- We're on a different machine than where process ran
- psutil is not available
This prevents accidentally matching a stale/recycled PID.
"""
if not PSUTIL_AVAILABLE:
return None
# Can't get psutil.Process if we don't have a PID
if not self.pid:
return None
# Can't validate processes on other machines
if self.machine_id != Machine.current().id:
return None
try:
os_proc = psutil.Process(self.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
return None # Process no longer exists
# Validate start time matches to prevent PID reuse confusion
if self.started_at:
os_start_time = os_proc.create_time()
db_start_time = self.started_at.timestamp()
if abs(os_start_time - db_start_time) > START_TIME_TOLERANCE:
# PID has been reused by a different process!
return None
# Optionally validate command matches (extra safety)
if self.cmd:
try:
os_cmdline = os_proc.cmdline()
# Check if first arg (binary) matches
if os_cmdline and self.cmd:
os_binary = os_cmdline[0] if os_cmdline else ''
db_binary = self.cmd[0] if self.cmd else ''
# Match by basename (handles /usr/bin/python3 vs python3)
if os_binary and db_binary:
if Path(os_binary).name != Path(db_binary).name:
return None # Different binary, PID reused
except (psutil.AccessDenied, psutil.ZombieProcess):
pass # Can't check cmdline, trust start time match
return os_proc
@property
def is_running(self) -> bool:
"""
Check if process is currently running via psutil.
More reliable than checking status field since it validates
the actual OS process exists and matches our record.
"""
proc = self.proc
return proc is not None and proc.is_running()
def is_alive(self) -> bool:
"""
Alias for is_running, for compatibility with subprocess.Popen API.
"""
return self.is_running
def get_memory_info(self) -> dict | None:
"""Get memory usage if process is running."""
proc = self.proc
if proc:
try:
mem = proc.memory_info()
return {'rss': mem.rss, 'vms': mem.vms}
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return None
def get_cpu_percent(self) -> float | None:
"""Get CPU usage percentage if process is running."""
proc = self.proc
if proc:
try:
return proc.cpu_percent(interval=0.1)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return None
def get_children_pids(self) -> list[int]:
"""Get PIDs of child processes from OS (not DB)."""
proc = self.proc
if proc:
try:
return [child.pid for child in proc.children(recursive=True)]
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return []
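    # Example usage (sketch): .proc guarantees the psutil accessors above refer to
    # the same OS process this row recorded, never a recycled PID:
    #
    #   os_proc = process.proc
    #   if os_proc is not None:
    #       print(os_proc.status(), process.get_memory_info(), process.get_cpu_percent())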
# =========================================================================
# Lifecycle methods (launch, kill, poll, wait)
# =========================================================================
    @property
    def pid_file(self) -> Path | None:
        """Path to the PID file for this process."""
        return (Path(self.pwd) / 'process.pid') if self.pwd else None
    @property
    def cmd_file(self) -> Path | None:
        """Path to the cmd.sh script for this process."""
        return (Path(self.pwd) / 'cmd.sh') if self.pwd else None
    @property
    def stdout_file(self) -> Path | None:
        """Path to the stdout log for this process."""
        return (Path(self.pwd) / 'stdout.log') if self.pwd else None
    @property
    def stderr_file(self) -> Path | None:
        """Path to the stderr log for this process."""
        return (Path(self.pwd) / 'stderr.log') if self.pwd else None
def _write_pid_file(self) -> None:
"""Write PID file with mtime set to process start time."""
from archivebox.misc.process_utils import write_pid_file_with_mtime
if self.pid and self.started_at and self.pid_file:
write_pid_file_with_mtime(
self.pid_file,
self.pid,
self.started_at.timestamp()
)
def _write_cmd_file(self) -> None:
"""Write cmd.sh script for debugging/validation."""
from archivebox.misc.process_utils import write_cmd_file
if self.cmd and self.cmd_file:
write_cmd_file(self.cmd_file, self.cmd)
def _build_env(self) -> dict:
"""Build environment dict for subprocess, merging stored env with system."""
env = os.environ.copy()
env.update(self.env or {})
return env
def launch(self, background: bool = False) -> 'Process':
"""
Spawn the subprocess and update this Process record.
Args:
background: If True, don't wait for completion (for daemons/bg hooks)
Returns:
self (updated with pid, started_at, etc.)
"""
        import subprocess
# Validate pwd is set (required for output files)
if not self.pwd:
raise ValueError("Process.pwd must be set before calling launch()")
# Ensure output directory exists
Path(self.pwd).mkdir(parents=True, exist_ok=True)
# Write cmd.sh for debugging
self._write_cmd_file()
stdout_path = self.stdout_file
stderr_path = self.stderr_file
with open(stdout_path, 'w') as out, open(stderr_path, 'w') as err:
proc = subprocess.Popen(
self.cmd,
cwd=self.pwd,
stdout=out,
stderr=err,
env=self._build_env(),
)
# Get accurate start time from psutil if available
if PSUTIL_AVAILABLE:
try:
ps_proc = psutil.Process(proc.pid)
self.started_at = datetime.fromtimestamp(
ps_proc.create_time(),
tz=timezone.get_current_timezone()
)
except (psutil.NoSuchProcess, psutil.AccessDenied):
self.started_at = timezone.now()
else:
self.started_at = timezone.now()
self.pid = proc.pid
self.status = self.StatusChoices.RUNNING
self.save()
self._write_pid_file()
if not background:
try:
proc.wait(timeout=self.timeout)
self.exit_code = proc.returncode
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
self.exit_code = -1
self.ended_at = timezone.now()
if stdout_path.exists():
self.stdout = stdout_path.read_text()
if stderr_path.exists():
self.stderr = stderr_path.read_text()
self.status = self.StatusChoices.EXITED
self.save()
return self
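    # Example usage (sketch; command, cwd, and timeout are hypothetical):
    #
    #   p = Process.objects.create(machine=Machine.current(), cmd=['echo', 'hello'],
    #                              pwd='/tmp/example', timeout=30)
    #   p.launch()                        # foreground: blocks up to p.timeout seconds
    #   assert p.status == Process.StatusChoices.EXITED and p.exit_code == 0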
def kill(self, signal_num: int = 15) -> bool:
"""
Kill this process and update status.
Uses self.proc for safe killing - only kills if PID matches
our recorded process (prevents killing recycled PIDs).
Args:
signal_num: Signal to send (default SIGTERM=15)
Returns:
True if killed successfully, False otherwise
"""
# Use validated psutil.Process to ensure we're killing the right process
proc = self.proc
if proc is None:
# Process doesn't exist or PID was recycled - just update status
if self.status != self.StatusChoices.EXITED:
self.status = self.StatusChoices.EXITED
self.ended_at = self.ended_at or timezone.now()
self.save()
return False
try:
# Safe to kill - we validated it's our process via start time match
proc.send_signal(signal_num)
# Update our record
# Use standard Unix convention: 128 + signal number
self.exit_code = 128 + signal_num
self.ended_at = timezone.now()
self.status = self.StatusChoices.EXITED
self.save()
# Clean up PID file
if self.pid_file and self.pid_file.exists():
self.pid_file.unlink(missing_ok=True)
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, ProcessLookupError):
# Process already exited between proc check and kill
self.status = self.StatusChoices.EXITED
self.ended_at = self.ended_at or timezone.now()
self.save()
return False
def poll(self) -> int | None:
"""
Check if process has exited and update status if so.
Returns:
exit_code if exited, None if still running
"""
if self.status == self.StatusChoices.EXITED:
return self.exit_code
if not self.is_running:
# Process exited - read output and update status
if self.stdout_file and self.stdout_file.exists():
self.stdout = self.stdout_file.read_text()
if self.stderr_file and self.stderr_file.exists():
self.stderr = self.stderr_file.read_text()
# Try to get exit code from proc or default to unknown
self.exit_code = self.exit_code if self.exit_code is not None else -1
self.ended_at = timezone.now()
self.status = self.StatusChoices.EXITED
self.save()
return self.exit_code
return None # Still running
def wait(self, timeout: int | None = None) -> int:
"""
Wait for process to exit, polling periodically.
Args:
timeout: Max seconds to wait (None = use self.timeout)
Returns:
exit_code
Raises:
TimeoutError if process doesn't exit in time
"""
import time
timeout = timeout or self.timeout
start = time.time()
while True:
exit_code = self.poll()
if exit_code is not None:
return exit_code
if time.time() - start > timeout:
raise TimeoutError(f"Process {self.id} did not exit within {timeout}s")
time.sleep(0.1)
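    # Example usage (sketch): background launch with polling, or a blocking wait:
    #
    #   p.launch(background=True)
    #   while p.poll() is None:
    #       pass                          # do other work between polls
    #   # or equivalently: exit_code = p.wait(timeout=60)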
def terminate(self, graceful_timeout: float = 5.0) -> bool:
"""
Gracefully terminate process: SIGTERM → wait → SIGKILL.
This consolidates the scattered SIGTERM/SIGKILL logic from:
- crawls/models.py Crawl.cleanup()
- workers/pid_utils.py stop_worker()
- supervisord_util.py stop_existing_supervisord_process()
Args:
graceful_timeout: Seconds to wait after SIGTERM before SIGKILL
Returns:
True if process was terminated, False if already dead
"""
        import signal
proc = self.proc
if proc is None:
# Already dead - just update status
if self.status != self.StatusChoices.EXITED:
self.status = self.StatusChoices.EXITED
self.ended_at = self.ended_at or timezone.now()
self.save()
return False
try:
# Step 1: Send SIGTERM for graceful shutdown
proc.terminate()
# Step 2: Wait for graceful exit
try:
exit_status = proc.wait(timeout=graceful_timeout)
# Process exited gracefully
# psutil.Process.wait() returns the exit status
self.exit_code = exit_status if exit_status is not None else 0
self.status = self.StatusChoices.EXITED
self.ended_at = timezone.now()
self.save()
return True
except psutil.TimeoutExpired:
pass # Still running, need to force kill
# Step 3: Force kill with SIGKILL
proc.kill()
proc.wait(timeout=2)
# Use standard Unix convention: 128 + signal number
self.exit_code = 128 + signal.SIGKILL
self.status = self.StatusChoices.EXITED
self.ended_at = timezone.now()
self.save()
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# Process already dead
self.status = self.StatusChoices.EXITED
self.ended_at = self.ended_at or timezone.now()
self.save()
return False
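    # Example usage (sketch): graceful shutdown that escalates SIGTERM -> SIGKILL:
    #
    #   if process.terminate(graceful_timeout=5.0):
    #       print(process.exit_code)      # 128 + SIGKILL (137) if force-kill was needed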
def kill_tree(self, graceful_timeout: float = 2.0) -> int:
"""
Kill this process and all its children (OS children, not DB children).
This consolidates the scattered child-killing logic from:
- crawls/models.py Crawl.cleanup() os.killpg()
- supervisord_util.py stop_existing_supervisord_process()
Args:
graceful_timeout: Seconds to wait after SIGTERM before SIGKILL
Returns:
Number of processes killed (including self)
"""
killed_count = 0
proc = self.proc
if proc is None:
# Already dead
if self.status != self.StatusChoices.EXITED:
self.status = self.StatusChoices.EXITED
self.ended_at = self.ended_at or timezone.now()
self.save()
return 0
try:
# Get all children before killing parent
children = proc.children(recursive=True)
# Kill children first (reverse order - deepest first)
for child in reversed(children):
try:
child.terminate()
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Child already dead or we don't have permission - continue
pass
# Wait briefly for children to exit
gone, alive = psutil.wait_procs(children, timeout=graceful_timeout)
killed_count += len(gone)
# Force kill remaining children
for child in alive:
try:
child.kill()
killed_count += 1
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Child exited or we don't have permission - continue
pass
# Now kill self
if self.terminate(graceful_timeout=graceful_timeout):
killed_count += 1
return killed_count
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# Process tree already dead
self.status = self.StatusChoices.EXITED
self.ended_at = self.ended_at or timezone.now()
self.save()
return killed_count
def kill_children_db(self) -> int:
"""
Kill all DB-tracked child processes (via parent FK).
Different from kill_tree() which uses OS children.
This kills processes created via Process.create(parent=self).
Returns:
Number of child Process records killed
"""
killed = 0
for child in self.children.filter(status=self.StatusChoices.RUNNING):
if child.terminate():
killed += 1
return killed
# =========================================================================
# Class methods for querying processes
# =========================================================================
@classmethod
    def get_running(cls, process_type: str | None = None, machine: 'Machine | None' = None) -> 'models.QuerySet[Process]':
"""
Get all running processes, optionally filtered by type.
Replaces:
- workers/pid_utils.py get_all_worker_pids()
- workers/orchestrator.py get_total_worker_count()
Args:
            process_type: Filter by TypeChoices (e.g., 'worker', 'cli')
machine: Filter by machine (defaults to current)
Returns:
QuerySet of running Process records
"""
machine = machine or Machine.current()
qs = cls.objects.filter(
machine=machine,
status=cls.StatusChoices.RUNNING,
)
if process_type:
qs = qs.filter(process_type=process_type)
return qs
@classmethod
    def get_running_count(cls, process_type: str | None = None, machine: 'Machine | None' = None) -> int:
"""
Get count of running processes.
Replaces:
- workers/pid_utils.py get_running_worker_count()
"""
return cls.get_running(process_type=process_type, machine=machine).count()
@classmethod
    def stop_all(cls, process_type: str | None = None, machine: 'Machine | None' = None, graceful: bool = True) -> int:
"""
Stop all running processes of a given type.
Args:
process_type: Filter by TypeChoices
machine: Filter by machine
graceful: If True, use terminate() (SIGTERM→SIGKILL), else kill()
Returns:
Number of processes stopped
"""
stopped = 0
for proc in cls.get_running(process_type=process_type, machine=machine):
if graceful:
if proc.terminate():
stopped += 1
else:
if proc.kill():
stopped += 1
return stopped
@classmethod
    def get_next_worker_id(cls, process_type: str = 'worker', machine: 'Machine | None' = None) -> int:
"""
Get the next available worker ID for spawning new workers.
Replaces workers/pid_utils.py get_next_worker_id().
Simply returns count of running workers of this type.
Args:
process_type: Worker type to count
machine: Machine to scope query
Returns:
Next available worker ID (0-indexed)
"""
return cls.get_running_count(process_type=process_type, machine=machine)
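# Example usage (sketch): orchestrator-style queries over tracked processes:
#
#   n_workers = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
#   worker_id = Process.get_next_worker_id()   # == n_workers (0-indexed)
#   Process.stop_all(process_type=Process.TypeChoices.WORKER, graceful=True)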
# =============================================================================
# Binary State Machine
# =============================================================================
class BinaryMachine(BaseStateMachine, strict_states=True):
"""
State machine for managing Binary installation lifecycle.
Hook Lifecycle:
┌─────────────────────────────────────────────────────────────┐
│ QUEUED State │
│ • Binary needs to be installed │
└─────────────────────────────────────────────────────────────┘
↓ tick() when can_start()
┌─────────────────────────────────────────────────────────────┐
│ STARTED State → enter_started() │
│ 1. binary.run() │
│ • discover_hooks('Binary') → all on_Binary__install_* │
│ • Try each provider hook in sequence: │
│ - run_hook(script, output_dir, ...) │
│ - If returncode == 0: │
│ * Read stdout.log │
│ * Parse JSONL for 'Binary' record with abspath │
│ * Update self: abspath, version, sha256, provider │
│ * Set status=SUCCEEDED, RETURN │
│ • If no hook succeeds: set status=FAILED │
└─────────────────────────────────────────────────────────────┘
↓ tick() checks status
┌─────────────────────────────────────────────────────────────┐
│ SUCCEEDED / FAILED │
│ • Set by binary.run() based on hook results │
│ • Health stats incremented (num_uses_succeeded/failed) │
└─────────────────────────────────────────────────────────────┘
"""
model_attr_name = 'binary'
# States
queued = State(value=Binary.StatusChoices.QUEUED, initial=True)
started = State(value=Binary.StatusChoices.STARTED)
succeeded = State(value=Binary.StatusChoices.SUCCEEDED, final=True)
failed = State(value=Binary.StatusChoices.FAILED, final=True)
# Tick Event - transitions based on conditions
tick = (
queued.to.itself(unless='can_start') |
queued.to(started, cond='can_start') |
started.to.itself(unless='is_finished') |
started.to(succeeded, cond='is_succeeded') |
started.to(failed, cond='is_failed')
)
def can_start(self) -> bool:
"""Check if binary installation can start."""
return bool(self.binary.name and self.binary.binproviders)
def is_succeeded(self) -> bool:
"""Check if installation succeeded (status was set by run())."""
return self.binary.status == Binary.StatusChoices.SUCCEEDED
def is_failed(self) -> bool:
"""Check if installation failed (status was set by run())."""
return self.binary.status == Binary.StatusChoices.FAILED
def is_finished(self) -> bool:
"""Check if installation has completed (success or failure)."""
return self.binary.status in (
Binary.StatusChoices.SUCCEEDED,
Binary.StatusChoices.FAILED,
)
@queued.enter
def enter_queued(self):
"""Binary is queued for installation."""
self.binary.update_and_requeue(
retry_at=timezone.now(),
status=Binary.StatusChoices.QUEUED,
)
@started.enter
def enter_started(self):
"""Start binary installation."""
# Lock the binary while installation runs
self.binary.update_and_requeue(
retry_at=timezone.now() + timedelta(seconds=300), # 5 min timeout for installation
status=Binary.StatusChoices.STARTED,
)
# Run installation hooks
self.binary.run()
# Save updated status (run() updates status to succeeded/failed)
self.binary.save()
@succeeded.enter
def enter_succeeded(self):
"""Binary installed successfully."""
self.binary.update_and_requeue(
retry_at=None,
status=Binary.StatusChoices.SUCCEEDED,
)
# Increment health stats
self.binary.increment_health_stats(success=True)
@failed.enter
def enter_failed(self):
"""Binary installation failed."""
self.binary.update_and_requeue(
retry_at=None,
status=Binary.StatusChoices.FAILED,
)
# Increment health stats
self.binary.increment_health_stats(success=False)
# =============================================================================
# Process State Machine
# =============================================================================
class ProcessMachine(BaseStateMachine, strict_states=True):
"""
State machine for managing Process (OS subprocess) lifecycle.
Process Lifecycle:
┌─────────────────────────────────────────────────────────────┐
│ QUEUED State │
│ • Process ready to launch, waiting for resources │
└─────────────────────────────────────────────────────────────┘
↓ tick() when can_start()
┌─────────────────────────────────────────────────────────────┐
│ RUNNING State → enter_running() │
│ 1. process.launch() │
│ • Spawn subprocess with cmd, pwd, env, timeout │
│ • Set pid, started_at │
│ • Process runs in background or foreground │
│ 2. Monitor process completion │
│ • Check exit code when process completes │
└─────────────────────────────────────────────────────────────┘
↓ tick() checks is_exited()
┌─────────────────────────────────────────────────────────────┐
│ EXITED State │
│ • Process completed (exit_code set) │
│ • Health stats incremented │
│ • stdout/stderr captured │
└─────────────────────────────────────────────────────────────┘
Note: This is a simpler state machine than ArchiveResult.
Process is just about execution lifecycle. ArchiveResult handles
the archival-specific logic (status, output parsing, etc.).
"""
model_attr_name = 'process'
# States
queued = State(value=Process.StatusChoices.QUEUED, initial=True)
running = State(value=Process.StatusChoices.RUNNING)
exited = State(value=Process.StatusChoices.EXITED, final=True)
# Tick Event - transitions based on conditions
tick = (
queued.to.itself(unless='can_start') |
queued.to(running, cond='can_start') |
running.to.itself(unless='is_exited') |
running.to(exited, cond='is_exited')
)
# Additional events (for explicit control)
launch = queued.to(running)
kill = running.to(exited)
def can_start(self) -> bool:
"""Check if process can start (has cmd and machine)."""
return bool(self.process.cmd and self.process.machine)
def is_exited(self) -> bool:
"""Check if process has exited (exit_code is set)."""
return self.process.exit_code is not None
@queued.enter
def enter_queued(self):
"""Process is queued for execution."""
self.process.update_and_requeue(
retry_at=timezone.now(),
status=Process.StatusChoices.QUEUED,
)
@running.enter
def enter_running(self):
"""Start process execution."""
# Lock the process while it runs
self.process.update_and_requeue(
retry_at=timezone.now() + timedelta(seconds=self.process.timeout),
status=Process.StatusChoices.RUNNING,
started_at=timezone.now(),
)
# Launch the subprocess
# NOTE: This is a placeholder - actual launch logic would
# be implemented based on how hooks currently spawn processes
# For now, Process is a data model that tracks execution metadata
# The actual subprocess spawning is still handled by run_hook()
# Mark as immediately exited for now (until we refactor run_hook)
# In the future, this would actually spawn the subprocess
self.process.exit_code = 0 # Placeholder
self.process.save()
@exited.enter
def enter_exited(self):
"""Process has exited."""
self.process.update_and_requeue(
retry_at=None,
status=Process.StatusChoices.EXITED,
ended_at=timezone.now(),
)
# =============================================================================
# State Machine Registration
# =============================================================================
# Manually register state machines with python-statemachine registry
registry.register(BinaryMachine)
registry.register(ProcessMachine)
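# Example usage (sketch): driving a Binary through its lifecycle. The constructor
# call below follows the python-statemachine convention of passing the model
# instance; the real worker loop may construct and tick machines differently:
#
#   binary = Binary.objects.create(machine=Machine.current(), name='wget',
#                                  binproviders='apt,env')
#   sm = BinaryMachine(binary)
#   sm.tick()   # queued -> started: runs on_Binary__install_* hooks via binary.run()
#   sm.tick()   # started -> succeeded/failed, based on the status set by run()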