Files
ArchiveBox/archivebox/workers/tests/test_orchestrator.py
Claude 9bf7a520a0 Update tests for new Process model-based architecture
- Remove pid_utils tests (module deleted in dev)
- Update orchestrator tests to use Process model for tracking
- Add tests for Process.current(), cleanup_stale_running(), terminate()
- Add tests for Process hierarchy (parent/child, root, depth)
- Add tests for Process.get_running(), get_running_count()
- Add tests for ProcessMachine state machine
- Update machine model tests to match current API (from_jsonl vs from_json)
2025-12-31 11:51:42 +00:00

454 lines
16 KiB
Python

"""
Unit tests for the Orchestrator and Worker classes.
Tests cover:
1. Orchestrator lifecycle (startup, shutdown)
2. Queue polling and worker spawning
3. Idle detection and exit logic
4. Worker registration and management
5. Process model methods (replacing old pid_utils)
"""
import os
import tempfile
import time
from pathlib import Path
from datetime import timedelta
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase
from django.utils import timezone
from archivebox.workers.orchestrator import Orchestrator
class TestOrchestratorUnit(TestCase):
    """Checks of the Orchestrator's decision helpers, with collaborators mocked out."""

    @staticmethod
    def _fake_worker_class(running, **attrs):
        """Return a MagicMock standing in for a worker class.

        ``running`` is the value ``get_running_workers()`` will return; any
        extra keyword arguments are set as plain attributes on the mock.
        """
        fake = MagicMock()
        fake.get_running_workers.return_value = running
        for attr_name, attr_value in attrs.items():
            setattr(fake, attr_name, attr_value)
        return fake

    def test_orchestrator_creation(self):
        """A new Orchestrator keeps exit_on_idle, starts at idle_count 0 and has no pid_file."""
        orch = Orchestrator(exit_on_idle=True)

        self.assertTrue(orch.exit_on_idle)
        self.assertEqual(orch.idle_count, 0)
        self.assertIsNone(orch.pid_file)

    def test_orchestrator_repr(self):
        """repr() of an Orchestrator mentions the class name and the current PID."""
        orch = Orchestrator()
        text = repr(orch)

        self.assertIn('Orchestrator', text)
        self.assertIn(str(os.getpid()), text)

    def test_has_pending_work(self):
        """has_pending_work is falsy for all-zero queues and truthy once any queue is non-empty."""
        orch = Orchestrator()
        empty_queues = {'crawl': 0, 'snapshot': 0}
        non_empty_queues = (
            {'crawl': 0, 'snapshot': 5},
            {'crawl': 10, 'snapshot': 0},
        )

        self.assertFalse(orch.has_pending_work(empty_queues))
        for queue_sizes in non_empty_queues:
            self.assertTrue(orch.has_pending_work(queue_sizes))

    def test_should_exit_not_exit_on_idle(self):
        """With exit_on_idle disabled, should_exit stays falsy even at a high idle_count."""
        orch = Orchestrator(exit_on_idle=False)
        orch.idle_count = 100

        verdict = orch.should_exit({'crawl': 0})

        self.assertFalse(verdict)

    def test_should_exit_pending_work(self):
        """Pending queue items keep should_exit falsy."""
        orch = Orchestrator(exit_on_idle=True)
        orch.idle_count = 100

        verdict = orch.should_exit({'crawl': 5})

        self.assertFalse(verdict)

    def test_should_exit_running_workers(self):
        """Running workers keep should_exit falsy."""
        with patch.object(Orchestrator, 'has_running_workers', return_value=True):
            orch = Orchestrator(exit_on_idle=True)
            orch.idle_count = 100

            self.assertFalse(orch.should_exit({'crawl': 0}))

    def test_should_exit_idle_timeout(self):
        """At IDLE_TIMEOUT with no workers, no future work and empty queues, should_exit is truthy."""
        with patch.object(Orchestrator, 'has_running_workers', return_value=False), \
                patch.object(Orchestrator, 'has_future_work', return_value=False):
            orch = Orchestrator(exit_on_idle=True)
            orch.idle_count = orch.IDLE_TIMEOUT

            self.assertTrue(orch.should_exit({'crawl': 0, 'snapshot': 0}))

    def test_should_exit_below_idle_timeout(self):
        """One tick short of IDLE_TIMEOUT, should_exit is still falsy."""
        with patch.object(Orchestrator, 'has_running_workers', return_value=False), \
                patch.object(Orchestrator, 'has_future_work', return_value=False):
            orch = Orchestrator(exit_on_idle=True)
            orch.idle_count = orch.IDLE_TIMEOUT - 1

            self.assertFalse(orch.should_exit({'crawl': 0}))

    def test_should_spawn_worker_no_queue(self):
        """An empty queue never justifies spawning a worker."""
        orch = Orchestrator()
        worker_cls = self._fake_worker_class([])

        self.assertFalse(orch.should_spawn_worker(worker_cls, 0))

    def test_should_spawn_worker_at_limit(self):
        """No spawn when MAX_WORKERS_PER_TYPE workers of this type are already running."""
        orch = Orchestrator()
        worker_cls = self._fake_worker_class([{}] * orch.MAX_WORKERS_PER_TYPE)

        self.assertFalse(orch.should_spawn_worker(worker_cls, 10))

    def test_should_spawn_worker_at_total_limit(self):
        """No spawn when the total worker count has reached MAX_TOTAL_WORKERS."""
        with patch.object(Orchestrator, 'get_total_worker_count') as total_mock:
            orch = Orchestrator()
            total_mock.return_value = orch.MAX_TOTAL_WORKERS
            worker_cls = self._fake_worker_class([])

            self.assertFalse(orch.should_spawn_worker(worker_cls, 10))

    def test_should_spawn_worker_success(self):
        """Spawn is approved when the queue has items and no limit is hit."""
        with patch.object(Orchestrator, 'get_total_worker_count') as total_mock:
            orch = Orchestrator()
            total_mock.return_value = 0
            worker_cls = self._fake_worker_class([], MAX_CONCURRENT_TASKS=5)

            self.assertTrue(orch.should_spawn_worker(worker_cls, 10))

    def test_should_spawn_worker_enough_workers(self):
        """No spawn when the running workers can already absorb the queue."""
        with patch.object(Orchestrator, 'get_total_worker_count') as total_mock:
            orch = Orchestrator()
            total_mock.return_value = 2
            # One running worker able to take 5 items at a time.
            worker_cls = self._fake_worker_class([{}], MAX_CONCURRENT_TASKS=5)

            # Queue size (3) <= running_workers (1) * MAX_CONCURRENT_TASKS (5)
            self.assertFalse(orch.should_spawn_worker(worker_cls, 3))
class TestOrchestratorWithProcess(TestCase):
    """Checks that Orchestrator.is_running() is answered from Process records."""

    def setUp(self):
        """Set the machine-models module attributes caching the current machine/process to None."""
        import archivebox.machine.models as machine_models

        for cache_attr in ('_CURRENT_MACHINE', '_CURRENT_PROCESS'):
            setattr(machine_models, cache_attr, None)

    def test_is_running_no_orchestrator(self):
        """is_running is falsy once no orchestrator Process is marked RUNNING."""
        from archivebox.machine.models import Process

        # Start from a clean slate: drop stale rows first ...
        Process.cleanup_stale_running()
        # ... then force any remaining RUNNING orchestrator rows to EXITED.
        running_orchestrators = Process.objects.filter(
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
        )
        running_orchestrators.update(status=Process.StatusChoices.EXITED)

        self.assertFalse(Orchestrator.is_running())

    def test_is_running_with_orchestrator_process(self):
        """is_running is truthy when a RUNNING orchestrator Process row exists."""
        from archivebox.machine.models import Process, Machine

        current_machine = Machine.current()
        orchestrator_row = Process.objects.create(
            machine=current_machine,
            process_type=Process.TypeChoices.ORCHESTRATOR,
            status=Process.StatusChoices.RUNNING,
            pid=os.getpid(),  # our own PID, so the row points at a live process
            started_at=timezone.now(),
            cmd=['archivebox', 'manage', 'orchestrator'],
        )

        try:
            self.assertTrue(Orchestrator.is_running())
        finally:
            # Always flip the row back to EXITED so it does not linger as RUNNING.
            orchestrator_row.status = Process.StatusChoices.EXITED
            orchestrator_row.save()

    def test_orchestrator_uses_process_for_is_running(self):
        """is_running consults Process.get_running_count (not pid files)."""
        from archivebox.machine.models import Process

        with patch.object(Process, 'get_running_count', return_value=1) as count_mock:
            outcome = Orchestrator.is_running()

            # Only the fact that it was called is checked, not its arguments.
            count_mock.assert_called()
            self.assertTrue(outcome)
class TestProcessBasedWorkerTracking(TestCase):
    """Checks of the Process model helpers that took over from pid_utils."""

    def setUp(self):
        """Set the machine-models module attributes caching the current machine/process to None."""
        import archivebox.machine.models as machine_models

        for cache_attr in ('_CURRENT_MACHINE', '_CURRENT_PROCESS'):
            setattr(machine_models, cache_attr, None)

    def test_process_current_creates_record(self):
        """Process.current() yields a RUNNING record for this PID with machine and start time set."""
        from archivebox.machine.models import Process

        current = Process.current()

        self.assertIsNotNone(current)
        self.assertEqual(current.pid, os.getpid())
        self.assertEqual(current.status, Process.StatusChoices.RUNNING)
        self.assertIsNotNone(current.machine)
        self.assertIsNotNone(current.started_at)

    def test_process_current_caches_result(self):
        """Two back-to-back Process.current() calls resolve to the same record id."""
        from archivebox.machine.models import Process

        first = Process.current()
        second = Process.current()

        self.assertEqual(first.id, second.id)

    def test_process_get_running_count(self):
        """get_running_count reports at least the three RUNNING workers created here."""
        from archivebox.machine.models import Process, Machine

        current_machine = Machine.current()
        # Three worker rows with made-up PIDs.
        for fake_pid in (99990, 99991, 99992):
            Process.objects.create(
                machine=current_machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=fake_pid,
                started_at=timezone.now(),
            )

        running_total = Process.get_running_count(process_type=Process.TypeChoices.WORKER)

        self.assertGreaterEqual(running_total, 3)

    def test_process_get_next_worker_id(self):
        """get_next_worker_id is at least 2 after two RUNNING workers are created."""
        from archivebox.machine.models import Process, Machine

        current_machine = Machine.current()
        # Two worker rows with made-up PIDs.
        for fake_pid in (99980, 99981):
            Process.objects.create(
                machine=current_machine,
                process_type=Process.TypeChoices.WORKER,
                status=Process.StatusChoices.RUNNING,
                pid=fake_pid,
                started_at=timezone.now(),
            )

        upcoming_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)

        self.assertGreaterEqual(upcoming_id, 2)

    def test_process_cleanup_stale_running(self):
        """cleanup_stale_running flips an old RUNNING row to EXITED and reports at least one cleanup."""
        from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW

        current_machine = Machine.current()
        # Started an hour beyond PID_REUSE_WINDOW ago.
        long_ago = timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1)
        leftover = Process.objects.create(
            machine=current_machine,
            process_type=Process.TypeChoices.WORKER,
            status=Process.StatusChoices.RUNNING,
            pid=999999,  # a PID the test assumes is not in use
            started_at=long_ago,
        )

        cleaned_total = Process.cleanup_stale_running()

        self.assertGreaterEqual(cleaned_total, 1)
        leftover.refresh_from_db()
        self.assertEqual(leftover.status, Process.StatusChoices.EXITED)

    def test_process_get_running(self):
        """get_running(process_type=HOOK) includes a RUNNING hook row that was just created."""
        from archivebox.machine.models import Process, Machine

        current_machine = Machine.current()
        hook_row = Process.objects.create(
            machine=current_machine,
            process_type=Process.TypeChoices.HOOK,
            status=Process.StatusChoices.RUNNING,
            pid=99970,
            started_at=timezone.now(),
        )

        running_hooks = Process.get_running(process_type=Process.TypeChoices.HOOK)

        self.assertIn(hook_row, running_hooks)

    def test_process_type_detection(self):
        """_detect_process_type maps sample sys.argv values to the expected process type."""
        from archivebox.machine.models import Process

        def detect_with_argv(argv):
            """Run the detection while sys.argv is temporarily replaced by *argv*."""
            with patch('sys.argv', argv):
                return Process._detect_process_type()

        self.assertEqual(
            detect_with_argv(['archivebox', 'manage', 'orchestrator']),
            Process.TypeChoices.ORCHESTRATOR,
        )
        self.assertEqual(
            detect_with_argv(['archivebox', 'add', 'http://example.com']),
            Process.TypeChoices.CLI,
        )
        self.assertEqual(
            detect_with_argv(['supervisord', '-c', 'config.ini']),
            Process.TypeChoices.SUPERVISORD,
        )
class TestProcessLifecycle(TestCase):
    """Checks of Process lifecycle helpers: is_running, poll, terminate and the parent/child tree."""

    def setUp(self):
        """Set the cached current machine/process module attributes to None, then fetch the current Machine."""
        import archivebox.machine.models as machine_models

        for cache_attr in ('_CURRENT_MACHINE', '_CURRENT_PROCESS'):
            setattr(machine_models, cache_attr, None)
        self.machine = machine_models.Machine.current()

    def _running_process(self, pid, **extra_fields):
        """Create a RUNNING Process row on self.machine for *pid*, started now.

        Extra keyword arguments are forwarded to ``Process.objects.create``.
        """
        from archivebox.machine.models import Process

        return Process.objects.create(
            machine=self.machine,
            status=Process.StatusChoices.RUNNING,
            pid=pid,
            started_at=timezone.now(),
            **extra_fields,
        )

    def test_process_is_running_property(self):
        """is_running is truthy for our own PID and falsy for a PID assumed not to exist."""
        # Row pointing at this very test process.
        live_row = self._running_process(os.getpid())
        self.assertTrue(live_row.is_running)

        # Row pointing at a PID the test assumes is not in use.
        ghost_row = self._running_process(999999)
        self.assertFalse(ghost_row.is_running)

    def test_process_poll(self):
        """poll() on a row whose PID is assumed gone returns a non-None value and marks it EXITED."""
        from archivebox.machine.models import Process

        ghost_row = self._running_process(999999)

        exit_status = ghost_row.poll()

        self.assertIsNotNone(exit_status)
        ghost_row.refresh_from_db()
        self.assertEqual(ghost_row.status, Process.StatusChoices.EXITED)

    def test_process_terminate_already_dead(self):
        """terminate() on a row whose PID is assumed gone is falsy and marks it EXITED."""
        from archivebox.machine.models import Process

        ghost_row = self._running_process(999999)

        terminated = ghost_row.terminate()

        self.assertFalse(terminated)
        ghost_row.refresh_from_db()
        self.assertEqual(ghost_row.status, Process.StatusChoices.EXITED)

    def test_process_tree_traversal(self):
        """parent, children, root and depth agree for a two-level process tree."""
        from archivebox.machine.models import Process

        root_row = self._running_process(
            1,
            process_type=Process.TypeChoices.CLI,
        )
        leaf_row = self._running_process(
            2,
            parent=root_row,
            process_type=Process.TypeChoices.WORKER,
        )

        self.assertEqual(leaf_row.parent, root_row)
        self.assertIn(leaf_row, root_row.children.all())
        self.assertEqual(leaf_row.root, root_row)
        self.assertEqual(leaf_row.depth, 1)
        self.assertEqual(root_row.depth, 0)
if __name__ == '__main__':
    # pytest.main() returns the exit status instead of exiting the process.
    # Propagate it so that running this file directly exits non-zero when
    # any test fails (otherwise the script would always exit 0).
    raise SystemExit(pytest.main([__file__, '-v']))