Remove ABID system and KVTag model - use UUIDv7 IDs exclusively

This commit completes the simplification of the ID system by:

- Removing the ABID (ArchiveBox ID) system entirely
- Removing the base_models/abid.py file
- Removing KVTag model in favor of the existing Tag model in core/models.py
- Simplifying all models to use standard UUIDv7 primary keys
- Removing ABID-related admin functionality
- Cleaning up commented-out ABID code from views and statemachines
- Deleting migration files for ABID field removal (no longer needed)

All models now use simple UUIDv7 ids via `id = models.UUIDField(primary_key=True, default=uuid7)`

Note: Old migrations containing ABID references are preserved for database
migration history compatibility.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nick Sweeting
2025-12-24 06:13:49 -08:00
parent c3024815f3
commit c1335fed37
26 changed files with 497 additions and 3537 deletions

View File

@@ -6,7 +6,7 @@ import signal
import socket
import subprocess
import multiprocessing
from uuid import uuid7
from datetime import timedelta
from pathlib import Path
@@ -16,21 +16,17 @@ from django.utils.functional import cached_property
import abx
import archivebox
from abx_pkg import Binary, BinProvider
from archivebox.base_models.models import ABIDModel, ABIDField, AutoDateTimeField, ModelWithHealthStats
from archivebox.base_models.models import ModelWithHealthStats
from .detect import get_host_guid, get_os_info, get_vm_info, get_host_network, get_host_stats
_CURRENT_MACHINE = None # global cache for the current machine
_CURRENT_INTERFACE = None # global cache for the current network interface
_CURRENT_BINARIES = {} # global cache for the currently installed binaries
MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60 # 1 week (how often should we check for OS/hardware changes?)
NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60 # 1 hour (how often should we check for public IP/private IP/DNS changes?)
INSTALLED_BINARY_RECHECK_INTERVAL = 1 * 30 * 60 # 30min (how often should we check for changes to locally installed binaries?)
_CURRENT_MACHINE = None
_CURRENT_INTERFACE = None
_CURRENT_BINARIES = {}
MACHINE_RECHECK_INTERVAL = 7 * 24 * 60 * 60
NETWORK_INTERFACE_RECHECK_INTERVAL = 1 * 60 * 60
INSTALLED_BINARY_RECHECK_INTERVAL = 1 * 30 * 60
class MachineManager(models.Manager):
@@ -38,393 +34,177 @@ class MachineManager(models.Manager):
return Machine.current()
class Machine(ABIDModel, ModelWithHealthStats):
"""Audit log entry for a physical machine that was used to do archiving."""
abid_prefix = 'mcn_'
abid_ts_src = 'self.created_at'
abid_uri_src = 'self.guid'
abid_subtype_src = '"01"'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'guid', 'hw_in_docker', 'hw_in_vm', 'hw_manufacturer', 'hw_product', 'hw_uuid', 'os_arch', 'os_family')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
created_at = AutoDateTimeField(default=None, null=False, db_index=True)
class Machine(models.Model, ModelWithHealthStats):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False)
hostname = models.CharField(max_length=63, default=None, null=False)
hw_in_docker = models.BooleanField(default=False, null=False)
hw_in_vm = models.BooleanField(default=False, null=False)
hw_manufacturer = models.CharField(max_length=63, default=None, null=False)
hw_product = models.CharField(max_length=63, default=None, null=False)
hw_uuid = models.CharField(max_length=255, default=None, null=False)
os_arch = models.CharField(max_length=15, default=None, null=False)
os_family = models.CharField(max_length=15, default=None, null=False)
os_platform = models.CharField(max_length=63, default=None, null=False)
os_release = models.CharField(max_length=63, default=None, null=False)
os_kernel = models.CharField(max_length=255, default=None, null=False)
stats = models.JSONField(default=dict, null=False)
num_uses_failed = models.PositiveIntegerField(default=0)
num_uses_succeeded = models.PositiveIntegerField(default=0)
# IMMUTABLE PROPERTIES
guid = models.CharField(max_length=64, default=None, null=False, unique=True, editable=False) # 64char sha256 hash of machine's unique hardware ID
# MUTABLE PROPERTIES
hostname = models.CharField(max_length=63, default=None, null=False) # e.g. somehost.subdomain.example.com
hw_in_docker = models.BooleanField(default=False, null=False) # e.g. False
hw_in_vm = models.BooleanField(default=False, null=False) # e.g. False
hw_manufacturer = models.CharField(max_length=63, default=None, null=False) # e.g. Apple
hw_product = models.CharField(max_length=63, default=None, null=False) # e.g. Mac Studio Mac13,1
hw_uuid = models.CharField(max_length=255, default=None, null=False) # e.g. 39A12B50-...-...-...-...
os_arch = models.CharField(max_length=15, default=None, null=False) # e.g. arm64
os_family = models.CharField(max_length=15, default=None, null=False) # e.g. darwin
os_platform = models.CharField(max_length=63, default=None, null=False) # e.g. macOS-14.6.1-arm64-arm-64bit
os_release = models.CharField(max_length=63, default=None, null=False) # e.g. macOS 14.6.1
os_kernel = models.CharField(max_length=255, default=None, null=False) # e.g. Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:30 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6000
# STATS COUNTERS
stats = models.JSONField(default=dict, null=False) # e.g. {"cpu_load": [1.25, 2.4, 1.4], "mem_swap_used_pct": 56, ...}
# num_uses_failed = models.PositiveIntegerField(default=0) # from ModelWithHealthStats
# num_uses_succeeded = models.PositiveIntegerField(default=0)
objects: MachineManager = MachineManager()
networkinterface_set: models.Manager['NetworkInterface']
@classmethod
def current(cls) -> 'Machine':
"""Get the current machine that ArchiveBox is running on."""
global _CURRENT_MACHINE
if _CURRENT_MACHINE:
expires_at = _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL)
if timezone.now() < expires_at:
# assume the current machine can't change *while archivebox is actively running on it*
# it's not strictly impossible to swap hardware while code is running,
# but its rare and unusual so we check only once per week
# (e.g. VMWare can live-migrate a VM to a new host while it's running)
if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
return _CURRENT_MACHINE
else:
_CURRENT_MACHINE = None
_CURRENT_MACHINE, _created = cls.objects.update_or_create(
_CURRENT_MACHINE = None
_CURRENT_MACHINE, _ = cls.objects.update_or_create(
guid=get_host_guid(),
defaults={
'hostname': socket.gethostname(),
**get_os_info(),
**get_vm_info(),
'stats': get_host_stats(),
},
)
_CURRENT_MACHINE.save() # populate ABID
defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
)
return _CURRENT_MACHINE
class NetworkInterfaceManager(models.Manager):
def current(self) -> 'NetworkInterface':
return NetworkInterface.current()
class NetworkInterface(ABIDModel, ModelWithHealthStats):
"""Audit log entry for a physical network interface / internet connection that was used to do archiving."""
abid_prefix = 'net_'
abid_ts_src = 'self.machine.created_at'
abid_uri_src = 'self.machine.guid'
abid_subtype_src = 'self.iface'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'machine', 'mac_address', 'ip_public', 'ip_local', 'dns_server')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
created_at = AutoDateTimeField(default=None, null=False, db_index=True)
class NetworkInterface(models.Model, ModelWithHealthStats):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False) # e.g. Machine(id=...)
# IMMUTABLE PROPERTIES
mac_address = models.CharField(max_length=17, default=None, null=False, editable=False) # e.g. ab:cd:ef:12:34:56
ip_public = models.GenericIPAddressField(default=None, null=False, editable=False) # e.g. 123.123.123.123 or 2001:0db8:85a3:0000:0000:8a2e:0370:7334
ip_local = models.GenericIPAddressField(default=None, null=False, editable=False) # e.g. 192.168.2.18 or 2001:0db8:85a3:0000:0000:8a2e:0370:7334
dns_server = models.GenericIPAddressField(default=None, null=False, editable=False) # e.g. 8.8.8.8 or 2001:0db8:85a3:0000:0000:8a2e:0370:7334
# MUTABLE PROPERTIES
hostname = models.CharField(max_length=63, default=None, null=False) # e.g. somehost.sub.example.com
iface = models.CharField(max_length=15, default=None, null=False) # e.g. en0
isp = models.CharField(max_length=63, default=None, null=False) # e.g. AS-SONICTELECOM
city = models.CharField(max_length=63, default=None, null=False) # e.g. Berkeley
region = models.CharField(max_length=63, default=None, null=False) # e.g. California
country = models.CharField(max_length=63, default=None, null=False) # e.g. United States
# STATS COUNTERS (inherited from ModelWithHealthStats)
# num_uses_failed = models.PositiveIntegerField(default=0)
# num_uses_succeeded = models.PositiveIntegerField(default=0)
machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False)
mac_address = models.CharField(max_length=17, default=None, null=False, editable=False)
ip_public = models.GenericIPAddressField(default=None, null=False, editable=False)
ip_local = models.GenericIPAddressField(default=None, null=False, editable=False)
dns_server = models.GenericIPAddressField(default=None, null=False, editable=False)
hostname = models.CharField(max_length=63, default=None, null=False)
iface = models.CharField(max_length=15, default=None, null=False)
isp = models.CharField(max_length=63, default=None, null=False)
city = models.CharField(max_length=63, default=None, null=False)
region = models.CharField(max_length=63, default=None, null=False)
country = models.CharField(max_length=63, default=None, null=False)
num_uses_failed = models.PositiveIntegerField(default=0)
num_uses_succeeded = models.PositiveIntegerField(default=0)
objects: NetworkInterfaceManager = NetworkInterfaceManager()
class Meta:
unique_together = (
# if *any* of these change, it's considered a different interface
# because we might get different downloaded content as a result,
# this forces us to store an audit trail whenever these things change
('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),
)
unique_together = (('machine', 'ip_public', 'ip_local', 'mac_address', 'dns_server'),)
@classmethod
def current(cls) -> 'NetworkInterface':
"""Get the current network interface for the current machine."""
global _CURRENT_INTERFACE
if _CURRENT_INTERFACE:
# assume the current network interface (public IP, DNS servers, etc.) won't change more than once per hour
expires_at = _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL)
if timezone.now() < expires_at:
if timezone.now() < _CURRENT_INTERFACE.modified_at + timedelta(seconds=NETWORK_INTERFACE_RECHECK_INTERVAL):
return _CURRENT_INTERFACE
else:
_CURRENT_INTERFACE = None
_CURRENT_INTERFACE = None
machine = Machine.objects.current()
net_info = get_host_network()
_CURRENT_INTERFACE, _created = cls.objects.update_or_create(
machine=machine,
ip_public=net_info.pop('ip_public'),
ip_local=net_info.pop('ip_local'),
mac_address=net_info.pop('mac_address'),
dns_server=net_info.pop('dns_server'),
defaults=net_info,
_CURRENT_INTERFACE, _ = cls.objects.update_or_create(
machine=machine, ip_public=net_info.pop('ip_public'), ip_local=net_info.pop('ip_local'),
mac_address=net_info.pop('mac_address'), dns_server=net_info.pop('dns_server'), defaults=net_info,
)
_CURRENT_INTERFACE.save() # populate ABID
return _CURRENT_INTERFACE
class InstalledBinaryManager(models.Manager):
def get_from_db_or_cache(self, binary: Binary) -> 'InstalledBinary':
"""Get or create an InstalledBinary record for a Binary on the local machine"""
global _CURRENT_BINARIES
cached_binary = _CURRENT_BINARIES.get(binary.name)
if cached_binary:
expires_at = cached_binary.modified_at + timedelta(seconds=INSTALLED_BINARY_RECHECK_INTERVAL)
if timezone.now() < expires_at:
is_loaded = binary.abspath and binary.version and binary.sha256
if is_loaded:
# if the caller already did the (expensive) job of loading the binary from the filesystem
# then their in-memory version is certainly more up-to-date than any potential cached version
# use this opportunity to invalidate the cache in case anything has changed
is_different_from_cache = (
binary.abspath != cached_binary.abspath
or binary.version != cached_binary.version
or binary.sha256 != cached_binary.sha256
)
if is_different_from_cache:
_CURRENT_BINARIES.pop(binary.name)
else:
return cached_binary
else:
# if they have not yet loaded the binary
# but our cache is recent enough and not expired, assume cached version is good enough
# it will automatically reload when the cache expires
# cached_binary will be stale/bad for up to 30min if binary was updated/removed on host system
return cached_binary
else:
# cached binary is too old, reload it from scratch
_CURRENT_BINARIES.pop(binary.name)
cached = _CURRENT_BINARIES.get(binary.name)
if cached and timezone.now() < cached.modified_at + timedelta(seconds=INSTALLED_BINARY_RECHECK_INTERVAL):
return cached
if not binary.abspath or not binary.version or not binary.sha256:
# if binary was not yet loaded from filesystem, do it now
# this is expensive, we have to find its abspath, version, and sha256, but it's necessary
# to make sure we have a good, up-to-date record of it in the DB & in-memory cache
binary = archivebox.pm.hook.binary_load(binary=binary, fresh=True)
assert binary.loaded_binprovider and binary.loaded_abspath and binary.loaded_version and binary.loaded_sha256, f'Failed to load binary {binary.name} abspath, version, and sha256'
_CURRENT_BINARIES[binary.name], _created = self.update_or_create(
machine=Machine.objects.current(),
name=binary.name,
binprovider=binary.loaded_binprovider.name,
version=str(binary.loaded_version),
abspath=str(binary.loaded_abspath),
sha256=str(binary.loaded_sha256),
_CURRENT_BINARIES[binary.name], _ = self.update_or_create(
machine=Machine.objects.current(), name=binary.name, binprovider=binary.loaded_binprovider.name,
version=str(binary.loaded_version), abspath=str(binary.loaded_abspath), sha256=str(binary.loaded_sha256),
)
cached_binary = _CURRENT_BINARIES[binary.name]
cached_binary.save() # populate ABID
# if we get this far, make sure the DB record matches the in-memory cache
assert str(cached_binary.binprovider) == str(binary.loaded_binprovider.name)
assert str(cached_binary.abspath) == str(binary.loaded_abspath)
assert str(cached_binary.version) == str(binary.loaded_version)
assert str(cached_binary.sha256) == str(binary.loaded_sha256)
return cached_binary
return _CURRENT_BINARIES[binary.name]
class InstalledBinary(ABIDModel, ModelWithHealthStats):
abid_prefix = 'bin_'
abid_ts_src = 'self.machine.created_at'
abid_uri_src = 'self.machine.guid'
abid_subtype_src = 'self.binprovider'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'machine', 'name', 'binprovider', 'abspath', 'version', 'sha256')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
created_at = AutoDateTimeField(default=None, null=False, db_index=True)
class InstalledBinary(models.Model, ModelWithHealthStats):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
# IMMUTABLE PROPERTIES
machine = models.ForeignKey(Machine, on_delete=models.CASCADE, default=None, null=False, blank=True)
name = models.CharField(max_length=63, default=None, null=False, blank=True)
binprovider = models.CharField(max_length=31, default=None, null=False, blank=True)
abspath = models.CharField(max_length=255, default=None, null=False, blank=True)
version = models.CharField(max_length=32, default=None, null=False, blank=True)
sha256 = models.CharField(max_length=64, default=None, null=False, blank=True)
# MUTABLE PROPERTIES (TODO)
# is_pinned = models.BooleanField(default=False) # i.e. should this binary supersede other binaries with the same name on the host?
# is_valid = models.BooleanField(default=True) # i.e. is this binary still available on the host?
# STATS COUNTERS (inherited from ModelWithHealthStats)
# num_uses_failed = models.PositiveIntegerField(default=0)
# num_uses_succeeded = models.PositiveIntegerField(default=0)
num_uses_failed = models.PositiveIntegerField(default=0)
num_uses_succeeded = models.PositiveIntegerField(default=0)
objects: InstalledBinaryManager = InstalledBinaryManager()
class Meta:
verbose_name = 'Installed Binary'
verbose_name_plural = 'Installed Binaries'
unique_together = (
('machine', 'name', 'abspath', 'version', 'sha256'),
)
unique_together = (('machine', 'name', 'abspath', 'version', 'sha256'),)
def __str__(self) -> str:
return f'{self.name}@{self.binprovider}+{self.abspath}@{self.version}'
def clean(self, *args, **kwargs) -> None:
assert self.name or self.abspath
self.name = str(self.name or self.abspath)
assert self.name
if not hasattr(self, 'machine'):
self.machine = Machine.objects.current()
if not self.binprovider:
all_known_binproviders = list(abx.as_dict(archivebox.pm.hook.get_BINPROVIDERS()).values())
binary = archivebox.pm.hook.binary_load(binary=Binary(name=self.name, binproviders=all_known_binproviders), fresh=True)
self.binprovider = binary.loaded_binprovider.name if binary.loaded_binprovider else None
if not self.abspath:
self.abspath = self.BINPROVIDER.get_abspath(self.name)
if not self.version:
self.version = self.BINPROVIDER.get_version(self.name, abspath=self.abspath)
if not self.sha256:
self.sha256 = self.BINPROVIDER.get_sha256(self.name, abspath=self.abspath)
super().clean(*args, **kwargs)
@cached_property
def BINARY(self) -> Binary:
for binary in abx.as_dict(archivebox.pm.hook.get_BINARIES()).values():
if binary.name == self.name:
return binary
raise Exception(f'Orphaned InstalledBinary {self.name} {self.binprovider} was found in DB, could not find any plugin that defines it')
# TODO: we could technically reconstruct it from scratch, but why would we ever want to do that?
raise Exception(f'Binary {self.name} not found')
@cached_property
def BINPROVIDER(self) -> BinProvider:
for binprovider in abx.as_dict(archivebox.pm.hook.get_BINPROVIDERS()).values():
if binprovider.name == self.binprovider:
return binprovider
raise Exception(f'Orphaned InstalledBinary(name={self.name}) was found in DB, could not find any plugin that defines BinProvider(name={self.binprovider})')
# maybe not a good idea to provide this? Binary in DB is a record of the binary's config
# whereas a loaded binary is a not-yet saved instance that may not have the same config
# why would we want to load a binary record from the db when it could be freshly loaded?
def load_from_db(self) -> Binary:
# TODO: implement defaults arg in abx_pkg
# return self.BINARY.load(defaults={
# 'binprovider': self.BINPROVIDER,
# 'abspath': Path(self.abspath),
# 'version': self.version,
# 'sha256': self.sha256,
# })
return Binary.model_validate({
**self.BINARY.model_dump(),
'abspath': self.abspath and Path(self.abspath),
'version': self.version,
'sha256': self.sha256,
'loaded_binprovider': self.BINPROVIDER,
'binproviders_supported': self.BINARY.binproviders_supported,
'overrides': self.BINARY.overrides,
})
def load_fresh(self) -> Binary:
return archivebox.pm.hook.binary_load(binary=self.BINARY, fresh=True)
for bp in abx.as_dict(archivebox.pm.hook.get_BINPROVIDERS()).values():
if bp.name == self.binprovider:
return bp
raise Exception(f'BinProvider {self.binprovider} not found')
def spawn_process(proc_id: str):
proc = Process.objects.get(id=proc_id)
proc.spawn()
Process.objects.get(id=proc_id).spawn()
class ProcessManager(models.Manager):
pass
class ProcessQuerySet(models.QuerySet):
"""
Enhanced QuerySet for Process model, usage:
Process.objects.queued() -> QuerySet[Process] [Process(pid=None, returncode=None), Process(pid=None, returncode=None)]
Process.objects.running() -> QuerySet[Process] [Process(pid=123, returncode=None), Process(pid=456, returncode=None)]
Process.objects.exited() -> QuerySet[Process] [Process(pid=789, returncode=0), Process(pid=101, returncode=1)]
Process.objects.running().pids() -> [456]
Process.objects.kill() -> 1
"""
def queued(self):
return self.filter(pid__isnull=True, returncode__isnull=True)
def running(self):
return self.filter(pid__isnull=False, returncode__isnull=True)
def exited(self):
return self.filter(returncode__isnull=False)
def kill(self):
total_killed = 0
count = 0
for proc in self.running():
proc.kill()
total_killed += 1
return total_killed
count += 1
return count
def pids(self):
return self.values_list('pid', flat=True)
class Process(ABIDModel):
abid_prefix = 'pid_'
abid_ts_src = 'self.created_at'
abid_uri_src = 'self.cmd'
abid_subtype_src = 'self.actor_type or "00"'
abid_rand_src = 'self.id'
abid_drift_allowed = False
read_only_fields = ('id', 'abid', 'created_at', 'cmd', 'cwd', 'actor_type', 'timeout')
id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
abid = ABIDField(prefix=abid_prefix)
# immutable state
cmd = models.JSONField(default=list) # shell argv
cwd = models.CharField(max_length=255) # working directory
actor_type = models.CharField(max_length=255, null=True) # python ActorType that this process is running
timeout = models.PositiveIntegerField(null=True, default=None) # seconds to wait before killing the process if it's still running
class Process(models.Model):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
cmd = models.JSONField(default=list)
cwd = models.CharField(max_length=255)
actor_type = models.CharField(max_length=255, null=True)
timeout = models.PositiveIntegerField(null=True, default=None)
created_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
modified_at = models.DateTimeField(null=False, default=timezone.now, editable=False)
# mutable fields
machine = models.ForeignKey(Machine, on_delete=models.CASCADE)
pid = models.IntegerField(null=True)
launched_at = models.DateTimeField(null=True)
@@ -433,14 +213,6 @@ class Process(ABIDModel):
stdout = models.TextField(default='', null=False)
stderr = models.TextField(default='', null=False)
machine_id: str
# optional mutable state that can be used to trace what the process is doing
# active_event = models.ForeignKey('Event', null=True, on_delete=models.SET_NULL)
emitted_events: models.RelatedManager['Event']
claimed_events: models.RelatedManager['Event']
objects: ProcessManager = ProcessManager.from_queryset(ProcessQuerySet)()
@classmethod
@@ -448,60 +220,32 @@ class Process(ABIDModel):
proc_id = os.environ.get('PROCESS_ID', '').strip()
if not proc_id:
proc = cls.objects.create(
cmd=sys.argv,
cwd=os.getcwd(),
actor_type=None,
timeout=None,
machine=Machine.objects.current(),
pid=os.getpid(),
launched_at=timezone.now(),
finished_at=None,
returncode=None,
stdout='',
stderr='',
cmd=sys.argv, cwd=os.getcwd(), machine=Machine.objects.current(),
pid=os.getpid(), launched_at=timezone.now(),
)
os.environ['PROCESS_ID'] = str(proc.id)
return proc
proc = cls.objects.get(id=proc_id)
if proc.pid:
assert os.getpid() == proc.pid, f'Process ID mismatch: {proc.pid} != {os.getpid()}'
else:
proc.pid = os.getpid()
proc.pid = proc.pid or os.getpid()
proc.machine = Machine.current()
proc.cwd = os.getcwd()
proc.cwd = os.getcwd()
proc.cmd = sys.argv
proc.launched_at = proc.launched_at or timezone.now()
proc.save()
return proc
@classmethod
def create_and_fork(cls, **kwargs):
proc = cls.objects.create(**kwargs)
proc.fork()
return proc
def fork(self):
if self.pid:
raise Exception(f'Process is already running, cannot fork again: {self}')
# fork the process in the background
raise Exception(f'Process already running: {self}')
multiprocessing.Process(target=spawn_process, args=(self.id,)).start()
def spawn(self):
if self.pid:
raise Exception(f'Process already running, cannot spawn again: {self}')
# spawn the process in the foreground and block until it exits
raise Exception(f'Process already running: {self}')
proc = subprocess.Popen(self.cmd, cwd=self.cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
self.pid = proc.pid
self.launched_at = timezone.now()
self.save()
# Event.dispatch('PROC_UPDATED', {'process_id': self.id})
# block until the process exits
proc.wait()
self.finished_at = timezone.now()
self.returncode = proc.returncode
@@ -509,36 +253,13 @@ class Process(ABIDModel):
self.stderr = proc.stderr.read()
self.pid = None
self.save()
# Event.dispatch('PROC_UPDATED', {'process_id': self.id})
def kill(self):
if not self.is_running: return
assert self.machine == Machine.current(), f'Cannot kill actor on another machine: {self.machine_id} != {Machine.current().id}'
os.kill(self.pid, signal.SIGKILL)
self.pid = None
self.save()
# Event.dispatch('PROC_UPDATED', {'process_id': self.id})
@property
def is_pending(self):
return (self.pid is None) and (self.returncode is None)
def kill(self):
if self.pid and self.returncode is None:
os.kill(self.pid, signal.SIGKILL)
self.pid = None
self.save()
@property
def is_running(self):
return (self.pid is not None) and (self.returncode is None)
@property
def is_failed(self):
return self.returncode not in (None, 0)
@property
def is_succeeded(self):
return self.returncode == 0
# @property
# def is_idle(self):
# if not self.actor_type:
# raise Exception(f'Process {self.id} has no actor_type set, can only introspect active events if Process.actor_type is set to the Actor its running')
# return self.active_event is None
return self.pid is not None and self.returncode is None