fix orchestrator state machine and Process from ArchiveResult migrations

This commit is contained in:
Nick Sweeting
2026-01-01 16:43:02 -08:00
parent 876feac522
commit 60422adc87
13 changed files with 378 additions and 96 deletions

View File

@@ -175,8 +175,50 @@ class Orchestrator:
"""Spawn a new worker process. Returns PID or None if spawn failed."""
try:
pid = WorkerClass.start(daemon=False)
# Worker spawning is logged by the worker itself in on_startup()
return pid
# CRITICAL: Block until worker registers itself in Process table
# This prevents race condition where orchestrator spawns multiple workers
# before any of them finish on_startup() and register
from archivebox.machine.models import Process
import time
timeout = 5.0 # seconds to wait for worker registration
poll_interval = 0.1 # check every 100ms
elapsed = 0.0
spawn_time = timezone.now()
while elapsed < timeout:
# Check if worker process is registered with strict criteria:
# 1. Correct PID
# 2. WORKER process type
# 3. RUNNING status
# 4. Parent is this orchestrator
# 5. Started recently (within last 10 seconds)
worker_process = Process.objects.filter(
pid=pid,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
parent_id=self.db_process.id,
started_at__gte=spawn_time - timedelta(seconds=10),
).first()
if worker_process:
# Worker successfully registered!
return pid
time.sleep(poll_interval)
elapsed += poll_interval
# Timeout - worker failed to register
log_worker_event(
worker_type='Orchestrator',
event='Worker failed to register in time',
indent_level=0,
pid=self.pid,
metadata={'worker_type': WorkerClass.name, 'worker_pid': pid, 'timeout': timeout},
)
return None
except Exception as e:
log_worker_event(
worker_type='Orchestrator',
@@ -266,48 +308,75 @@ class Orchestrator:
def runloop(self) -> None:
"""Main orchestrator loop."""
from rich.progress import Progress, BarColumn, TextColumn, TaskProgressColumn
from archivebox.misc.logging import IS_TTY
import archivebox.misc.logging as logging_module
from archivebox.misc.logging import IS_TTY, CONSOLE
import sys
import os
# Enable progress bars only in TTY + foreground mode
show_progress = IS_TTY and self.exit_on_idle
# Save original consoles
original_console = logging_module.CONSOLE
original_stderr = logging_module.STDERR
# Debug
print(f"[yellow]DEBUG: IS_TTY={IS_TTY}, exit_on_idle={self.exit_on_idle}, show_progress={show_progress}[/yellow]")
# Create Progress with the console it will control
progress = Progress(
TextColumn("[cyan]{task.description}"),
BarColumn(bar_width=40),
TaskProgressColumn(),
transient=False,
console=original_console, # Use the original console
) if show_progress else None
self.on_startup()
task_ids = {}
task_ids = {} # snapshot_id -> task_id
if not show_progress:
# No progress bars - just run normally
self._run_orchestrator_loop(None, task_ids, None, None)
else:
# Redirect worker subprocess output to /dev/null
devnull_fd = os.open(os.devnull, os.O_WRONLY)
# Wrapper to convert console.print() to console.log() for Rich Progress
class ConsoleLogWrapper:
def __init__(self, console):
self._console = console
def print(self, *args, **kwargs):
# Use log() instead of print() to work with Live display
self._console.log(*args)
def __getattr__(self, name):
return getattr(self._console, name)
# Save original stdout/stderr (make 2 copies - one for Console, one for restoring)
original_stdout = sys.stdout.fileno()
original_stderr = sys.stderr.fileno()
stdout_for_console = os.dup(original_stdout)
stdout_for_restore = os.dup(original_stdout)
stderr_for_restore = os.dup(original_stderr)
try:
# Redirect stdout/stderr to /dev/null (workers will inherit this)
os.dup2(devnull_fd, original_stdout)
os.dup2(devnull_fd, original_stderr)
# Create Console using saved stdout (not the redirected one)
from rich.console import Console
import archivebox.misc.logging as logging_module
orchestrator_console = Console(file=os.fdopen(stdout_for_console, 'w'), force_terminal=True)
# Update global CONSOLE so orchestrator logs appear too
original_console = logging_module.CONSOLE
logging_module.CONSOLE = orchestrator_console
# Now create Progress and run loop (DON'T restore stdout/stderr - workers need /dev/null)
with Progress(
TextColumn("[cyan]{task.description}"),
BarColumn(bar_width=40),
TaskProgressColumn(),
console=orchestrator_console,
) as progress:
self._run_orchestrator_loop(progress, task_ids, None, None)
# Restore original console
logging_module.CONSOLE = original_console
finally:
# Restore stdout/stderr
os.dup2(stdout_for_restore, original_stdout)
os.dup2(stderr_for_restore, original_stderr)
# Cleanup
try:
os.close(devnull_fd)
os.close(stdout_for_restore)
os.close(stderr_for_restore)
except:
pass
# stdout_for_console is closed by orchestrator_console
def _run_orchestrator_loop(self, progress, task_ids, read_fd, console):
"""Run the main orchestrator loop with optional progress display."""
try:
if progress:
progress.start()
# Wrap progress.console so print() calls become log() calls
wrapped_console = ConsoleLogWrapper(progress.console)
logging_module.CONSOLE = wrapped_console
logging_module.STDERR = wrapped_console
# Call on_startup AFTER redirecting consoles
self.on_startup()
while True:
# Check queues and spawn workers
queue_sizes = self.check_queues_and_spawn_workers()
@@ -333,12 +402,33 @@ class Orchestrator:
status__in=['succeeded', 'skipped', 'failed']
).count()
# Find currently running hook (ordered by hook_name to get lowest step number)
current_ar = snapshot.archiveresult_set.filter(status='started').order_by('hook_name').first()
if not current_ar:
# If nothing running, show next queued item (ordered to get next in sequence)
current_ar = snapshot.archiveresult_set.filter(status='queued').order_by('hook_name').first()
current_plugin = ''
if current_ar:
# Use hook_name if available, otherwise plugin name
hook_name = current_ar.hook_name or current_ar.plugin or ''
# Extract just the hook name without path (e.g., "on_Snapshot__50_wget.py" -> "wget")
if hook_name:
# Clean up the name: remove prefix and extension
clean_name = hook_name.split('__')[-1] if '__' in hook_name else hook_name
clean_name = clean_name.replace('.py', '').replace('.sh', '').replace('.bg', '')
current_plugin = f"{clean_name}"
# Build description with URL + current plugin
url = snapshot.url[:50] + '...' if len(snapshot.url) > 50 else snapshot.url
description = f"{url}{current_plugin}"
# Create or update task
if snapshot.id not in task_ids:
url = snapshot.url[:60] + '...' if len(snapshot.url) > 60 else snapshot.url
task_ids[snapshot.id] = progress.add_task(url, total=total, completed=completed)
task_ids[snapshot.id] = progress.add_task(description, total=total, completed=completed)
else:
progress.update(task_ids[snapshot.id], completed=completed)
# Update both progress and description
progress.update(task_ids[snapshot.id], description=description, completed=completed)
# Remove tasks for snapshots that are no longer active
for snapshot_id in list(task_ids.keys()):
@@ -373,12 +463,6 @@ class Orchestrator:
raise
else:
self.on_shutdown()
finally:
if progress:
# Restore original consoles
logging_module.CONSOLE = original_console
logging_module.STDERR = original_stderr
progress.stop()
def start(self) -> int:
"""