more consistent crawl, snapshot, hook cleanup and Process tracking

Nick Sweeting
2026-01-02 04:27:38 -08:00
parent dd77511026
commit 3da523fc74
4 changed files with 151 additions and 45 deletions

View File

@@ -1413,27 +1413,49 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
     def cleanup(self):
         """
-        Clean up background ArchiveResult hooks.
+        Clean up background ArchiveResult hooks and empty results.
 
         Called by the state machine when entering the 'sealed' state.
-        Kills any background hooks and finalizes their ArchiveResults.
+        Uses Process records to kill background hooks, then deletes empty ArchiveResults.
         """
-        from archivebox.misc.process_utils import safe_kill_process
+        from archivebox.machine.models import Process
 
-        # Kill any background ArchiveResult hooks
-        if not self.OUTPUT_DIR.exists():
-            return
+        # Kill any background ArchiveResult hooks using Process records
+        # Find all running hook Processes linked to this snapshot's ArchiveResults
+        running_hooks = Process.objects.filter(
+            archiveresult__snapshot=self,
+            process_type=Process.TypeChoices.HOOK,
+            status=Process.StatusChoices.RUNNING,
+        ).distinct()
 
-        # Find all .pid files in this snapshot's output directory
-        for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
-            cmd_file = pid_file.parent / 'cmd.sh'
-            safe_kill_process(pid_file, cmd_file)
+        for process in running_hooks:
+            # Use Process.kill_tree() to gracefully kill parent + children
+            killed_count = process.kill_tree(graceful_timeout=2.0)
+            if killed_count > 0:
+                print(f'[yellow]🔪 Killed {killed_count} process(es) for hook {process.pid}[/yellow]')
+
+        # Clean up .pid files from output directory
+        if self.OUTPUT_DIR.exists():
+            for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
+                pid_file.unlink(missing_ok=True)
+
+        # Update all STARTED ArchiveResults from filesystem
+        results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
+        for ar in results:
+            ar.update_from_output()
+
+        # Delete ArchiveResults that produced no output files
+        empty_ars = self.archiveresult_set.filter(
+            output_files={}  # No output files
+        ).filter(
+            status__in=ArchiveResult.FINAL_STATES  # Only delete finished ones
+        )
+        deleted_count = empty_ars.count()
+        if deleted_count > 0:
+            empty_ars.delete()
+            print(f'[yellow]🗑️ Deleted {deleted_count} empty ArchiveResults for {self.url}[/yellow]')
 
     def has_running_background_hooks(self) -> bool:
         """
         Check if any ArchiveResult background hooks are still running.
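For reference, Process.kill_tree() is defined elsewhere in archivebox.machine.models and is not part of this diff. Below is a minimal sketch of what a graceful kill-tree helper generally looks like, assuming psutil; the function name, signature, and return semantics here are illustrative, not the actual ArchiveBox implementation.

# Hypothetical sketch of a kill-tree helper, assuming psutil is available.
# The real Process.kill_tree() lives in archivebox.machine.models and may differ.
import psutil


def kill_tree(pid: int, graceful_timeout: float = 2.0) -> int:
    """SIGTERM a process and its children, escalate to SIGKILL, return count killed."""
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return 0

    procs = parent.children(recursive=True) + [parent]
    for proc in procs:
        try:
            proc.terminate()                 # polite SIGTERM first
        except psutil.NoSuchProcess:
            pass

    # Wait up to graceful_timeout, then force-kill whatever is left
    _, alive = psutil.wait_procs(procs, timeout=graceful_timeout)
    for proc in alive:
        try:
            proc.kill()                      # SIGKILL stragglers
        except psutil.NoSuchProcess:
            pass

    return sum(1 for p in procs if not p.is_running())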

View File

@@ -163,10 +163,89 @@ class SnapshotWorkerPanel:
         self.recent_logs.append((message, style))
 
+class CrawlWorkerLogPanel:
+    """Display CrawlWorker logs by tailing stdout/stderr from Process."""
+
+    def __init__(self, max_lines: int = 8):
+        self.log_lines: deque = deque(maxlen=max_lines * 2)  # Allow more buffer
+        self.max_lines = max_lines
+        self.last_stdout_pos = 0  # Track file position for efficient tailing
+        self.last_stderr_pos = 0
+
+    def update_from_process(self, process: Any):
+        """Update logs by tailing the Process stdout/stderr files."""
+        from pathlib import Path
+
+        if not process:
+            return
+
+        # Read new stdout lines since last read
+        try:
+            stdout_path = Path(process.stdout)
+            if stdout_path.exists():
+                with open(stdout_path, 'r') as f:
+                    # Seek to last read position
+                    f.seek(self.last_stdout_pos)
+                    new_lines = f.readlines()
+                    # Update position
+                    self.last_stdout_pos = f.tell()
+
+                # Add new lines (up to max_lines to avoid overflow)
+                for line in new_lines[-self.max_lines:]:
+                    line = line.rstrip('\n')
+                    if line and not line.startswith('['):  # Skip Rich markup lines
+                        self.log_lines.append(('stdout', line))
+        except Exception:
+            pass
+
+        # Read new stderr lines since last read
+        try:
+            stderr_path = Path(process.stderr)
+            if stderr_path.exists():
+                with open(stderr_path, 'r') as f:
+                    f.seek(self.last_stderr_pos)
+                    new_lines = f.readlines()
+                    self.last_stderr_pos = f.tell()
+
+                for line in new_lines[-self.max_lines:]:
+                    line = line.rstrip('\n')
+                    if line and not line.startswith('['):  # Skip Rich markup lines
+                        self.log_lines.append(('stderr', line))
+        except Exception:
+            pass
+
+    def __rich__(self) -> Panel:
+        if not self.log_lines:
+            content = Text("No CrawlWorker logs yet", style="grey53", justify="center")
+        else:
+            # Get the last max_lines for display
+            display_lines = list(self.log_lines)[-self.max_lines:]
+            lines = []
+            for stream, message in display_lines:
+                line = Text()
+                # Color code by stream - stderr is usually debug output
+                if stream == 'stderr':
+                    # Rich formatted logs from stderr
+                    line.append(message, style="cyan")
+                else:
+                    line.append(message, style="white")
+                lines.append(line)
+            content = Group(*lines)
+
+        return Panel(
+            content,
+            title="[bold cyan]CrawlWorker Logs (stdout/stderr)",
+            border_style="cyan",
+            box=box.ROUNDED,
+        )
 class OrchestratorLogPanel:
     """Display orchestrator and system logs."""
 
-    def __init__(self, max_events: int = 15):
+    def __init__(self, max_events: int = 8):
         self.events: deque = deque(maxlen=max_events)
         self.max_events = max_events
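A panel like CrawlWorkerLogPanel above is normally rendered inside a rich.live.Live loop that repeatedly calls update_from_process(). A minimal, hypothetical usage sketch follows; the real wiring lives in ArchiveBoxProgressLayout and the orchestrator hunk further down, and fake_process here is just a stand-in for a machine.models.Process row.

# Minimal usage sketch (hypothetical wiring, not the actual orchestrator loop).
import time
from types import SimpleNamespace

from rich.live import Live

# Stand-in for a Process row: only .stdout/.stderr log paths are needed here.
fake_process = SimpleNamespace(
    stdout='/tmp/crawlworker.stdout.log',
    stderr='/tmp/crawlworker.stderr.log',
)

panel = CrawlWorkerLogPanel(max_lines=8)
with Live(panel, refresh_per_second=4):
    for _ in range(20):
        panel.update_from_process(fake_process)   # tail any new lines since last poll
        time.sleep(0.25)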
@@ -192,7 +271,6 @@ class OrchestratorLogPanel:
             title="[bold white]Orchestrator / Daphne Logs",
             border_style="white",
             box=box.ROUNDED,
-            height=12,
         )
@@ -211,6 +289,8 @@ class ArchiveBoxProgressLayout:
     │    Stats +    │    Stats +    │    Stats +    │   Stats +   │
     │     Logs      │     Logs      │     Logs      │    Logs     │
     ├───────────────┴───────────────┴───────────────┴─────────────┤
+    │              CrawlWorker Logs (stdout/stderr)               │
+    ├─────────────────────────────────────────────────────────────┤
     │                 Orchestrator / Daphne Logs                   │
     └─────────────────────────────────────────────────────────────┘
     """
@@ -226,7 +306,8 @@ class ArchiveBoxProgressLayout:
         # Create 4 worker panels
         self.worker_panels = [SnapshotWorkerPanel(i + 1) for i in range(MAX_WORKER_COLUMNS)]
-        self.orchestrator_log = OrchestratorLogPanel(max_events=12)
+        self.crawl_worker_log = CrawlWorkerLogPanel(max_lines=8)
+        self.orchestrator_log = OrchestratorLogPanel(max_events=8)
 
         # Create layout
         self.layout = self._make_layout()
@@ -242,7 +323,7 @@ class ArchiveBoxProgressLayout:
         layout.split(
             Layout(name="crawl_queue", size=3),
             Layout(name="workers", ratio=1),
-            Layout(name="logs", size=13),
+            Layout(name="logs", size=20),
         )
 
         # Split workers into 4 columns
@@ -253,13 +334,20 @@ class ArchiveBoxProgressLayout:
             Layout(name="worker4"),
         )
 
+        # Split logs into crawl_worker_logs and orchestrator_logs
+        layout["logs"].split(
+            Layout(name="crawl_worker_logs", size=10),
+            Layout(name="orchestrator_logs", size=10),
+        )
 
         # Assign components to layout sections
         layout["crawl_queue"].update(self.crawl_queue)
         layout["worker1"].update(self.worker_panels[0])
         layout["worker2"].update(self.worker_panels[1])
         layout["worker3"].update(self.worker_panels[2])
         layout["worker4"].update(self.worker_panels[3])
-        layout["logs"].update(self.orchestrator_log)
+        layout["crawl_worker_logs"].update(self.crawl_worker_log)
+        layout["orchestrator_logs"].update(self.orchestrator_log)
 
         return layout
@@ -340,6 +428,10 @@ class ArchiveBoxProgressLayout:
"""Add an event to the orchestrator log."""
self.orchestrator_log.add_event(message, style)
def update_crawl_worker_logs(self, process: Any):
"""Update CrawlWorker logs by tailing the Process stdout/stderr files."""
self.crawl_worker_log.update_from_process(process)
def get_layout(self) -> Layout:
"""Get the Rich Layout object for rendering."""
return self.layout
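A note on the Rich Layout API used in these hunks: size fixes a region's height in terminal rows while ratio shares whatever space remains, which is why the logs region grows from 13 to 20 rows to hold two 10-row sub-panels. A standalone sketch of the same nesting pattern, with placeholder panel contents:

# Standalone sketch of the nested-split pattern used above (placeholder content only).
from rich import box
from rich.layout import Layout
from rich.panel import Panel

layout = Layout(name="root")
layout.split(
    Layout(name="workers", ratio=1),           # flexible region takes leftover space
    Layout(name="logs", size=20),              # fixed 20-row region
)
layout["logs"].split(
    Layout(name="crawl_worker_logs", size=10),
    Layout(name="orchestrator_logs", size=10),
)
layout["crawl_worker_logs"].update(Panel("crawl worker output", box=box.ROUNDED))
layout["orchestrator_logs"].update(Panel("orchestrator output", box=box.ROUNDED))

# print(layout)  # rich can render a Layout directly, or pass it to rich.live.Live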

View File

@@ -497,6 +497,17 @@ class Orchestrator:
                 max_crawl_workers=self.MAX_CRAWL_WORKERS,
             )
 
+            # Update CrawlWorker logs by tailing Process stdout/stderr
+            if crawl_workers_count > 0:
+                from archivebox.machine.models import Process
+                crawl_worker_process = Process.objects.filter(
+                    process_type=Process.TypeChoices.WORKER,
+                    worker_type='crawl',
+                    status__in=['running', 'started'],
+                ).first()
+                if crawl_worker_process:
+                    progress_layout.update_crawl_worker_logs(crawl_worker_process)
 
             # Log queue size changes
             if queue_sizes != last_queue_sizes:
                 for worker_type, count in queue_sizes.items():
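These hunks rely on several Process model fields that are defined outside this commit: process_type/worker_type, a status enum, pid, stdout/stderr log paths, and kill_tree(). Purely as a reading aid, a hypothetical minimal Django model covering only the fields referenced here might look like the sketch below; the real archivebox.machine.models.Process certainly has more fields and logic.

# Hypothetical minimal Process model, limited to the fields referenced in this commit.
from django.db import models


class Process(models.Model):
    class TypeChoices(models.TextChoices):
        WORKER = 'worker'
        HOOK = 'hook'

    class StatusChoices(models.TextChoices):
        RUNNING = 'running'
        STARTED = 'started'
        EXITED = 'exited'   # assumed terminal status, not shown in the diff

    process_type = models.CharField(max_length=16, choices=TypeChoices.choices)
    worker_type = models.CharField(max_length=32, blank=True)    # e.g. 'crawl'
    status = models.CharField(max_length=16, choices=StatusChoices.choices)
    pid = models.IntegerField(null=True)
    stdout = models.CharField(max_length=512)    # path to the stdout log file
    stderr = models.CharField(max_length=512)    # path to the stderr log file

    def kill_tree(self, graceful_timeout: float = 2.0) -> int:
        """Kill this process and its children; see the psutil sketch earlier."""
        ...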

View File

@@ -513,16 +513,15 @@ class SnapshotWorker(Worker):
         return Snapshot
 
     def on_startup(self) -> None:
-        """Load snapshot and mark as STARTED."""
+        """Load snapshot and mark as STARTED using state machine."""
         super().on_startup()
 
         from archivebox.core.models import Snapshot
         self.snapshot = Snapshot.objects.get(id=self.snapshot_id)
 
-        # Mark snapshot as STARTED
-        self.snapshot.status = Snapshot.StatusChoices.STARTED
-        self.snapshot.retry_at = None  # No more polling needed
-        self.snapshot.save(update_fields=['status', 'retry_at', 'modified_at'])
+        # Use state machine to transition queued -> started (triggers enter_started())
+        self.snapshot.sm.tick()
+        self.snapshot.refresh_from_db()
 
     def runloop(self) -> None:
         """Execute all hooks sequentially."""
@@ -587,15 +586,15 @@ class SnapshotWorker(Worker):
             # Check if we can advance to next step
             self._try_advance_step()
 
-            # All hooks launched (or completed) - cleanup and seal
-            self._cleanup_empty_archiveresults()
-            self.snapshot.status = Snapshot.StatusChoices.SEALED
-            self.snapshot.save(update_fields=['status', 'modified_at'])
+            # All hooks launched (or completed) - seal using state machine
+            # This triggers enter_sealed() which calls cleanup() and checks parent crawl sealing
+            self.snapshot.sm.seal()
+            self.snapshot.refresh_from_db()
         except Exception as e:
-            # Mark snapshot as failed
-            self.snapshot.status = Snapshot.StatusChoices.SEALED  # Still seal on error
-            self.snapshot.save(update_fields=['status', 'modified_at'])
+            # Mark snapshot as sealed even on error (still triggers cleanup)
+            self.snapshot.sm.seal()
+            self.snapshot.refresh_from_db()
             raise
         finally:
             self.on_shutdown()
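snapshot.sm is the snapshot's state machine instance, defined outside this commit; tick() advances queued → started and seal() finalizes the snapshot, with entry callbacks (referred to in the comments as enter_started()/enter_sealed()) doing the persistence and cleanup. A rough, hypothetical sketch of that shape using the python-statemachine library's on_enter_<state> naming convention; the real ArchiveBox machine has more states, guards, and side effects.

# Hypothetical sketch of the snapshot state machine shape (python-statemachine).
from statemachine import State, StateMachine


class SnapshotMachine(StateMachine):
    queued = State(initial=True)
    started = State()
    sealed = State(final=True)

    tick = queued.to(started)              # picked up by a SnapshotWorker
    seal = started.to(sealed)              # all hooks launched/finished

    def __init__(self, snapshot):
        self.snapshot = snapshot
        super().__init__()

    def on_enter_started(self):
        # persist the transition and stop retry polling
        self.snapshot.status = 'started'
        self.snapshot.retry_at = None
        self.snapshot.save()

    def on_enter_sealed(self):
        # kill background hooks, drop empty ArchiveResults, notify parent crawl
        self.snapshot.cleanup()
        self.snapshot.status = 'sealed'
        self.snapshot.save()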
@@ -676,24 +675,6 @@ class SnapshotWorker(Worker):
             pid=self.pid,
         )
 
-    def _cleanup_empty_archiveresults(self) -> None:
-        """Delete ArchiveResults that produced no output files."""
-        empty_ars = self.snapshot.archiveresult_set.filter(
-            output_files={}  # No output files
-        ).filter(
-            status__in=self.snapshot.archiveresult_set.model.FINAL_STATES  # Only delete finished ones
-        )
-        deleted_count = empty_ars.count()
-        if deleted_count > 0:
-            empty_ars.delete()
-            log_worker_event(
-                worker_type='SnapshotWorker',
-                event=f'Deleted {deleted_count} empty ArchiveResults',
-                indent_level=2,
-                pid=self.pid,
-            )
 
     def on_shutdown(self, error: BaseException | None = None) -> None:
         """
         Terminate all background Snapshot hooks when snapshot finishes.