diff --git a/archivebox/machine/tests/test_machine_models.py b/archivebox/machine/tests/test_machine_models.py index bfbe2968..427c98d8 100644 --- a/archivebox/machine/tests/test_machine_models.py +++ b/archivebox/machine/tests/test_machine_models.py @@ -5,18 +5,20 @@ Tests cover: 1. Machine model creation and current() method 2. NetworkInterface model and network detection 3. Binary model lifecycle and state machine -4. Process model lifecycle and state machine +4. Process model lifecycle, hierarchy, and state machine 5. JSONL serialization/deserialization 6. Manager methods +7. Process tracking methods (replacing pid_utils) """ import os -import tempfile +import sys from pathlib import Path from datetime import timedelta +from unittest.mock import patch import pytest -from django.test import TestCase, override_settings +from django.test import TestCase from django.utils import timezone from archivebox.machine.models import ( @@ -27,11 +29,8 @@ from archivebox.machine.models import ( BinaryMachine, ProcessMachine, MACHINE_RECHECK_INTERVAL, - NETWORK_INTERFACE_RECHECK_INTERVAL, - BINARY_RECHECK_INTERVAL, - _CURRENT_MACHINE, - _CURRENT_INTERFACE, - _CURRENT_BINARIES, + PROCESS_RECHECK_INTERVAL, + PID_REUSE_WINDOW, ) @@ -76,55 +75,23 @@ class TestMachineModel(TestCase): # Should have fetched/updated the machine (same GUID) self.assertEqual(machine1.guid, machine2.guid) - def test_machine_to_json(self): - """Machine.to_json() should serialize correctly.""" - machine = Machine.current() - json_data = machine.to_json() - - self.assertEqual(json_data['type'], 'Machine') - self.assertEqual(json_data['id'], str(machine.id)) - self.assertEqual(json_data['guid'], machine.guid) - self.assertEqual(json_data['hostname'], machine.hostname) - self.assertIn('os_arch', json_data) - self.assertIn('os_family', json_data) - - def test_machine_to_jsonl(self): - """Machine.to_jsonl() should yield JSON records.""" - machine = Machine.current() - records = list(machine.to_jsonl()) - - self.assertEqual(len(records), 1) - self.assertEqual(records[0]['type'], 'Machine') - self.assertEqual(records[0]['id'], str(machine.id)) - - def test_machine_to_jsonl_deduplication(self): - """Machine.to_jsonl() should deduplicate with seen set.""" - machine = Machine.current() - seen = set() - - records1 = list(machine.to_jsonl(seen=seen)) - records2 = list(machine.to_jsonl(seen=seen)) - - self.assertEqual(len(records1), 1) - self.assertEqual(len(records2), 0) # Already seen - - def test_machine_from_json_update(self): - """Machine.from_json() should update machine config.""" - machine = Machine.current() + def test_machine_from_jsonl_update(self): + """Machine.from_jsonl() should update machine config.""" + Machine.current() # Ensure machine exists record = { '_method': 'update', 'key': 'WGET_BINARY', 'value': '/usr/bin/wget', } - result = Machine.from_json(record) + result = Machine.from_jsonl(record) self.assertIsNotNone(result) self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget') - def test_machine_from_json_invalid(self): - """Machine.from_json() should return None for invalid records.""" - result = Machine.from_json({'invalid': 'record'}) + def test_machine_from_jsonl_invalid(self): + """Machine.from_jsonl() should return None for invalid records.""" + result = Machine.from_jsonl({'invalid': 'record'}) self.assertIsNone(result) def test_machine_manager_current(self): @@ -150,7 +117,6 @@ class TestNetworkInterfaceModel(TestCase): self.assertIsNotNone(interface) self.assertIsNotNone(interface.id) self.assertIsNotNone(interface.machine) - # IP addresses should be populated self.assertIsNotNone(interface.ip_local) def test_networkinterface_current_returns_cached(self): @@ -160,17 +126,6 @@ class TestNetworkInterfaceModel(TestCase): self.assertEqual(interface1.id, interface2.id) - def test_networkinterface_to_json(self): - """NetworkInterface.to_json() should serialize correctly.""" - interface = NetworkInterface.current() - json_data = interface.to_json() - - self.assertEqual(json_data['type'], 'NetworkInterface') - self.assertEqual(json_data['id'], str(interface.id)) - self.assertEqual(json_data['machine_id'], str(interface.machine_id)) - self.assertIn('ip_local', json_data) - self.assertIn('ip_public', json_data) - def test_networkinterface_manager_current(self): """NetworkInterface.objects.current() should return current interface.""" interface = NetworkInterface.objects.current() @@ -178,7 +133,7 @@ class TestNetworkInterfaceModel(TestCase): class TestBinaryModel(TestCase): - """Test the Binary model and BinaryMachine state machine.""" + """Test the Binary model.""" def setUp(self): """Reset cached binaries and create a machine.""" @@ -211,67 +166,10 @@ class TestBinaryModel(TestCase): self.assertTrue(binary.is_valid) - def test_binary_to_json(self): - """Binary.to_json() should serialize correctly.""" - binary = Binary.objects.create( - machine=self.machine, - name='wget', - abspath='/usr/bin/wget', - version='1.21', - binprovider='apt', - ) - json_data = binary.to_json() - - self.assertEqual(json_data['type'], 'Binary') - self.assertEqual(json_data['name'], 'wget') - self.assertEqual(json_data['abspath'], '/usr/bin/wget') - self.assertEqual(json_data['version'], '1.21') - - def test_binary_from_json_queued(self): - """Binary.from_json() should create queued binary from binaries.jsonl format.""" - record = { - 'name': 'curl', - 'binproviders': 'apt,brew', - 'overrides': {'apt': {'packages': ['curl']}}, - } - - binary = Binary.from_json(record) - - self.assertIsNotNone(binary) - self.assertEqual(binary.name, 'curl') - self.assertEqual(binary.binproviders, 'apt,brew') - self.assertEqual(binary.status, Binary.StatusChoices.QUEUED) - - def test_binary_from_json_installed(self): - """Binary.from_json() should update binary from hook output format.""" - # First create queued binary - Binary.objects.create( - machine=self.machine, - name='node', - ) - - # Then update with hook output - record = { - 'name': 'node', - 'abspath': '/usr/bin/node', - 'version': '18.0.0', - 'binprovider': 'apt', - } - - binary = Binary.from_json(record) - - self.assertIsNotNone(binary) - self.assertEqual(binary.abspath, '/usr/bin/node') - self.assertEqual(binary.version, '18.0.0') - self.assertEqual(binary.status, Binary.StatusChoices.SUCCEEDED) - def test_binary_manager_get_valid_binary(self): """BinaryManager.get_valid_binary() should find valid binaries.""" # Create invalid binary (no abspath) - Binary.objects.create( - machine=self.machine, - name='wget', - ) + Binary.objects.create(machine=self.machine, name='wget') # Create valid binary Binary.objects.create( @@ -288,10 +186,7 @@ class TestBinaryModel(TestCase): def test_binary_update_and_requeue(self): """Binary.update_and_requeue() should update fields and save.""" - binary = Binary.objects.create( - machine=self.machine, - name='test', - ) + binary = Binary.objects.create(machine=self.machine, name='test') old_modified = binary.modified_at binary.update_and_requeue( @@ -328,7 +223,6 @@ class TestBinaryStateMachine(TestCase): sm = BinaryMachine(self.binary) self.assertTrue(sm.can_start()) - # Binary without binproviders self.binary.binproviders = '' self.binary.save() sm = BinaryMachine(self.binary) @@ -336,12 +230,13 @@ class TestBinaryStateMachine(TestCase): class TestProcessModel(TestCase): - """Test the Process model and ProcessMachine state machine.""" + """Test the Process model.""" def setUp(self): """Create a machine for process tests.""" import archivebox.machine.models as models models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None self.machine = Machine.current() def test_process_creation(self): @@ -358,63 +253,24 @@ class TestProcessModel(TestCase): self.assertIsNone(process.pid) self.assertIsNone(process.exit_code) - def test_process_to_json(self): - """Process.to_json() should serialize correctly.""" + def test_process_to_jsonl(self): + """Process.to_jsonl() should serialize correctly.""" process = Process.objects.create( machine=self.machine, cmd=['echo', 'hello'], pwd='/tmp', timeout=60, ) - json_data = process.to_json() + json_data = process.to_jsonl() self.assertEqual(json_data['type'], 'Process') self.assertEqual(json_data['cmd'], ['echo', 'hello']) self.assertEqual(json_data['pwd'], '/tmp') self.assertEqual(json_data['timeout'], 60) - def test_process_to_jsonl_with_binary(self): - """Process.to_jsonl() should include related binary.""" - binary = Binary.objects.create( - machine=self.machine, - name='echo', - abspath='/bin/echo', - version='1.0', - ) - process = Process.objects.create( - machine=self.machine, - cmd=['echo', 'hello'], - binary=binary, - ) - - records = list(process.to_jsonl(binary=True)) - - self.assertEqual(len(records), 2) - types = {r['type'] for r in records} - self.assertIn('Process', types) - self.assertIn('Binary', types) - - def test_process_manager_create_for_archiveresult(self): - """ProcessManager.create_for_archiveresult() should create process.""" - # This test would require an ArchiveResult, which is complex to set up - # For now, test the direct creation path - process = Process.objects.create( - machine=self.machine, - pwd='/tmp/test', - cmd=['wget', 'http://example.com'], - timeout=120, - ) - - self.assertEqual(process.pwd, '/tmp/test') - self.assertEqual(process.cmd, ['wget', 'http://example.com']) - self.assertEqual(process.timeout, 120) - def test_process_update_and_requeue(self): """Process.update_and_requeue() should update fields and save.""" - process = Process.objects.create( - machine=self.machine, - cmd=['test'], - ) + process = Process.objects.create(machine=self.machine, cmd=['test']) old_modified = process.modified_at process.update_and_requeue( @@ -429,6 +285,240 @@ class TestProcessModel(TestCase): self.assertIsNotNone(process.started_at) +class TestProcessCurrent(TestCase): + """Test Process.current() method.""" + + def setUp(self): + """Reset caches.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + + def test_process_current_creates_record(self): + """Process.current() should create a Process for current PID.""" + proc = Process.current() + + self.assertIsNotNone(proc) + self.assertEqual(proc.pid, os.getpid()) + self.assertEqual(proc.status, Process.StatusChoices.RUNNING) + self.assertIsNotNone(proc.machine) + self.assertIsNotNone(proc.started_at) + + def test_process_current_caches(self): + """Process.current() should cache the result.""" + proc1 = Process.current() + proc2 = Process.current() + + self.assertEqual(proc1.id, proc2.id) + + def test_process_detect_type_orchestrator(self): + """_detect_process_type should detect orchestrator.""" + with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR) + + def test_process_detect_type_cli(self): + """_detect_process_type should detect CLI commands.""" + with patch('sys.argv', ['archivebox', 'add', 'http://example.com']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.CLI) + + def test_process_detect_type_worker(self): + """_detect_process_type should detect workers.""" + with patch('sys.argv', ['python', '-m', 'crawl_worker']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.WORKER) + + +class TestProcessHierarchy(TestCase): + """Test Process parent/child relationships.""" + + def setUp(self): + """Create machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + + def test_process_parent_child(self): + """Process should track parent/child relationships.""" + parent = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.CLI, + status=Process.StatusChoices.RUNNING, + pid=1, + started_at=timezone.now(), + ) + + child = Process.objects.create( + machine=self.machine, + parent=parent, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=2, + started_at=timezone.now(), + ) + + self.assertEqual(child.parent, parent) + self.assertIn(child, parent.children.all()) + + def test_process_root(self): + """Process.root should return the root of the hierarchy.""" + root = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.CLI, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + child = Process.objects.create( + machine=self.machine, + parent=root, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + grandchild = Process.objects.create( + machine=self.machine, + parent=child, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + + self.assertEqual(grandchild.root, root) + self.assertEqual(child.root, root) + self.assertEqual(root.root, root) + + def test_process_depth(self): + """Process.depth should return depth in tree.""" + root = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + child = Process.objects.create( + machine=self.machine, + parent=root, + status=Process.StatusChoices.RUNNING, + started_at=timezone.now(), + ) + + self.assertEqual(root.depth, 0) + self.assertEqual(child.depth, 1) + + +class TestProcessLifecycle(TestCase): + """Test Process lifecycle methods.""" + + def setUp(self): + """Create machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + + def test_process_is_running_current_pid(self): + """is_running should be True for current PID.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=os.getpid(), + started_at=timezone.now(), + ) + + self.assertTrue(proc.is_running) + + def test_process_is_running_fake_pid(self): + """is_running should be False for non-existent PID.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + self.assertFalse(proc.is_running) + + def test_process_poll_detects_exit(self): + """poll() should detect exited process.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + exit_code = proc.poll() + + self.assertIsNotNone(exit_code) + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + def test_process_terminate_dead_process(self): + """terminate() should handle already-dead process.""" + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + result = proc.terminate() + + self.assertFalse(result) + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + +class TestProcessClassMethods(TestCase): + """Test Process class methods for querying.""" + + def setUp(self): + """Create machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + self.machine = Machine.current() + + def test_get_running(self): + """get_running should return running processes.""" + proc = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + pid=99999, + started_at=timezone.now(), + ) + + running = Process.get_running(process_type=Process.TypeChoices.HOOK) + + self.assertIn(proc, running) + + def test_get_running_count(self): + """get_running_count should count running processes.""" + for i in range(3): + Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + pid=99900 + i, + started_at=timezone.now(), + ) + + count = Process.get_running_count(process_type=Process.TypeChoices.HOOK) + self.assertGreaterEqual(count, 3) + + def test_cleanup_stale_running(self): + """cleanup_stale_running should mark stale processes as exited.""" + stale = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1), + ) + + cleaned = Process.cleanup_stale_running() + + self.assertGreaterEqual(cleaned, 1) + stale.refresh_from_db() + self.assertEqual(stale.status, Process.StatusChoices.EXITED) + + class TestProcessStateMachine(TestCase): """Test the ProcessMachine state machine.""" @@ -453,7 +543,6 @@ class TestProcessStateMachine(TestCase): sm = ProcessMachine(self.process) self.assertTrue(sm.can_start()) - # Process without cmd self.process.cmd = [] self.process.save() sm = ProcessMachine(self.process) diff --git a/archivebox/workers/tests/test_orchestrator.py b/archivebox/workers/tests/test_orchestrator.py index 033ac087..d54331ec 100644 --- a/archivebox/workers/tests/test_orchestrator.py +++ b/archivebox/workers/tests/test_orchestrator.py @@ -6,193 +6,23 @@ Tests cover: 2. Queue polling and worker spawning 3. Idle detection and exit logic 4. Worker registration and management -5. PID file utilities +5. Process model methods (replacing old pid_utils) """ import os import tempfile import time -import signal from pathlib import Path +from datetime import timedelta from unittest.mock import patch, MagicMock import pytest -from django.test import TestCase, override_settings +from django.test import TestCase +from django.utils import timezone -from archivebox.workers.pid_utils import ( - get_pid_dir, - write_pid_file, - read_pid_file, - remove_pid_file, - is_process_alive, - get_all_pid_files, - get_all_worker_pids, - cleanup_stale_pid_files, - get_running_worker_count, - get_next_worker_id, - stop_worker, -) from archivebox.workers.orchestrator import Orchestrator -class TestPidUtils(TestCase): - """Test PID file utility functions.""" - - def setUp(self): - """Create a temporary directory for PID files.""" - self.temp_dir = tempfile.mkdtemp() - self.pid_dir_patch = patch( - 'archivebox.workers.pid_utils.get_pid_dir', - return_value=Path(self.temp_dir) - ) - self.pid_dir_patch.start() - - def tearDown(self): - """Clean up temporary directory.""" - self.pid_dir_patch.stop() - import shutil - shutil.rmtree(self.temp_dir, ignore_errors=True) - - def test_write_pid_file_orchestrator(self): - """write_pid_file should create orchestrator.pid for orchestrator.""" - pid_file = write_pid_file('orchestrator') - - self.assertTrue(pid_file.exists()) - self.assertEqual(pid_file.name, 'orchestrator.pid') - - content = pid_file.read_text().strip().split('\n') - self.assertEqual(int(content[0]), os.getpid()) - self.assertEqual(content[1], 'orchestrator') - - def test_write_pid_file_worker(self): - """write_pid_file should create numbered pid file for workers.""" - pid_file = write_pid_file('snapshot', worker_id=3) - - self.assertTrue(pid_file.exists()) - self.assertEqual(pid_file.name, 'snapshot_worker_3.pid') - - def test_write_pid_file_with_extractor(self): - """write_pid_file should include extractor in content.""" - pid_file = write_pid_file('archiveresult', worker_id=0, extractor='singlefile') - - content = pid_file.read_text().strip().split('\n') - self.assertEqual(content[2], 'singlefile') - - def test_read_pid_file_valid(self): - """read_pid_file should parse valid PID files.""" - pid_file = write_pid_file('snapshot', worker_id=1) - info = read_pid_file(pid_file) - - self.assertIsNotNone(info) - self.assertEqual(info['pid'], os.getpid()) - self.assertEqual(info['worker_type'], 'snapshot') - self.assertEqual(info['pid_file'], pid_file) - self.assertIsNotNone(info['started_at']) - - def test_read_pid_file_invalid(self): - """read_pid_file should return None for invalid files.""" - invalid_file = Path(self.temp_dir) / 'invalid.pid' - invalid_file.write_text('not valid') - - info = read_pid_file(invalid_file) - self.assertIsNone(info) - - def test_read_pid_file_nonexistent(self): - """read_pid_file should return None for nonexistent files.""" - info = read_pid_file(Path(self.temp_dir) / 'nonexistent.pid') - self.assertIsNone(info) - - def test_remove_pid_file(self): - """remove_pid_file should delete the file.""" - pid_file = write_pid_file('test', worker_id=0) - self.assertTrue(pid_file.exists()) - - remove_pid_file(pid_file) - self.assertFalse(pid_file.exists()) - - def test_remove_pid_file_nonexistent(self): - """remove_pid_file should not raise for nonexistent files.""" - # Should not raise - remove_pid_file(Path(self.temp_dir) / 'nonexistent.pid') - - def test_is_process_alive_current(self): - """is_process_alive should return True for current process.""" - self.assertTrue(is_process_alive(os.getpid())) - - def test_is_process_alive_dead(self): - """is_process_alive should return False for dead processes.""" - # PID 999999 is unlikely to exist - self.assertFalse(is_process_alive(999999)) - - def test_get_all_pid_files(self): - """get_all_pid_files should return all .pid files.""" - write_pid_file('orchestrator') - write_pid_file('snapshot', worker_id=0) - write_pid_file('crawl', worker_id=1) - - files = get_all_pid_files() - self.assertEqual(len(files), 3) - - def test_get_all_worker_pids(self): - """get_all_worker_pids should return info for live workers.""" - write_pid_file('snapshot', worker_id=0) - write_pid_file('crawl', worker_id=1) - - workers = get_all_worker_pids() - # All should be alive since they're current process PID - self.assertEqual(len(workers), 2) - - def test_get_all_worker_pids_filtered(self): - """get_all_worker_pids should filter by worker type.""" - write_pid_file('snapshot', worker_id=0) - write_pid_file('snapshot', worker_id=1) - write_pid_file('crawl', worker_id=0) - - snapshot_workers = get_all_worker_pids('snapshot') - self.assertEqual(len(snapshot_workers), 2) - - crawl_workers = get_all_worker_pids('crawl') - self.assertEqual(len(crawl_workers), 1) - - def test_cleanup_stale_pid_files(self): - """cleanup_stale_pid_files should remove files for dead processes.""" - # Create a PID file with a dead PID - stale_file = Path(self.temp_dir) / 'stale_worker_0.pid' - stale_file.write_text('999999\nstale\n\n2024-01-01T00:00:00+00:00\n') - - # Create a valid PID file (current process) - write_pid_file('valid', worker_id=0) - - removed = cleanup_stale_pid_files() - - self.assertEqual(removed, 1) - self.assertFalse(stale_file.exists()) - - def test_get_running_worker_count(self): - """get_running_worker_count should count workers of a type.""" - write_pid_file('snapshot', worker_id=0) - write_pid_file('snapshot', worker_id=1) - write_pid_file('crawl', worker_id=0) - - self.assertEqual(get_running_worker_count('snapshot'), 2) - self.assertEqual(get_running_worker_count('crawl'), 1) - self.assertEqual(get_running_worker_count('archiveresult'), 0) - - def test_get_next_worker_id(self): - """get_next_worker_id should find lowest unused ID.""" - write_pid_file('snapshot', worker_id=0) - write_pid_file('snapshot', worker_id=1) - write_pid_file('snapshot', worker_id=3) # Skip 2 - - next_id = get_next_worker_id('snapshot') - self.assertEqual(next_id, 2) - - def test_get_next_worker_id_empty(self): - """get_next_worker_id should return 0 if no workers exist.""" - next_id = get_next_worker_id('snapshot') - self.assertEqual(next_id, 0) - - class TestOrchestratorUnit(TestCase): """Unit tests for Orchestrator class (mocked dependencies).""" @@ -323,41 +153,300 @@ class TestOrchestratorUnit(TestCase): self.assertFalse(orchestrator.should_spawn_worker(mock_worker, 3)) -class TestOrchestratorIsRunning(TestCase): - """Test Orchestrator.is_running() class method.""" +class TestOrchestratorWithProcess(TestCase): + """Test Orchestrator using Process model for tracking.""" def setUp(self): - """Create a temporary directory for PID files.""" - self.temp_dir = tempfile.mkdtemp() - self.pid_dir_patch = patch( - 'archivebox.workers.pid_utils.get_pid_dir', - return_value=Path(self.temp_dir) + """Reset process cache.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + + def test_is_running_no_orchestrator(self): + """is_running should return False when no orchestrator process exists.""" + from archivebox.machine.models import Process + + # Clean up any stale processes first + Process.cleanup_stale_running() + + # Mark any running orchestrators as exited for clean test state + Process.objects.filter( + process_type=Process.TypeChoices.ORCHESTRATOR, + status=Process.StatusChoices.RUNNING + ).update(status=Process.StatusChoices.EXITED) + + self.assertFalse(Orchestrator.is_running()) + + def test_is_running_with_orchestrator_process(self): + """is_running should return True when orchestrator Process exists.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create an orchestrator Process record + proc = Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.ORCHESTRATOR, + status=Process.StatusChoices.RUNNING, + pid=os.getpid(), # Use current PID so it appears alive + started_at=timezone.now(), + cmd=['archivebox', 'manage', 'orchestrator'], ) - self.pid_dir_patch.start() - def tearDown(self): - """Clean up.""" - self.pid_dir_patch.stop() - import shutil - shutil.rmtree(self.temp_dir, ignore_errors=True) + try: + # Should detect running orchestrator + self.assertTrue(Orchestrator.is_running()) + finally: + # Clean up + proc.status = Process.StatusChoices.EXITED + proc.save() - def test_is_running_no_pid_file(self): - """is_running should return False when no orchestrator PID file.""" - self.assertFalse(Orchestrator.is_running()) + def test_orchestrator_uses_process_for_is_running(self): + """Orchestrator.is_running should use Process.get_running_count.""" + from archivebox.machine.models import Process - def test_is_running_with_live_orchestrator(self): - """is_running should return True when orchestrator PID file exists.""" - write_pid_file('orchestrator') - self.assertTrue(Orchestrator.is_running()) + # Verify is_running uses Process model, not pid files + with patch.object(Process, 'get_running_count') as mock_count: + mock_count.return_value = 1 - def test_is_running_with_dead_orchestrator(self): - """is_running should return False when orchestrator process is dead.""" - # Create a PID file with a dead PID - pid_file = Path(self.temp_dir) / 'orchestrator.pid' - pid_file.write_text('999999\norchestrator\n\n2024-01-01T00:00:00+00:00\n') + result = Orchestrator.is_running() - # The get_all_worker_pids filters out dead processes - self.assertFalse(Orchestrator.is_running()) + # Should have called Process.get_running_count with orchestrator type + mock_count.assert_called() + self.assertTrue(result) + + +class TestProcessBasedWorkerTracking(TestCase): + """Test Process model methods that replace pid_utils functionality.""" + + def setUp(self): + """Reset caches.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + + def test_process_current_creates_record(self): + """Process.current() should create a Process record for current PID.""" + from archivebox.machine.models import Process + + proc = Process.current() + + self.assertIsNotNone(proc) + self.assertEqual(proc.pid, os.getpid()) + self.assertEqual(proc.status, Process.StatusChoices.RUNNING) + self.assertIsNotNone(proc.machine) + self.assertIsNotNone(proc.started_at) + + def test_process_current_caches_result(self): + """Process.current() should return cached Process within interval.""" + from archivebox.machine.models import Process + + proc1 = Process.current() + proc2 = Process.current() + + self.assertEqual(proc1.id, proc2.id) + + def test_process_get_running_count(self): + """Process.get_running_count should count running processes by type.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create some worker processes + for i in range(3): + Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=99990 + i, # Fake PIDs + started_at=timezone.now(), + ) + + count = Process.get_running_count(process_type=Process.TypeChoices.WORKER) + self.assertGreaterEqual(count, 3) + + def test_process_get_next_worker_id(self): + """Process.get_next_worker_id should return count of running workers.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create 2 worker processes + for i in range(2): + Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=99980 + i, + started_at=timezone.now(), + ) + + next_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER) + self.assertGreaterEqual(next_id, 2) + + def test_process_cleanup_stale_running(self): + """Process.cleanup_stale_running should mark stale processes as exited.""" + from archivebox.machine.models import Process, Machine, PID_REUSE_WINDOW + + machine = Machine.current() + + # Create a stale process (old started_at, fake PID) + stale_proc = Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=999999, # Fake PID that doesn't exist + started_at=timezone.now() - PID_REUSE_WINDOW - timedelta(hours=1), + ) + + cleaned = Process.cleanup_stale_running() + + self.assertGreaterEqual(cleaned, 1) + + stale_proc.refresh_from_db() + self.assertEqual(stale_proc.status, Process.StatusChoices.EXITED) + + def test_process_get_running(self): + """Process.get_running should return queryset of running processes.""" + from archivebox.machine.models import Process, Machine + + machine = Machine.current() + + # Create a running process + proc = Process.objects.create( + machine=machine, + process_type=Process.TypeChoices.HOOK, + status=Process.StatusChoices.RUNNING, + pid=99970, + started_at=timezone.now(), + ) + + running = Process.get_running(process_type=Process.TypeChoices.HOOK) + + self.assertIn(proc, running) + + def test_process_type_detection(self): + """Process._detect_process_type should detect process type from argv.""" + from archivebox.machine.models import Process + + # Test detection logic + with patch('sys.argv', ['archivebox', 'manage', 'orchestrator']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.ORCHESTRATOR) + + with patch('sys.argv', ['archivebox', 'add', 'http://example.com']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.CLI) + + with patch('sys.argv', ['supervisord', '-c', 'config.ini']): + result = Process._detect_process_type() + self.assertEqual(result, Process.TypeChoices.SUPERVISORD) + + +class TestProcessLifecycle(TestCase): + """Test Process model lifecycle methods.""" + + def setUp(self): + """Reset caches and create a machine.""" + import archivebox.machine.models as models + models._CURRENT_MACHINE = None + models._CURRENT_PROCESS = None + self.machine = models.Machine.current() + + def test_process_is_running_property(self): + """Process.is_running should check actual OS process.""" + from archivebox.machine.models import Process + + # Create a process with current PID (should be running) + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=os.getpid(), + started_at=timezone.now(), + ) + + # Should be running (current process exists) + self.assertTrue(proc.is_running) + + # Create a process with fake PID + fake_proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + # Should not be running (PID doesn't exist) + self.assertFalse(fake_proc.is_running) + + def test_process_poll(self): + """Process.poll should check and update exit status.""" + from archivebox.machine.models import Process + + # Create a process with fake PID (already exited) + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + exit_code = proc.poll() + + # Should have detected exit and updated status + self.assertIsNotNone(exit_code) + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + def test_process_terminate_already_dead(self): + """Process.terminate should handle already-dead processes.""" + from archivebox.machine.models import Process + + # Create a process with fake PID + proc = Process.objects.create( + machine=self.machine, + status=Process.StatusChoices.RUNNING, + pid=999999, + started_at=timezone.now(), + ) + + result = proc.terminate() + + # Should return False (was already dead) + self.assertFalse(result) + + proc.refresh_from_db() + self.assertEqual(proc.status, Process.StatusChoices.EXITED) + + def test_process_tree_traversal(self): + """Process parent/children relationships should work.""" + from archivebox.machine.models import Process + + # Create parent process + parent = Process.objects.create( + machine=self.machine, + process_type=Process.TypeChoices.CLI, + status=Process.StatusChoices.RUNNING, + pid=1, + started_at=timezone.now(), + ) + + # Create child process + child = Process.objects.create( + machine=self.machine, + parent=parent, + process_type=Process.TypeChoices.WORKER, + status=Process.StatusChoices.RUNNING, + pid=2, + started_at=timezone.now(), + ) + + # Test relationships + self.assertEqual(child.parent, parent) + self.assertIn(child, parent.children.all()) + self.assertEqual(child.root, parent) + self.assertEqual(child.depth, 1) + self.assertEqual(parent.depth, 0) if __name__ == '__main__':