From 3da523fc748112eadba7477c88c322d3a53f1cf0 Mon Sep 17 00:00:00 2001
From: Nick Sweeting
Date: Fri, 2 Jan 2026 04:27:38 -0800
Subject: [PATCH] more consistent crawl, snapshot, hook cleanup and Process tracking

---
 archivebox/core/models.py          |  42 +++++++++---
 archivebox/misc/progress_layout.py | 102 +++++++++++++++++++++++++++--
 archivebox/workers/orchestrator.py |  11 ++++
 archivebox/workers/worker.py       |  41 ++++--------
 4 files changed, 151 insertions(+), 45 deletions(-)

diff --git a/archivebox/core/models.py b/archivebox/core/models.py
index 3de5b4f8..1888922e 100755
--- a/archivebox/core/models.py
+++ b/archivebox/core/models.py
@@ -1413,27 +1413,49 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
 
     def cleanup(self):
         """
-        Clean up background ArchiveResult hooks.
+        Clean up background ArchiveResult hooks and empty results.
 
         Called by the state machine when entering the 'sealed' state.
-        Kills any background hooks and finalizes their ArchiveResults.
+        Uses Process records to kill background hooks, then deletes empty ArchiveResults.
         """
-        from archivebox.misc.process_utils import safe_kill_process
+        from archivebox.machine.models import Process
 
-        # Kill any background ArchiveResult hooks
-        if not self.OUTPUT_DIR.exists():
-            return
+        # Kill any background ArchiveResult hooks using Process records
+        # Find all running hook Processes linked to this snapshot's ArchiveResults
+        running_hooks = Process.objects.filter(
+            archiveresult__snapshot=self,
+            process_type=Process.TypeChoices.HOOK,
+            status=Process.StatusChoices.RUNNING,
+        ).distinct()
 
-        # Find all .pid files in this snapshot's output directory
-        for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
-            cmd_file = pid_file.parent / 'cmd.sh'
-            safe_kill_process(pid_file, cmd_file)
+        for process in running_hooks:
+            # Use Process.kill_tree() to gracefully kill parent + children
+            killed_count = process.kill_tree(graceful_timeout=2.0)
+            if killed_count > 0:
+                print(f'[yellow]🔪 Killed {killed_count} process(es) for hook {process.pid}[/yellow]')
+
+        # Clean up .pid files from output directory
+        if self.OUTPUT_DIR.exists():
+            for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
+                pid_file.unlink(missing_ok=True)
 
         # Update all STARTED ArchiveResults from filesystem
         results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
         for ar in results:
            ar.update_from_output()
 
+        # Delete ArchiveResults that produced no output files
+        empty_ars = self.archiveresult_set.filter(
+            output_files={}  # No output files
+        ).filter(
+            status__in=ArchiveResult.FINAL_STATES  # Only delete finished ones
+        )
+
+        deleted_count = empty_ars.count()
+        if deleted_count > 0:
+            empty_ars.delete()
+            print(f'[yellow]🗑️ Deleted {deleted_count} empty ArchiveResults for {self.url}[/yellow]')
+
     def has_running_background_hooks(self) -> bool:
         """
         Check if any ArchiveResult background hooks are still running.
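Note for reviewers: Process.kill_tree() is not defined in this patch. A minimal sketch of the
contract cleanup() assumes (SIGTERM the whole process tree, escalate to SIGKILL after
graceful_timeout, return how many processes were signalled), written here with psutil; the
real method on archivebox.machine.models.Process may differ:

    # Hypothetical sketch of the kill_tree() semantics that cleanup() relies on.
    import psutil

    def kill_tree(pid: int, graceful_timeout: float = 2.0) -> int:
        """SIGTERM a process and its children, escalate to SIGKILL, return count signalled."""
        try:
            parent = psutil.Process(pid)
        except psutil.NoSuchProcess:
            return 0
        procs = [parent, *parent.children(recursive=True)]
        for p in procs:
            try:
                p.terminate()  # polite SIGTERM first
            except psutil.NoSuchProcess:
                pass
        _gone, alive = psutil.wait_procs(procs, timeout=graceful_timeout)
        for p in alive:
            try:
                p.kill()  # SIGKILL any stragglers that ignored SIGTERM
            except psutil.NoSuchProcess:
                pass
        return len(procs)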
diff --git a/archivebox/misc/progress_layout.py b/archivebox/misc/progress_layout.py
index f386cd13..fc4001d8 100644
--- a/archivebox/misc/progress_layout.py
+++ b/archivebox/misc/progress_layout.py
@@ -163,10 +163,89 @@ class SnapshotWorkerPanel:
         self.recent_logs.append((message, style))
 
 
+class CrawlWorkerLogPanel:
+    """Display CrawlWorker logs by tailing stdout/stderr from Process."""
+
+    def __init__(self, max_lines: int = 8):
+        self.log_lines: deque = deque(maxlen=max_lines * 2)  # Allow more buffer
+        self.max_lines = max_lines
+        self.last_stdout_pos = 0  # Track file position for efficient tailing
+        self.last_stderr_pos = 0
+
+    def update_from_process(self, process: Any):
+        """Update logs by tailing the Process stdout/stderr files."""
+        from pathlib import Path
+
+        if not process:
+            return
+
+        # Read new stdout lines since last read
+        try:
+            stdout_path = Path(process.stdout)
+            if stdout_path.exists():
+                with open(stdout_path, 'r') as f:
+                    # Seek to last read position
+                    f.seek(self.last_stdout_pos)
+                    new_lines = f.readlines()
+
+                    # Update position
+                    self.last_stdout_pos = f.tell()
+
+                # Add new lines (up to max_lines to avoid overflow)
+                for line in new_lines[-self.max_lines:]:
+                    line = line.rstrip('\n')
+                    if line and not line.startswith('['):  # Skip Rich markup lines
+                        self.log_lines.append(('stdout', line))
+        except Exception:
+            pass
+
+        # Read new stderr lines since last read
+        try:
+            stderr_path = Path(process.stderr)
+            if stderr_path.exists():
+                with open(stderr_path, 'r') as f:
+                    f.seek(self.last_stderr_pos)
+                    new_lines = f.readlines()
+
+                    self.last_stderr_pos = f.tell()
+
+                for line in new_lines[-self.max_lines:]:
+                    line = line.rstrip('\n')
+                    if line and not line.startswith('['):  # Skip Rich markup lines
+                        self.log_lines.append(('stderr', line))
+        except Exception:
+            pass
+
+    def __rich__(self) -> Panel:
+        if not self.log_lines:
+            content = Text("No CrawlWorker logs yet", style="grey53", justify="center")
+        else:
+            # Get the last max_lines for display
+            display_lines = list(self.log_lines)[-self.max_lines:]
+            lines = []
+            for stream, message in display_lines:
+                line = Text()
+                # Color code by stream - stderr is usually debug output
+                if stream == 'stderr':
+                    # Rich formatted logs from stderr
+                    line.append(message, style="cyan")
+                else:
+                    line.append(message, style="white")
+                lines.append(line)
+            content = Group(*lines)
+
+        return Panel(
+            content,
+            title="[bold cyan]CrawlWorker Logs (stdout/stderr)",
+            border_style="cyan",
+            box=box.ROUNDED,
+        )
+
+
 class OrchestratorLogPanel:
     """Display orchestrator and system logs."""
 
-    def __init__(self, max_events: int = 15):
+    def __init__(self, max_events: int = 8):
         self.events: deque = deque(maxlen=max_events)
         self.max_events = max_events
 
@@ -192,7 +271,6 @@ class OrchestratorLogPanel:
             title="[bold white]Orchestrator / Daphne Logs",
             border_style="white",
             box=box.ROUNDED,
-            height=12,
         )
 
 
@@ -211,6 +289,8 @@ class ArchiveBoxProgressLayout:
     │ Stats +       │ Stats +       │ Stats +       │ Stats +     │
    │ Logs          │ Logs          │ Logs          │ Logs        │
     ├───────────────┴───────────────┴───────────────┴─────────────┤
+    │ CrawlWorker Logs (stdout/stderr)                             │
+    ├─────────────────────────────────────────────────────────────┤
     │ Orchestrator / Daphne Logs                                   │
     └─────────────────────────────────────────────────────────────┘
     """
@@ -226,7 +306,8 @@ class ArchiveBoxProgressLayout:
         # Create 4 worker panels
         self.worker_panels = [SnapshotWorkerPanel(i + 1) for i in range(MAX_WORKER_COLUMNS)]
 
-        self.orchestrator_log = OrchestratorLogPanel(max_events=12)
+        self.crawl_worker_log = CrawlWorkerLogPanel(max_lines=8)
+        self.orchestrator_log = OrchestratorLogPanel(max_events=8)
 
         # Create layout
         self.layout = self._make_layout()
@@ -242,7 +323,7 @@ class ArchiveBoxProgressLayout:
         layout.split(
             Layout(name="crawl_queue", size=3),
             Layout(name="workers", ratio=1),
-            Layout(name="logs", size=13),
+            Layout(name="logs", size=20),
         )
 
         # Split workers into 4 columns
         layout["workers"].split_row(
             Layout(name="worker1"),
             Layout(name="worker2"),
             Layout(name="worker3"),
             Layout(name="worker4"),
         )
 
+        # Split logs into crawl_worker_logs and orchestrator_logs
+        layout["logs"].split(
+            Layout(name="crawl_worker_logs", size=10),
+            Layout(name="orchestrator_logs", size=10),
+        )
+
         # Assign components to layout sections
         layout["crawl_queue"].update(self.crawl_queue)
         layout["worker1"].update(self.worker_panels[0])
         layout["worker2"].update(self.worker_panels[1])
         layout["worker3"].update(self.worker_panels[2])
         layout["worker4"].update(self.worker_panels[3])
-        layout["logs"].update(self.orchestrator_log)
+        layout["crawl_worker_logs"].update(self.crawl_worker_log)
+        layout["orchestrator_logs"].update(self.orchestrator_log)
 
         return layout
 
@@ -340,6 +428,10 @@
         """Add an event to the orchestrator log."""
         self.orchestrator_log.add_event(message, style)
 
+    def update_crawl_worker_logs(self, process: Any):
+        """Update CrawlWorker logs by tailing the Process stdout/stderr files."""
+        self.crawl_worker_log.update_from_process(process)
+
     def get_layout(self) -> Layout:
         """Get the Rich Layout object for rendering."""
         return self.layout
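The panel's incremental tailing works because last_stdout_pos/last_stderr_pos persist the
byte offset between refreshes, so each poll reads only newly appended output instead of
re-reading the whole file. The same pattern in isolation (the file path below is a throwaway
example, not ArchiveBox's real log location):

    from pathlib import Path
    import tempfile

    class Tail:
        """Remember the byte offset so each poll reads only newly appended lines."""
        def __init__(self, path: Path):
            self.path = path
            self.pos = 0  # byte offset of the last read, like last_stdout_pos

        def read_new_lines(self) -> list[str]:
            if not self.path.exists():
                return []
            with open(self.path, 'r') as f:
                f.seek(self.pos)     # resume where the previous poll stopped
                lines = f.readlines()
                self.pos = f.tell()  # persist the offset for the next poll
            return [line.rstrip('\n') for line in lines]

    # Quick demonstration with a throwaway file:
    log = Path(tempfile.mkdtemp()) / 'crawlworker.stdout'
    tail = Tail(log)
    log.write_text('first\n')
    assert tail.read_new_lines() == ['first']
    with open(log, 'a') as f:
        f.write('second\n')
    assert tail.read_new_lines() == ['second']  # only the new line is returned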
diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py
index 7c7b4d0b..1197aa4c 100644
--- a/archivebox/workers/orchestrator.py
+++ b/archivebox/workers/orchestrator.py
@@ -497,6 +497,17 @@ class Orchestrator:
             max_crawl_workers=self.MAX_CRAWL_WORKERS,
         )
 
+        # Update CrawlWorker logs by tailing Process stdout/stderr
+        if crawl_workers_count > 0:
+            from archivebox.machine.models import Process
+            crawl_worker_process = Process.objects.filter(
+                process_type=Process.TypeChoices.WORKER,
+                worker_type='crawl',
+                status__in=['running', 'started']
+            ).first()
+            if crawl_worker_process:
+                progress_layout.update_crawl_worker_logs(crawl_worker_process)
+
         # Log queue size changes
         if queue_sizes != last_queue_sizes:
             for worker_type, count in queue_sizes.items():
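Taken together with the layout changes above, the per-tick flow is: look up the crawl
worker's Process row, hand it to the layout, and let the panel tail its files. A condensed
sketch of that loop (assuming a Rich Live wrapper; tick_interval is illustrative, and the
real orchestrator loop does much more per tick):

    import time
    from rich.live import Live

    def run_dashboard(progress_layout, tick_interval: float = 0.5) -> None:
        """Poll the crawl worker's Process row and stream its logs into the layout."""
        from archivebox.machine.models import Process

        with Live(progress_layout.get_layout(), refresh_per_second=4):
            while True:
                crawl_worker = Process.objects.filter(
                    process_type=Process.TypeChoices.WORKER,
                    worker_type='crawl',
                    status__in=['running', 'started'],
                ).first()
                if crawl_worker:
                    # Only the bytes appended since the previous tick get read
                    progress_layout.update_crawl_worker_logs(crawl_worker)
                time.sleep(tick_interval)

One small inconsistency worth noting: this query filters on raw status strings while the
models.py hunk uses Process.StatusChoices; both work if the enum values match, but the enum
is the safer spelling.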
diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py
index 439bdda4..633bec66 100644
--- a/archivebox/workers/worker.py
+++ b/archivebox/workers/worker.py
@@ -513,16 +513,15 @@ class SnapshotWorker(Worker):
         return Snapshot
 
     def on_startup(self) -> None:
-        """Load snapshot and mark as STARTED."""
+        """Load snapshot and mark as STARTED using state machine."""
         super().on_startup()
         from archivebox.core.models import Snapshot
 
         self.snapshot = Snapshot.objects.get(id=self.snapshot_id)
 
-        # Mark snapshot as STARTED
-        self.snapshot.status = Snapshot.StatusChoices.STARTED
-        self.snapshot.retry_at = None  # No more polling needed
-        self.snapshot.save(update_fields=['status', 'retry_at', 'modified_at'])
+        # Use state machine to transition queued -> started (triggers enter_started())
+        self.snapshot.sm.tick()
+        self.snapshot.refresh_from_db()
 
     def runloop(self) -> None:
         """Execute all hooks sequentially."""
@@ -587,15 +586,15 @@
             # Check if we can advance to next step
             self._try_advance_step()
 
-            # All hooks launched (or completed) - cleanup and seal
-            self._cleanup_empty_archiveresults()
-            self.snapshot.status = Snapshot.StatusChoices.SEALED
-            self.snapshot.save(update_fields=['status', 'modified_at'])
+            # All hooks launched (or completed) - seal using state machine
+            # This triggers enter_sealed() which calls cleanup() and checks parent crawl sealing
+            self.snapshot.sm.seal()
+            self.snapshot.refresh_from_db()
 
         except Exception as e:
-            # Mark snapshot as failed
-            self.snapshot.status = Snapshot.StatusChoices.SEALED  # Still seal on error
-            self.snapshot.save(update_fields=['status', 'modified_at'])
+            # Mark snapshot as sealed even on error (still triggers cleanup)
+            self.snapshot.sm.seal()
+            self.snapshot.refresh_from_db()
             raise
         finally:
             self.on_shutdown()
@@ -676,24 +675,6 @@
                 pid=self.pid,
             )
 
-    def _cleanup_empty_archiveresults(self) -> None:
-        """Delete ArchiveResults that produced no output files."""
-        empty_ars = self.snapshot.archiveresult_set.filter(
-            output_files={}  # No output files
-        ).filter(
-            status__in=self.snapshot.archiveresult_set.model.FINAL_STATES  # Only delete finished ones
-        )
-
-        deleted_count = empty_ars.count()
-        if deleted_count > 0:
-            empty_ars.delete()
-            log_worker_event(
-                worker_type='SnapshotWorker',
-                event=f'Deleted {deleted_count} empty ArchiveResults',
-                indent_level=2,
-                pid=self.pid,
-            )
-
     def on_shutdown(self, error: BaseException | None = None) -> None:
         """
         Terminate all background Snapshot hooks when snapshot finishes.
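The worker now defers all status bookkeeping to the snapshot's state machine via sm.tick()
and sm.seal(). The machine itself is outside this diff; based on the comments above, its
contract looks roughly like the following hypothetical sketch (python-statemachine style;
names, transitions, and callbacks are assumptions, not the real archivebox implementation):

    # Hypothetical sketch of the state-machine contract the worker relies on.
    from statemachine import StateMachine, State

    class SnapshotMachine(StateMachine):
        queued = State(initial=True)
        started = State()
        sealed = State(final=True)

        tick = queued.to(started)                      # worker.on_startup()
        seal = started.to(sealed) | queued.to(sealed)  # worker.runloop(), even on error

        def __init__(self, snapshot):
            self.snapshot = snapshot
            super().__init__()

        def on_enter_started(self):
            # Replaces the manual status/retry_at bookkeeping removed from on_startup()
            self.snapshot.status = 'started'
            self.snapshot.retry_at = None
            self.snapshot.save()

        def on_enter_sealed(self):
            # cleanup() kills background hooks and deletes empty ArchiveResults
            # (see the core/models.py hunk at the top of this patch)
            self.snapshot.cleanup()
            # ...then checks whether the parent crawl can itself be sealed

Routing both the success and error paths of runloop() through seal means cleanup() runs even
when a hook raises, which is the point of this refactor.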