mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-05 15:27:53 +10:00
much better tests and add page ui
This commit is contained in:
@@ -433,6 +433,190 @@ class Binary(ModelWithHealthStats):
|
||||
kill_process(pid_file)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Process Model
|
||||
# =============================================================================
|
||||
|
||||
class ProcessManager(models.Manager):
    """Manager for the Process model."""

    def create_for_archiveresult(self, archiveresult, **kwargs):
        """
        Create and return a Process record for an ArchiveResult.

        Called during migration and when creating new ArchiveResults.

        Args:
            archiveresult: The ArchiveResult the process is created for. It is
                only read when no usable ``pwd`` is supplied, to derive the
                working directory from
                ``archiveresult.snapshot.output_dir / archiveresult.plugin``.
            **kwargs: Process field values. They take precedence over the
                defaults (``status='queued'``, ``timeout=120``, ``env={}``).

        Returns:
            The newly created Process instance.
        """
        # Explicit kwargs win over the plain defaults.
        fields = {
            'status': 'queued',
            'timeout': 120,
            'env': {},
            **kwargs,
        }

        # Apply the pwd/cmd fallbacks AFTER merging kwargs, so that a missing
        # or falsy value (None, '', []) is replaced instead of being written
        # into the non-null columns.
        fields['pwd'] = fields.get('pwd') or str(
            archiveresult.snapshot.output_dir / archiveresult.plugin
        )
        fields['cmd'] = fields.get('cmd') or []

        # Only look up the current machine when the caller did not pass one.
        if 'machine' not in fields:
            fields['machine'] = Machine.current()

        return self.create(**fields)
|
||||
|
||||
|
||||
class Process(ModelWithHealthStats):
    """
    Tracks a single OS process execution.

    Process represents the actual subprocess spawned to execute a hook.
    One Process can optionally be associated with an ArchiveResult (via OneToOne),
    but Process can also exist standalone for internal operations.

    Follows the unified state machine pattern:
    - queued: Process ready to launch
    - running: Process actively executing
    - exited: Process completed (check exit_code for success/failure)

    The lifecycle is driven by the state machine named in ``state_machine_name``.
    """

    class StatusChoices(models.TextChoices):
        QUEUED = 'queued', 'Queued'
        RUNNING = 'running', 'Running'
        EXITED = 'exited', 'Exited'

    # Primary fields
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    # Machine FK - required (every process runs on a machine)
    machine = models.ForeignKey(
        Machine,
        on_delete=models.CASCADE,
        null=False,
        related_name='processes',
        help_text='Machine where this process executed'
    )

    # Execution metadata
    pwd = models.CharField(max_length=512, default='', null=False, blank=True,
                           help_text='Working directory for process execution')
    cmd = models.JSONField(default=list, null=False, blank=True,
                           help_text='Command as array of arguments')
    env = models.JSONField(default=dict, null=False, blank=True,
                           help_text='Environment variables for process')
    timeout = models.IntegerField(default=120, null=False,
                                  help_text='Timeout in seconds')

    # Process results (pid / exit_code stay NULL until they are known)
    pid = models.IntegerField(default=None, null=True, blank=True,
                              help_text='OS process ID')
    exit_code = models.IntegerField(default=None, null=True, blank=True,
                                    help_text='Process exit code (0 = success)')
    stdout = models.TextField(default='', null=False, blank=True,
                              help_text='Standard output from process')
    stderr = models.TextField(default='', null=False, blank=True,
                              help_text='Standard error from process')

    # Timing
    started_at = models.DateTimeField(default=None, null=True, blank=True,
                                      help_text='When process was launched')
    ended_at = models.DateTimeField(default=None, null=True, blank=True,
                                    help_text='When process completed/terminated')

    # Optional FKs (set to NULL rather than deleting the Process when the target is deleted)
    binary = models.ForeignKey(
        Binary,
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='processes',
        help_text='Binary used by this process'
    )
    iface = models.ForeignKey(
        NetworkInterface,
        on_delete=models.SET_NULL,
        null=True, blank=True,
        related_name='processes',
        help_text='Network interface used by this process'
    )

    # Optional connection URL (for CDP, sonic, etc.)
    url = models.URLField(max_length=2048, default=None, null=True, blank=True,
                          help_text='Connection URL (CDP endpoint, sonic server, etc.)')

    # Reverse relation to ArchiveResult (OneToOne from AR side)
    # archiveresult: OneToOneField defined on ArchiveResult model

    # State machine fields
    status = models.CharField(
        max_length=16,
        choices=StatusChoices.choices,
        default=StatusChoices.QUEUED,
        db_index=True
    )
    retry_at = models.DateTimeField(
        default=timezone.now,
        null=True, blank=True,
        db_index=True,
        help_text='When to retry this process'
    )

    # Health stats
    num_uses_failed = models.PositiveIntegerField(default=0)
    num_uses_succeeded = models.PositiveIntegerField(default=0)

    # Dotted path of the state machine class that drives this model
    state_machine_name: str = 'archivebox.machine.models.ProcessMachine'

    objects: ProcessManager = ProcessManager()

    class Meta:
        app_label = 'machine'
        verbose_name = 'Process'
        verbose_name_plural = 'Processes'
        indexes = [
            models.Index(fields=['machine', 'status', 'retry_at']),
            models.Index(fields=['binary', 'exit_code']),
        ]

    def __str__(self) -> str:
        """Return a short label: id, the first three cmd arguments, and status."""
        # cmd is a JSON list and may hold non-string items (e.g. ints), so each
        # argument is converted with str() before joining.
        cmd_str = ' '.join(str(arg) for arg in self.cmd[:3]) if self.cmd else '(no cmd)'
        return f'Process[{self.id}] {cmd_str} ({self.status})'

    # Properties that delegate to related objects
    @property
    def cmd_version(self) -> str:
        """Get version from associated binary ('' when no binary is set)."""
        return self.binary.version if self.binary else ''

    @property
    def bin_abspath(self) -> str:
        """Get absolute path from associated binary ('' when no binary is set)."""
        return self.binary.abspath if self.binary else ''

    @property
    def plugin(self) -> str:
        """Get plugin name from associated ArchiveResult ('' if there is none)."""
        # hasattr() is False when the `archiveresult` attribute cannot be read,
        # i.e. when no ArchiveResult is attached to this process.
        if hasattr(self, 'archiveresult'):
            return self.archiveresult.plugin
        return ''

    @property
    def hook_name(self) -> str:
        """Get hook name from associated ArchiveResult ('' if there is none)."""
        if hasattr(self, 'archiveresult'):
            return self.archiveresult.hook_name
        return ''

    def update_and_requeue(self, **kwargs):
        """
        Update process fields and requeue for worker state machine.

        Each keyword argument is assigned as an attribute on this instance,
        modified_at is set to the current time so workers pick up the change,
        and the instance is saved.
        """
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.modified_at = timezone.now()
        self.save()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Binary State Machine
|
||||
# =============================================================================
|
||||
@@ -550,11 +734,119 @@ class BinaryMachine(BaseStateMachine, strict_states=True):
|
||||
self.binary.increment_health_stats(success=False)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Process State Machine
|
||||
# =============================================================================
|
||||
|
||||
class ProcessMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing Process (OS subprocess) lifecycle.

    Intended Process Lifecycle (the launch step is currently a placeholder,
    see enter_running()):
    ┌─────────────────────────────────────────────────────────────┐
    │ QUEUED State                                                │
    │ • Process ready to launch, waiting for resources            │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() when can_start()
    ┌─────────────────────────────────────────────────────────────┐
    │ RUNNING State → enter_running()                             │
    │ 1. process.launch()                                         │
    │    • Spawn subprocess with cmd, pwd, env, timeout           │
    │    • Set pid, started_at                                    │
    │    • Process runs in background or foreground               │
    │ 2. Monitor process completion                               │
    │    • Check exit code when process completes                 │
    └─────────────────────────────────────────────────────────────┘
                    ↓ tick() checks is_exited()
    ┌─────────────────────────────────────────────────────────────┐
    │ EXITED State                                                │
    │ • Process completed (exit_code set)                         │
    │ • Health stats incremented                                  │
    │ • stdout/stderr captured                                    │
    └─────────────────────────────────────────────────────────────┘

    Note: This is a simpler state machine than ArchiveResult.
    Process is just about execution lifecycle. ArchiveResult handles
    the archival-specific logic (status, output parsing, etc.).
    """

    model_attr_name = 'process'

    # States
    queued = State(value=Process.StatusChoices.QUEUED, initial=True)
    running = State(value=Process.StatusChoices.RUNNING)
    exited = State(value=Process.StatusChoices.EXITED, final=True)

    # Tick Event - transitions based on conditions
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(running, cond='can_start') |
        running.to.itself(unless='is_exited') |
        running.to(exited, cond='is_exited')
    )

    # Additional events (for explicit control)
    launch = queued.to(running)
    kill = running.to(exited)

    def can_start(self) -> bool:
        """Check if process can start (has a non-empty cmd and a machine)."""
        # Check the raw FK column rather than dereferencing `process.machine`:
        # it needs no related-object lookup and yields False, not an
        # exception, when no machine has been assigned.
        return bool(self.process.cmd) and self.process.machine_id is not None

    def is_exited(self) -> bool:
        """Check if process has exited (exit_code is set)."""
        return self.process.exit_code is not None

    @queued.enter
    def enter_queued(self):
        """Process is queued for execution: mark it QUEUED with retry_at = now."""
        self.process.update_and_requeue(
            retry_at=timezone.now(),
            status=Process.StatusChoices.QUEUED,
        )

    @running.enter
    def enter_running(self):
        """Start process execution (currently a placeholder, nothing is spawned)."""
        # Read the clock once so that retry_at is exactly started_at + timeout.
        launched_at = timezone.now()

        # Lock the process while it runs
        self.process.update_and_requeue(
            retry_at=launched_at + timedelta(seconds=self.process.timeout),
            status=Process.StatusChoices.RUNNING,
            started_at=launched_at,
        )

        # Launch the subprocess
        # NOTE: This is a placeholder - actual launch logic would
        # be implemented based on how hooks currently spawn processes
        # For now, Process is a data model that tracks execution metadata
        # The actual subprocess spawning is still handled by run_hook()

        # Mark as immediately exited for now (until we refactor run_hook)
        # In the future, this would actually spawn the subprocess
        self.process.exit_code = 0  # Placeholder
        self.process.save()

    @exited.enter
    def enter_exited(self):
        """Process has exited: record the end time and update health stats."""
        # Success means exit code 0; an unset exit_code (None) counts as failure.
        success = self.process.exit_code == 0

        self.process.update_and_requeue(
            retry_at=None,
            status=Process.StatusChoices.EXITED,
            ended_at=timezone.now(),
        )

        # Increment health stats based on exit code
        self.process.increment_health_stats(success=success)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# State Machine Registration
|
||||
# =============================================================================
|
||||
|
||||
# Register every state machine defined in this module with the
# python-statemachine registry by hand (registration order is preserved).
for _machine_cls in (BinaryMachine, ProcessMachine):
    registry.register(_machine_cls)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user