Update tests for new Process model-based architecture

- Remove pid_utils tests (module deleted in dev)
- Update orchestrator tests to use Process model for tracking
- Add tests for Process.current(), cleanup_stale_running(), terminate()
- Add tests for Process hierarchy (parent/child, root, depth)
- Add tests for Process.get_running(), get_running_count()
- Add tests for ProcessMachine state machine
- Update machine model tests to match current API (from_jsonl vs from_json)
This commit is contained in:
Claude
2025-12-31 11:51:42 +00:00
parent a063d8cd43
commit 9bf7a520a0
2 changed files with 547 additions and 369 deletions

View File

@@ -6,193 +6,23 @@ Tests cover:
2. Queue polling and worker spawning
3. Idle detection and exit logic
4. Worker registration and management
5. PID file utilities
5. Process model methods (replacing old pid_utils)
"""
import os
import tempfile
import time
import signal
from pathlib import Path
from datetime import timedelta
from unittest.mock import patch, MagicMock
import pytest
from django.test import TestCase, override_settings
from django.test import TestCase
from django.utils import timezone
from archivebox.workers.pid_utils import (
get_pid_dir,
write_pid_file,
read_pid_file,
remove_pid_file,
is_process_alive,
get_all_pid_files,
get_all_worker_pids,
cleanup_stale_pid_files,
get_running_worker_count,
get_next_worker_id,
stop_worker,
)
from archivebox.workers.orchestrator import Orchestrator
class TestPidUtils(TestCase):
"""Test PID file utility functions."""
def setUp(self):
"""Create a temporary directory for PID files."""
self.temp_dir = tempfile.mkdtemp()
self.pid_dir_patch = patch(
'archivebox.workers.pid_utils.get_pid_dir',
return_value=Path(self.temp_dir)
)
self.pid_dir_patch.start()
def tearDown(self):
"""Clean up temporary directory."""
self.pid_dir_patch.stop()
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_write_pid_file_orchestrator(self):
"""write_pid_file should create orchestrator.pid for orchestrator."""
pid_file = write_pid_file('orchestrator')
self.assertTrue(pid_file.exists())
self.assertEqual(pid_file.name, 'orchestrator.pid')
content = pid_file.read_text().strip().split('\n')
self.assertEqual(int(content[0]), os.getpid())
self.assertEqual(content[1], 'orchestrator')
def test_write_pid_file_worker(self):
"""write_pid_file should create numbered pid file for workers."""
pid_file = write_pid_file('snapshot', worker_id=3)
self.assertTrue(pid_file.exists())
self.assertEqual(pid_file.name, 'snapshot_worker_3.pid')
def test_write_pid_file_with_extractor(self):
"""write_pid_file should include extractor in content."""
pid_file = write_pid_file('archiveresult', worker_id=0, extractor='singlefile')
content = pid_file.read_text().strip().split('\n')
self.assertEqual(content[2], 'singlefile')
def test_read_pid_file_valid(self):
"""read_pid_file should parse valid PID files."""
pid_file = write_pid_file('snapshot', worker_id=1)
info = read_pid_file(pid_file)
self.assertIsNotNone(info)
self.assertEqual(info['pid'], os.getpid())
self.assertEqual(info['worker_type'], 'snapshot')
self.assertEqual(info['pid_file'], pid_file)
self.assertIsNotNone(info['started_at'])
def test_read_pid_file_invalid(self):
"""read_pid_file should return None for invalid files."""
invalid_file = Path(self.temp_dir) / 'invalid.pid'
invalid_file.write_text('not valid')
info = read_pid_file(invalid_file)
self.assertIsNone(info)
def test_read_pid_file_nonexistent(self):
"""read_pid_file should return None for nonexistent files."""
info = read_pid_file(Path(self.temp_dir) / 'nonexistent.pid')
self.assertIsNone(info)
def test_remove_pid_file(self):
"""remove_pid_file should delete the file."""
pid_file = write_pid_file('test', worker_id=0)
self.assertTrue(pid_file.exists())
remove_pid_file(pid_file)
self.assertFalse(pid_file.exists())
def test_remove_pid_file_nonexistent(self):
"""remove_pid_file should not raise for nonexistent files."""
# Should not raise
remove_pid_file(Path(self.temp_dir) / 'nonexistent.pid')
def test_is_process_alive_current(self):
"""is_process_alive should return True for current process."""
self.assertTrue(is_process_alive(os.getpid()))
def test_is_process_alive_dead(self):
"""is_process_alive should return False for dead processes."""
# PID 999999 is unlikely to exist
self.assertFalse(is_process_alive(999999))
def test_get_all_pid_files(self):
"""get_all_pid_files should return all .pid files."""
write_pid_file('orchestrator')
write_pid_file('snapshot', worker_id=0)
write_pid_file('crawl', worker_id=1)
files = get_all_pid_files()
self.assertEqual(len(files), 3)
def test_get_all_worker_pids(self):
"""get_all_worker_pids should return info for live workers."""
write_pid_file('snapshot', worker_id=0)
write_pid_file('crawl', worker_id=1)
workers = get_all_worker_pids()
# All should be alive since they're current process PID
self.assertEqual(len(workers), 2)
def test_get_all_worker_pids_filtered(self):
"""get_all_worker_pids should filter by worker type."""
write_pid_file('snapshot', worker_id=0)
write_pid_file('snapshot', worker_id=1)
write_pid_file('crawl', worker_id=0)
snapshot_workers = get_all_worker_pids('snapshot')
self.assertEqual(len(snapshot_workers), 2)
crawl_workers = get_all_worker_pids('crawl')
self.assertEqual(len(crawl_workers), 1)
def test_cleanup_stale_pid_files(self):
"""cleanup_stale_pid_files should remove files for dead processes."""
# Create a PID file with a dead PID
stale_file = Path(self.temp_dir) / 'stale_worker_0.pid'
stale_file.write_text('999999\nstale\n\n2024-01-01T00:00:00+00:00\n')
# Create a valid PID file (current process)
write_pid_file('valid', worker_id=0)
removed = cleanup_stale_pid_files()
self.assertEqual(removed, 1)
self.assertFalse(stale_file.exists())
def test_get_running_worker_count(self):
"""get_running_worker_count should count workers of a type."""
write_pid_file('snapshot', worker_id=0)
write_pid_file('snapshot', worker_id=1)
write_pid_file('crawl', worker_id=0)
self.assertEqual(get_running_worker_count('snapshot'), 2)
self.assertEqual(get_running_worker_count('crawl'), 1)
self.assertEqual(get_running_worker_count('archiveresult'), 0)
def test_get_next_worker_id(self):
"""get_next_worker_id should find lowest unused ID."""
write_pid_file('snapshot', worker_id=0)
write_pid_file('snapshot', worker_id=1)
write_pid_file('snapshot', worker_id=3) # Skip 2
next_id = get_next_worker_id('snapshot')
self.assertEqual(next_id, 2)
def test_get_next_worker_id_empty(self):
"""get_next_worker_id should return 0 if no workers exist."""
next_id = get_next_worker_id('snapshot')
self.assertEqual(next_id, 0)
class TestOrchestratorUnit(TestCase):
"""Unit tests for Orchestrator class (mocked dependencies)."""
@@ -323,41 +153,300 @@ class TestOrchestratorUnit(TestCase):
self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 3))
class TestOrchestratorIsRunning(TestCase):
"""Test Orchestrator.is_running() class method."""
class TestOrchestratorWithProcess(TestCase):
"""Test Orchestrator using Process model for tracking."""
def setUp(self):
"""Create a temporary directory for PID files."""
self.temp_dir = tempfile.mkdtemp()
self.pid_dir_patch = patch(
'archivebox.workers.pid_utils.get_pid_dir',
return_value=Path(self.temp_dir)
"""Reset process cache."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
def test_is_running_no_orchestrator(self):
"""is_running should return False when no orchestrator process exists."""
from archivebox.machine.models import Process
# Clean up any stale processes first
Process.cleanup_stale_running()
# Mark any running orchestrators as exited for clean test state
Process.objects.filter(
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING
).update(status=Process.StatusChoices.EXITED)
self.assertFalse(Orchestrator.is_running())
def test_is_running_with_orchestrator_process(self):
"""is_running should return True when orchestrator Process exists."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create an orchestrator Process record
proc = Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.ORCHESTRATOR,
status=Process.StatusChoices.RUNNING,
pid=os.getpid(), # Use current PID so it appears alive
started_at=timezone.now(),
cmd=['archivebox', 'manage', 'orchestrator'],
)
self.pid_dir_patch.start()
def tearDown(self):
"""Clean up."""
self.pid_dir_patch.stop()
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
try:
# Should detect running orchestrator
self.assertTrue(Orchestrator.is_running())
finally:
# Clean up
proc.status = Process.StatusChoices.EXITED
proc.save()
def test_is_running_no_pid_file(self):
"""is_running should return False when no orchestrator PID file."""
self.assertFalse(Orchestrator.is_running())
def test_orchestrator_uses_process_for_is_running(self):
"""Orchestrator.is_running should use Process.get_running_count."""
from archivebox.machine.models import Process
def test_is_running_with_live_orchestrator(self):
"""is_running should return True when orchestrator PID file exists."""
write_pid_file('orchestrator')
self.assertTrue(Orchestrator.is_running())
# Verify is_running uses Process model, not pid files
with patch.object(Process, 'get_running_count') as mock_count:
mock_count.return_value = 1
def test_is_running_with_dead_orchestrator(self):
"""is_running should return False when orchestrator process is dead."""
# Create a PID file with a dead PID
pid_file = Path(self.temp_dir) / 'orchestrator.pid'
pid_file.write_text('999999\norchestrator\n\n2024-01-01T00:00:00+00:00\n')
result = Orchestrator.is_running()
# The get_all_worker_pids filters out dead processes
self.assertFalse(Orchestrator.is_running())
# Should have called Process.get_running_count with orchestrator type
mock_count.assert_called()
self.assertTrue(result)
class TestProcessBasedWorkerTracking(TestCase):
"""Test Process model methods that replace pid_utils functionality."""
def setUp(self):
"""Reset caches."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
def test_process_current_creates_record(self):
"""Process.current() should create a Process record for current PID."""
from archivebox.machine.models import Process
proc = Process.current()
self.assertIsNotNone(proc)
self.assertEqual(proc.pid, os.getpid())
self.assertEqual(proc.status, Process.StatusChoices.RUNNING)
self.assertIsNotNone(proc.machine)
self.assertIsNotNone(proc.started_at)
def test_process_current_caches_result(self):
"""Process.current() should return cached Process within interval."""
from archivebox.machine.models import Process
proc1 = Process.current()
proc2 = Process.current()
self.assertEqual(proc1.id, proc2.id)
def test_process_get_running_count(self):
"""Process.get_running_count should count running processes by type."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create some worker processes
for i in range(3):
Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=99990 + i, # Fake PIDs
started_at=timezone.now(),
)
count = Process.get_running_count(process_type=Process.TypeChoices.WORKER)
self.assertGreaterEqual(count, 3)
def test_process_get_next_worker_id(self):
"""Process.get_next_worker_id should return count of running workers."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create 2 worker processes
for i in range(2):
Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=99980 + i,
started_at=timezone.now(),
)
next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
self.assertGreaterEqual(next_id, 2)
def test_process_cleanup_stale_running(self):
"""Process.cleanup_stale_running should mark stale processes as exited."""
from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW
machine = Machine.current()
# Create a stale process (old started_at, fake PID)
stale_proc = Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=999999, # Fake PID that doesn't exist
started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1),
)
cleaned = Process.cleanup_stale_running()
self.assertGreaterEqual(cleaned, 1)
stale_proc.refresh_from_db()
self.assertEqual(stale_proc.status, Process.StatusChoices.EXITED)
def test_process_get_running(self):
"""Process.get_running should return queryset of running processes."""
from archivebox.machine.models import Process, Machine
machine = Machine.current()
# Create a running process
proc = Process.objects.create(
machine=machine,
process_type=Process.TypeChoices.HOOK,
status=Process.StatusChoices.RUNNING,
pid=99970,
started_at=timezone.now(),
)
running = Process.get_running(process_type=Process.TypeChoices.HOOK)
self.assertIn(proc, running)
def test_process_type_detection(self):
"""Process._detect_process_type should detect process type from argv."""
from archivebox.machine.models import Process
# Test detection logic
with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR)
with patch('sys.argv', ['archivebox', 'add', 'http://example.com']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.CLI)
with patch('sys.argv', ['supervisord', '-c', 'config.ini']):
result = Process._detect_process_type()
self.assertEqual(result, Process.TypeChoices.SUPERVISORD)
class TestProcessLifecycle(TestCase):
"""Test Process model lifecycle methods."""
def setUp(self):
"""Reset caches and create a machine."""
import archivebox.machine.models as models
models._CURRENT_MACHINE = None
models._CURRENT_PROCESS = None
self.machine = models.Machine.current()
def test_process_is_running_property(self):
"""Process.is_running should check actual OS process."""
from archivebox.machine.models import Process
# Create a process with current PID (should be running)
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=os.getpid(),
started_at=timezone.now(),
)
# Should be running (current process exists)
self.assertTrue(proc.is_running)
# Create a process with fake PID
fake_proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
# Should not be running (PID doesn't exist)
self.assertFalse(fake_proc.is_running)
def test_process_poll(self):
"""Process.poll should check and update exit status."""
from archivebox.machine.models import Process
# Create a process with fake PID (already exited)
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
exit_code = proc.poll()
# Should have detected exit and updated status
self.assertIsNotNone(exit_code)
proc.refresh_from_db()
self.assertEqual(proc.status, Process.StatusChoices.EXITED)
def test_process_terminate_already_dead(self):
"""Process.terminate should handle already-dead processes."""
from archivebox.machine.models import Process
# Create a process with fake PID
proc = Process.objects.create(
machine=self.machine,
status=Process.StatusChoices.RUNNING,
pid=999999,
started_at=timezone.now(),
)
result = proc.terminate()
# Should return False (was already dead)
self.assertFalse(result)
proc.refresh_from_db()
self.assertEqual(proc.status, Process.StatusChoices.EXITED)
def test_process_tree_traversal(self):
"""Process parent/children relationships should work."""
from archivebox.machine.models import Process
# Create parent process
parent = Process.objects.create(
machine=self.machine,
process_type=Process.TypeChoices.CLI,
status=Process.StatusChoices.RUNNING,
pid=1,
started_at=timezone.now(),
)
# Create child process
child = Process.objects.create(
machine=self.machine,
parent=parent,
process_type=Process.TypeChoices.WORKER,
status=Process.StatusChoices.RUNNING,
pid=2,
started_at=timezone.now(),
)
# Test relationships
self.assertEqual(child.parent, parent)
self.assertIn(child, parent.children.all())
self.assertEqual(child.root, parent)
self.assertEqual(child.depth, 1)
self.assertEqual(parent.depth, 0)
if __name__ == '__main__':