Delete pid_utils.py and migrate to Process model

DELETED:
- workers/pid_utils.py (-191 lines) - replaced by Process model methods

SIMPLIFIED:
- crawls/models.py Crawl.cleanup() (80 lines -> 10 lines)
- hooks.py: deleted process_is_alive() and kill_process() (-45 lines)

UPDATED to use Process model:
- core/models.py: Snapshot.cleanup() and has_running_background_hooks()
- machine/models.py: Binary.cleanup()
- workers/worker.py: Worker.on_startup/shutdown, get_running_workers, start
- workers/orchestrator.py: Orchestrator.on_startup/shutdown, is_running

All subprocess management now uses:
- Process.current() for registering current process
- Process.get_running() / get_running_count() for querying
- Process.cleanup_stale_running() for cleanup
- safe_kill_process() for validated PID killing

Total line reduction: ~296 lines (359 deletions − 63 insertions)
This commit is contained in:
Claude
2025-12-31 10:15:22 +00:00
parent 2d3a2fec57
commit b822352fc3
7 changed files with 63 additions and 359 deletions

View File

@@ -30,7 +30,7 @@ __package__ = 'archivebox.workers'
import os
import time
from typing import Type
from multiprocessing import Process
from multiprocessing import Process as MPProcess
from django.utils import timezone
@@ -38,12 +38,6 @@ from rich import print
from archivebox.misc.logging_util import log_worker_event
from .worker import Worker, CrawlWorker, SnapshotWorker, ArchiveResultWorker
from .pid_utils import (
write_pid_file,
remove_pid_file,
get_all_worker_pids,
cleanup_stale_pid_files,
)
def _run_orchestrator_process(exit_on_idle: bool) -> None:
@@ -85,16 +79,20 @@ class Orchestrator:
@classmethod
def is_running(cls) -> bool:
"""Check if an orchestrator is already running."""
workers = get_all_worker_pids('orchestrator')
return len(workers) > 0
from archivebox.machine.models import Process
return Process.get_running_count(process_type='orchestrator') > 0
def on_startup(self) -> None:
"""Called when orchestrator starts."""
self.pid = os.getpid()
self.pid_file = write_pid_file('orchestrator', worker_id=0)
from archivebox.machine.models import Process
# Clean up any stale PID files from previous runs
stale_count = cleanup_stale_pid_files()
self.pid = os.getpid()
# Register orchestrator process in database
self.db_process = Process.current()
# Clean up any stale Process records from previous runs
stale_count = Process.cleanup_stale_running()
# Collect startup metadata
metadata = {
@@ -112,11 +110,15 @@ class Orchestrator:
pid=self.pid,
metadata=metadata,
)
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when orchestrator shuts down."""
if self.pid_file:
remove_pid_file(self.pid_file)
# Update Process record status
if hasattr(self, 'db_process') and self.db_process:
self.db_process.exit_code = 1 if error else 0
self.db_process.status = self.db_process.StatusChoices.EXITED
self.db_process.ended_at = timezone.now()
self.db_process.save()
log_worker_event(
worker_type='Orchestrator',
@@ -125,10 +127,12 @@ class Orchestrator:
pid=self.pid,
error=error if error and not isinstance(error, KeyboardInterrupt) else None,
)
def get_total_worker_count(self) -> int:
"""Get total count of running workers across all types."""
cleanup_stale_pid_files()
from archivebox.machine.models import Process
Process.cleanup_stale_running()
return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES)
def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
@@ -287,7 +291,7 @@ class Orchestrator:
Returns the PID of the new process.
"""
# Use module-level function to avoid pickle errors with local functions
proc = Process(
proc = MPProcess(
target=_run_orchestrator_process,
args=(self.exit_on_idle,),
name='orchestrator'

View File

@@ -1,191 +0,0 @@
"""
PID file utilities for tracking worker and orchestrator processes.
PID files are stored in data/tmp/workers/ and contain:
- Line 1: PID
- Line 2: Worker type (orchestrator, crawl, snapshot, archiveresult)
- Line 3: Extractor filter (optional, for archiveresult workers)
- Line 4: Started at ISO timestamp
"""
__package__ = 'archivebox.workers'
import os
import signal
from pathlib import Path
from datetime import datetime, timezone
from django.conf import settings
def get_pid_dir() -> Path:
    """Return the worker PID-file directory, creating it on first use."""
    target = Path(settings.DATA_DIR) / 'tmp' / 'workers'
    target.mkdir(parents=True, exist_ok=True)
    return target
def write_pid_file(worker_type: str, worker_id: int = 0, extractor: str | None = None) -> Path:
    """
    Write a PID file describing the current process.

    The orchestrator gets a fixed filename; workers are numbered by
    ``worker_id``. Returns the path to the PID file.
    """
    base_dir = get_pid_dir()
    if worker_type == 'orchestrator':
        filename = 'orchestrator.pid'
    else:
        filename = f'{worker_type}_worker_{worker_id}.pid'
    pid_file = base_dir / filename
    # Four newline-terminated fields: pid, type, extractor (may be blank), start time.
    started_at = datetime.now(timezone.utc).isoformat()
    pid_file.write_text(f"{os.getpid()}\n{worker_type}\n{extractor or ''}\n{started_at}\n")
    return pid_file
def read_pid_file(path: Path) -> dict | None:
"""
Read and parse a PID file.
Returns dict with pid, worker_type, extractor, started_at or None if invalid.
"""
try:
if not path.exists():
return None
lines = path.read_text().strip().split('\n')
if len(lines) < 4:
return None
return {
'pid': int(lines[0]),
'worker_type': lines[1],
'extractor': lines[2] or None,
'started_at': datetime.fromisoformat(lines[3]),
'pid_file': path,
}
except (ValueError, IndexError, OSError):
return None
def remove_pid_file(path: Path) -> None:
    """Best-effort removal of a PID file.

    Missing files and filesystem errors are silently ignored.
    """
    try:
        path.unlink()
    except OSError:  # includes FileNotFoundError, i.e. already removed
        pass
def is_process_alive(pid: int) -> bool:
    """Return True if a process with the given PID currently exists.

    Uses ``os.kill(pid, 0)``: signal 0 performs the existence/permission
    check without actually delivering a signal.

    Fix: ``PermissionError`` (EPERM) means the process EXISTS but belongs
    to another user — the previous blanket ``OSError`` handler reported
    such live processes as dead, causing their PID files to be reaped.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # Process exists; we just aren't allowed to signal it.
        return True
    except OSError:
        return False
    return True
def get_all_pid_files() -> list[Path]:
    """Collect every ``*.pid`` file currently in the worker PID directory."""
    pid_dir = get_pid_dir()
    return [pid_file for pid_file in pid_dir.glob('*.pid')]
def get_all_worker_pids(worker_type: str | None = None) -> list[dict]:
    """
    Describe every live worker process, optionally restricted to one type.

    Unparseable PID files and files whose process has died are skipped.
    """
    running = []
    for pid_file in get_all_pid_files():
        info = read_pid_file(pid_file)
        # Ignore corrupt files and processes that are no longer alive.
        if info is None or not is_process_alive(info['pid']):
            continue
        # Apply the optional type filter.
        if worker_type and info['worker_type'] != worker_type:
            continue
        running.append(info)
    return running
def cleanup_stale_pid_files() -> int:
    """
    Drop PID files whose contents are invalid or whose process is gone.

    Returns the number of files removed.
    """
    removed = 0
    for pid_file in get_all_pid_files():
        info = read_pid_file(pid_file)
        # A file is stale when it can't be parsed or its process has exited.
        stale = info is None or not is_process_alive(info['pid'])
        if stale:
            remove_pid_file(pid_file)
            removed += 1
    return removed
def get_running_worker_count(worker_type: str) -> int:
    """Count currently-running workers of the given type."""
    running = get_all_worker_pids(worker_type)
    return len(running)
def get_next_worker_id(worker_type: str) -> int:
    """
    Return the lowest worker ID not already claimed by a PID file
    named like ``{worker_type}_worker_{id}.pid``.
    """
    prefix = f'{worker_type}_worker_'
    taken: set[int] = set()
    for pid_file in get_all_pid_files():
        stem = pid_file.stem
        if not stem.startswith(prefix):
            continue
        # Filename suffix after the last underscore is the numeric ID.
        try:
            taken.add(int(stem.split('_')[-1]))
        except ValueError:
            pass
    candidate = 0
    while candidate in taken:
        candidate += 1
    return candidate
def stop_worker(pid: int, graceful: bool = True) -> bool:
    """
    Terminate a worker process.

    With ``graceful=True``, sends SIGTERM, polls for up to ~1 second for the
    process to exit, then escalates to SIGKILL if it is still alive.
    Returns True once the process is gone (including when it was already dead).
    """
    if not is_process_alive(pid):
        return True
    try:
        if not graceful:
            os.kill(pid, signal.SIGKILL)
            return True
        os.kill(pid, signal.SIGTERM)
        import time
        # Poll every 100ms, up to ~1 second total, for a clean exit.
        for _ in range(10):
            time.sleep(0.1)
            if not is_process_alive(pid):
                return True
        # Grace period expired — force it.
        os.kill(pid, signal.SIGKILL)
        return True
    except (OSError, ProcessLookupError):
        return True  # process already dead

View File

@@ -17,7 +17,7 @@ import traceback
from typing import ClassVar, Any
from datetime import timedelta
from pathlib import Path
from multiprocessing import Process, cpu_count
from multiprocessing import Process as MPProcess, cpu_count
from django.db.models import QuerySet
from django.utils import timezone
@@ -26,13 +26,6 @@ from django.conf import settings
from rich import print
from archivebox.misc.logging_util import log_worker_event
from .pid_utils import (
write_pid_file,
remove_pid_file,
get_all_worker_pids,
get_next_worker_id,
cleanup_stale_pid_files,
)
CPU_COUNT = cpu_count()
@@ -133,8 +126,11 @@ class Worker:
def on_startup(self) -> None:
"""Called when worker starts."""
from archivebox.machine.models import Process
self.pid = os.getpid()
self.pid_file = write_pid_file(self.name, self.worker_id)
# Register this worker process in the database
self.db_process = Process.current()
# Determine worker type for logging
worker_type_name = self.__class__.__name__
@@ -160,9 +156,12 @@ class Worker:
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when worker shuts down."""
# Remove PID file
if self.pid_file:
remove_pid_file(self.pid_file)
# Update Process record status
if hasattr(self, 'db_process') and self.db_process:
self.db_process.exit_code = 1 if error else 0
self.db_process.status = self.db_process.StatusChoices.EXITED
self.db_process.ended_at = timezone.now()
self.db_process.save()
# Determine worker type for logging
worker_type_name = self.__class__.__name__
@@ -288,11 +287,13 @@ class Worker:
Fork a new worker as a subprocess.
Returns the PID of the new process.
"""
from archivebox.machine.models import Process
if worker_id is None:
worker_id = get_next_worker_id(cls.name)
worker_id = Process.get_next_worker_id(process_type=cls.name)
# Use module-level function for pickling compatibility
proc = Process(
proc = MPProcess(
target=_run_worker,
args=(cls.name, worker_id, daemon),
kwargs=kwargs,
@@ -304,15 +305,19 @@ class Worker:
return proc.pid
@classmethod
def get_running_workers(cls) -> list[dict]:
def get_running_workers(cls) -> list:
"""Get info about all running workers of this type."""
cleanup_stale_pid_files()
return get_all_worker_pids(cls.name)
from archivebox.machine.models import Process
Process.cleanup_stale_running()
return list(Process.get_running(process_type=cls.name))
@classmethod
def get_worker_count(cls) -> int:
"""Get count of running workers of this type."""
return len(cls.get_running_workers())
from archivebox.machine.models import Process
return Process.get_running_count(process_type=cls.name)
class CrawlWorker(Worker):
@@ -402,11 +407,13 @@ class ArchiveResultWorker(Worker):
@classmethod
def start(cls, worker_id: int | None = None, daemon: bool = False, plugin: str | None = None, **kwargs: Any) -> int:
"""Fork a new worker as subprocess with optional plugin filter."""
from archivebox.machine.models import Process
if worker_id is None:
worker_id = get_next_worker_id(cls.name)
worker_id = Process.get_next_worker_id(process_type=cls.name)
# Use module-level function for pickling compatibility
proc = Process(
proc = MPProcess(
target=_run_worker,
args=(cls.name, worker_id, daemon),
kwargs={'plugin': plugin, **kwargs},