mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-07 03:16:01 +10:00
- Remove pid_utils tests (module deleted in dev) - Update orchestrator tests to use Process model for tracking - Add tests for Process.current(), cleanup_stale_running(), terminate() - Add tests for Process hierarchy (parent/child, root, depth) - Add tests for Process.get_running(), get_running_count() - Add tests for ProcessMachine state machine - Update machine model tests to match current API (from_jsonl vs from_json)
454 lines
16 KiB
Python
454 lines
16 KiB
Python
"""
Unit tests for the Orchestrator and Worker classes.

Tests cover:
1. Orchestrator lifecycle (startup, shutdown)
2. Queue polling and worker spawning
3. Idle detection and exit logic
4. Worker registration and management
5. Process model methods (replacing old pid_utils)
"""
|
|
|
|
import os
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import timedelta
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
from django.test import TestCase
|
|
from django.utils import timezone
|
|
|
|
from archivebox.workers.orchestrator import Orchestrator
|
|
|
|
|
|
class TestOrchestratorUnit(TestCase):
    """Checks Orchestrator's decision helpers in isolation, with collaborators mocked out."""

    @staticmethod
    def _worker_stub(running, **attrs):
        """Return a MagicMock standing in for a worker class.

        ``get_running_workers()`` on the stub returns *running*; every keyword
        in *attrs* is then set as a plain attribute on the stub.
        """
        stub = MagicMock()
        stub.get_running_workers.return_value = running
        for attr_name, attr_value in attrs.items():
            setattr(stub, attr_name, attr_value)
        return stub

    def test_orchestrator_creation(self):
        """A new Orchestrator keeps exit_on_idle, has idle_count 0 and no pid_file."""
        orch = Orchestrator(exit_on_idle=True)

        self.assertTrue(orch.exit_on_idle)
        self.assertEqual(orch.idle_count, 0)
        self.assertIsNone(orch.pid_file)

    def test_orchestrator_repr(self):
        """repr() mentions both the class name and the current PID."""
        text = repr(Orchestrator())

        self.assertIn('Orchestrator', text)
        self.assertIn(str(os.getpid()), text)

    def test_has_pending_work(self):
        """has_pending_work is truthy as soon as any queue count is non-zero."""
        orch = Orchestrator()

        all_empty = {'crawl': 0, 'snapshot': 0}
        snapshots_waiting = {'crawl': 0, 'snapshot': 5}
        crawls_waiting = {'crawl': 10, 'snapshot': 0}

        self.assertFalse(orch.has_pending_work(all_empty))
        self.assertTrue(orch.has_pending_work(snapshots_waiting))
        self.assertTrue(orch.has_pending_work(crawls_waiting))

    def test_should_exit_not_exit_on_idle(self):
        """With exit_on_idle disabled, a large idle_count still does not trigger exit."""
        orch = Orchestrator(exit_on_idle=False)
        orch.idle_count = 100

        self.assertFalse(orch.should_exit({'crawl': 0}))

    def test_should_exit_pending_work(self):
        """A non-empty queue keeps the orchestrator alive regardless of idle_count."""
        orch = Orchestrator(exit_on_idle=True)
        orch.idle_count = 100

        self.assertFalse(orch.should_exit({'crawl': 5}))

    def test_should_exit_running_workers(self):
        """should_exit stays falsy while has_running_workers reports True."""
        with patch.object(Orchestrator, 'has_running_workers', return_value=True):
            orch = Orchestrator(exit_on_idle=True)
            orch.idle_count = 100

            self.assertFalse(orch.should_exit({'crawl': 0}))

    def test_should_exit_idle_timeout(self):
        """At idle_count == IDLE_TIMEOUT with no work of any kind, should_exit is truthy."""
        with patch.object(Orchestrator, 'has_running_workers', return_value=False), \
                patch.object(Orchestrator, 'has_future_work', return_value=False):
            orch = Orchestrator(exit_on_idle=True)
            orch.idle_count = orch.IDLE_TIMEOUT

            self.assertTrue(orch.should_exit({'crawl': 0, 'snapshot': 0}))

    def test_should_exit_below_idle_timeout(self):
        """One tick short of IDLE_TIMEOUT, should_exit is still falsy."""
        with patch.object(Orchestrator, 'has_running_workers', return_value=False), \
                patch.object(Orchestrator, 'has_future_work', return_value=False):
            orch = Orchestrator(exit_on_idle=True)
            orch.idle_count = orch.IDLE_TIMEOUT - 1

            self.assertFalse(orch.should_exit({'crawl': 0}))

    def test_should_spawn_worker_no_queue(self):
        """A queue size of 0 never warrants a new worker."""
        orch = Orchestrator()

        # Stand-in worker class reporting no running workers
        idle_worker = self._worker_stub([])

        self.assertFalse(orch.should_spawn_worker(idle_worker, 0))

    def test_should_spawn_worker_at_limit(self):
        """No spawn when MAX_WORKERS_PER_TYPE workers of this type already run."""
        orch = Orchestrator()

        saturated_worker = self._worker_stub([{}] * orch.MAX_WORKERS_PER_TYPE)

        self.assertFalse(orch.should_spawn_worker(saturated_worker, 10))

    def test_should_spawn_worker_at_total_limit(self):
        """No spawn when the total worker count equals MAX_TOTAL_WORKERS."""
        with patch.object(Orchestrator, 'get_total_worker_count') as total_count:
            orch = Orchestrator()
            total_count.return_value = orch.MAX_TOTAL_WORKERS

            idle_worker = self._worker_stub([])

            self.assertFalse(orch.should_spawn_worker(idle_worker, 10))

    def test_should_spawn_worker_success(self):
        """Pending items, no running workers and zero total workers => spawn."""
        with patch.object(Orchestrator, 'get_total_worker_count') as total_count:
            orch = Orchestrator()
            total_count.return_value = 0

            idle_worker = self._worker_stub([], MAX_CONCURRENT_TASKS=5)

            self.assertTrue(orch.should_spawn_worker(idle_worker, 10))

    def test_should_spawn_worker_enough_workers(self):
        """No spawn when the running workers can already absorb the queue."""
        with patch.object(Orchestrator, 'get_total_worker_count') as total_count:
            orch = Orchestrator()
            total_count.return_value = 2

            # One running worker that can take up to 5 items
            busy_worker = self._worker_stub([{}], MAX_CONCURRENT_TASKS=5)

            # Queue size (3) <= running_workers (1) * MAX_CONCURRENT_TASKS (5)
            self.assertFalse(orch.should_spawn_worker(busy_worker, 3))
|
|
|
|
|
|
class TestOrchestratorWithProcess(TestCase):
    """Checks Orchestrator.is_running against Process model records."""

    def setUp(self):
        """Clear the module-level machine/process caches before each test."""
        import archivebox.machine.models as machine_models

        for cache_name in ('_CURRENT_MACHINE', '_CURRENT_PROCESS'):
            setattr(machine_models, cache_name, None)

    def test_is_running_no_orchestrator(self):
        """is_running is falsy once no orchestrator Process is marked RUNNING."""
        from archivebox.machine.models import Process

        # Start by clearing out stale process records
        Process.cleanup_stale_running()

        # Then force every still-RUNNING orchestrator record to EXITED
        running_orchestrators = Process.objects.filter(
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
        )
        running_orchestrators.update(status=Process.StatusChoices.EXITED)

        self.assertFalse(Orchestrator.is_running())

    def test_is_running_with_orchestrator_process(self):
        """is_running is truthy while a RUNNING orchestrator Process record exists."""
        from archivebox.machine.models import Process, Machine

        this_machine = Machine.current()

        # Orchestrator record carrying this test process's PID so it appears alive
        orchestrator_proc = Process.objects.create(
            machine=this_machine,
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
            pid=os.getpid(),
            started_at=timezone.now(),
            cmd=['archivebox', 'manage', 'orchestrator'],
        )

        try:
            self.assertTrue(Orchestrator.is_running())
        finally:
            # Mark the record EXITED again whatever the assertion outcome
            orchestrator_proc.status = Process.StatusChoices.EXITED
            orchestrator_proc.save()

    def test_orchestrator_uses_process_for_is_running(self):
        """is_running consults Process.get_running_count rather than pid files."""
        from archivebox.machine.models import Process

        with patch.object(Process, 'get_running_count', return_value=1) as running_count:
            outcome = Orchestrator.is_running()

            # The patched counter must have been called at least once
            running_count.assert_called()
            self.assertTrue(outcome)
|
|
|
|
|
|
class TestProcessBasedWorkerTracking(TestCase):
    """Covers the Process model helpers that took over from pid_utils."""

    def setUp(self):
        """Clear the module-level machine/process caches before each test."""
        import archivebox.machine.models as machine_models

        for cache_name in ('_CURRENT_MACHINE', '_CURRENT_PROCESS'):
            setattr(machine_models, cache_name, None)

    def test_process_current_creates_record(self):
        """Process.current() yields a RUNNING record for this PID with machine and start time set."""
        from archivebox.machine.models import Process

        current = Process.current()

        self.assertIsNotNone(current)
        self.assertEqual(current.pid, os.getpid())
        self.assertEqual(current.status, Process.StatusChoices.RUNNING)
        self.assertIsNotNone(current.machine)
        self.assertIsNotNone(current.started_at)

    def test_process_current_caches_result(self):
        """Two back-to-back Process.current() calls return the same record id."""
        from archivebox.machine.models import Process

        first = Process.current()
        second = Process.current()

        self.assertEqual(first.id, second.id)

    def test_process_get_running_count(self):
        """get_running_count for WORKER is at least the three RUNNING workers created here."""
        from archivebox.machine.models import Process, Machine

        this_machine = Machine.current()

        # Three RUNNING worker records with made-up PIDs 99990..99992
        for fake_pid in range(99990, 99993):
            Process.objects.create(
                machine=this_machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=fake_pid,
                started_at=timezone.now(),
            )

        running_total = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
        self.assertGreaterEqual(running_total, 3)

    def test_process_get_next_worker_id(self):
        """get_next_worker_id for WORKER is at least the two RUNNING workers created here."""
        from archivebox.machine.models import Process, Machine

        this_machine = Machine.current()

        # Two RUNNING worker records with made-up PIDs 99980..99981
        for fake_pid in range(99980, 99982):
            Process.objects.create(
                machine=this_machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=fake_pid,
                started_at=timezone.now(),
            )

        upcoming_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
        self.assertGreaterEqual(upcoming_id, 2)

    def test_process_cleanup_stale_running(self):
        """cleanup_stale_running flips a stale RUNNING record to EXITED and reports it."""
        from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW

        this_machine = Machine.current()

        # Started an hour beyond PID_REUSE_WINDOW ago, under a PID that is not expected to exist
        long_ago = timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1)
        stale = Process.objects.create(
            machine=this_machine,
            process_type=Process.TypeChoices.WORKER,
            status=Process.StatusChoices.RUNNING,
            pid=999999,
            started_at=long_ago,
        )

        cleaned_total = Process.cleanup_stale_running()

        self.assertGreaterEqual(cleaned_total, 1)

        stale.refresh_from_db()
        self.assertEqual(stale.status, Process.StatusChoices.EXITED)

    def test_process_get_running(self):
        """get_running for HOOK contains a RUNNING hook record created here."""
        from archivebox.machine.models import Process, Machine

        this_machine = Machine.current()

        hook_proc = Process.objects.create(
            machine=this_machine,
            process_type=Process.TypeChoices.HOOK,
            status=Process.StatusChoices.RUNNING,
            pid=99970,
            started_at=timezone.now(),
        )

        running_hooks = Process.get_running(process_type=Process.TypeChoices.HOOK)

        self.assertIn(hook_proc, running_hooks)

    def test_process_type_detection(self):
        """_detect_process_type maps representative sys.argv values to the expected type."""
        from archivebox.machine.models import Process

        # (patched sys.argv, expected process type), checked in this order
        scenarios = (
            (['archivebox', 'manage', 'orchestrator'], Process.TypeChoices.ORCHESTRATOR),
            (['archivebox', 'add', 'http://example.com'], Process.TypeChoices.CLI),
            (['supervisord', '-c', 'config.ini'], Process.TypeChoices.SUPERVISORD),
        )

        for argv, expected_type in scenarios:
            with patch('sys.argv', argv):
                detected = Process._detect_process_type()
                self.assertEqual(detected, expected_type)
|
|
|
|
|
|
class TestProcessLifecycle(TestCase):
    """Covers Process liveness checks, polling, termination and parent/child links."""

    def setUp(self):
        """Clear the module-level caches, then resolve the current Machine for the tests."""
        import archivebox.machine.models as machine_models

        for cache_name in ('_CURRENT_MACHINE', '_CURRENT_PROCESS'):
            setattr(machine_models, cache_name, None)
        self.machine = machine_models.Machine.current()

    def _running_process(self, pid, **extra_fields):
        """Create a RUNNING Process on self.machine with *pid*, started now.

        Any *extra_fields* are passed straight through to ``Process.objects.create``.
        """
        from archivebox.machine.models import Process

        return Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=pid,
            started_at=timezone.now(),
            **extra_fields,
        )

    def test_process_is_running_property(self):
        """is_running is truthy for this process's PID and falsy for PID 999999."""
        # Record backed by the PID of the test process itself
        live = self._running_process(os.getpid())
        self.assertTrue(live.is_running)

        # Record backed by a PID that is not expected to exist
        ghost = self._running_process(999999)
        self.assertFalse(ghost.is_running)

    def test_process_poll(self):
        """poll() on a record with PID 999999 returns non-None and persists EXITED."""
        from archivebox.machine.models import Process

        ghost = self._running_process(999999)

        exit_status = ghost.poll()

        self.assertIsNotNone(exit_status)
        ghost.refresh_from_db()
        self.assertEqual(ghost.status, Process.StatusChoices.EXITED)

    def test_process_terminate_already_dead(self):
        """terminate() on a record with PID 999999 returns falsy and persists EXITED."""
        from archivebox.machine.models import Process

        ghost = self._running_process(999999)

        terminated = ghost.terminate()

        # Falsy result: there was nothing left to terminate
        self.assertFalse(terminated)

        ghost.refresh_from_db()
        self.assertEqual(ghost.status, Process.StatusChoices.EXITED)

    def test_process_tree_traversal(self):
        """parent, children, root and depth agree for a two-level process tree."""
        from archivebox.machine.models import Process

        top = self._running_process(1, process_type=Process.TypeChoices.CLI)
        leaf = self._running_process(
            2,
            parent=top,
            process_type=Process.TypeChoices.WORKER,
        )

        self.assertEqual(leaf.parent, top)
        self.assertIn(leaf, top.children.all())
        self.assertEqual(leaf.root, top)
        self.assertEqual(leaf.depth, 1)
        self.assertEqual(top.depth, 0)
|
|
|
|
|
|
if __name__ == '__main__':
    # pytest.main() returns the run's exit code rather than exiting itself.
    # Propagate it so that running this file directly ends with a non-zero
    # status when any test fails, instead of always reporting success.
    raise SystemExit(pytest.main([__file__, '-v']))
|