Fix cubic review issues: process_type detection, cmd storage, PID cleanup, and migration

- Fix Process.current() to store psutil cmdline instead of sys.argv for accurate validation
- Fix worker process_type detection: explicitly set to WORKER after registration
- Fix ArchiveResultWorker.start() to use Process.TypeChoices.WORKER consistently
- Fix migration to be explicitly irreversible (SQLite doesn't support DROP COLUMN)
- Fix get_running_workers() to return process_id instead of incorrectly named worker_id
- Fix safe_kill_process() to wait for termination and escalate to SIGKILL if needed
- Fix migration to include all indexes in state_operations (parent_id, process_type)
- Fix documentation to use Machine.current() scoping and StatusChoices constants

Co-authored-by: Nick Sweeting <pirate@users.noreply.github.com>
This commit is contained in:
claude[bot]
2025-12-31 11:42:07 +00:00
parent 5121b0e5f9
commit b2132d1f14
6 changed files with 88 additions and 26 deletions

View File

@@ -1728,8 +1728,8 @@ The goal is to consolidate all subprocess management into `Process` model method
| `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` |
| `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code |
| `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` |
| `get_all_pid_files()` | `Process.objects.filter(status='running')` |
| `get_all_worker_pids(type)` | `Process.objects.filter(process_type=type, status='running')` |
| `get_all_pid_files()` | `Process.objects.filter(machine=Machine.current(), status=Process.StatusChoices.RUNNING)` |
| `get_all_worker_pids(type)` | `Process.objects.filter(machine=Machine.current(), process_type=type, status=Process.StatusChoices.RUNNING)` |
| `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` |
| `get_running_worker_count(type)` | `Process.objects.filter(machine=Machine.current(), process_type=type, status=Process.StatusChoices.RUNNING).count()` |
| `get_next_worker_id(type)` | Use `Max(worker_id)+1` under transaction or DB sequence to avoid race conditions |
@@ -1808,7 +1808,7 @@ for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
def cleanup(self):
# Kill all running child processes for this crawl
for snapshot in self.snapshot_set.all():
for ar in snapshot.archiveresult_set.filter(status='started'):
for ar in snapshot.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED):
if ar.process_id:
# Kill hook process and all its children
ar.process.kill()

View File

@@ -424,9 +424,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
if self.OUTPUT_DIR.exists():
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
cmd_file = pid_file.parent / 'cmd.sh'
# Only delete PID file if kill succeeded or process is already dead
# safe_kill_process now waits for termination and escalates to SIGKILL
# Returns True only if process is confirmed dead
killed = safe_kill_process(pid_file, cmd_file)
if killed or not pid_file.exists():
if killed:
pid_file.unlink(missing_ok=True)
# Run on_CrawlEnd hooks

View File

@@ -30,17 +30,9 @@ class Migration(migrations.Migration):
-- Add composite index for machine + pid + started_at (for PID reuse protection)
CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at);
""",
reverse_sql="""
DROP INDEX IF EXISTS machine_process_machine_pid_started_idx;
DROP INDEX IF EXISTS machine_process_parent_status_idx;
DROP INDEX IF EXISTS machine_process_process_type_idx;
DROP INDEX IF EXISTS machine_process_parent_id_idx;
-- SQLite doesn't support DROP COLUMN directly, but we record the intent
-- In practice, this migration is forward-only for SQLite
-- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN process_type;
-- For PostgreSQL/MySQL: ALTER TABLE machine_process DROP COLUMN parent_id;
"""
# Migration is irreversible due to SQLite limitations
# SQLite doesn't support DROP COLUMN, would require table rebuild
reverse_sql=migrations.RunSQL.noop
),
],
state_operations=[
@@ -75,7 +67,21 @@ class Migration(migrations.Migration):
max_length=16,
),
),
# Add indexes
# Add indexes - must match the SQL index names exactly
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['parent'],
name='machine_process_parent_id_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['process_type'],
name='machine_process_process_type_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(

View File

@@ -812,6 +812,16 @@ class Process(ModelWithHealthStats):
parent = cls._find_parent_process(machine)
process_type = cls._detect_process_type()
# Use psutil cmdline if available (matches what proc() will validate against)
# Otherwise fall back to sys.argv
cmd = sys.argv
if PSUTIL_AVAILABLE:
try:
os_proc = psutil.Process(current_pid)
cmd = os_proc.cmdline()
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
# Use psutil start time if available (more accurate than timezone.now())
if os_start_time:
started_at = datetime.fromtimestamp(os_start_time, tz=timezone.get_current_timezone())
@@ -822,7 +832,7 @@ class Process(ModelWithHealthStats):
machine=machine,
parent=parent,
process_type=process_type,
cmd=sys.argv,
cmd=cmd,
pwd=os.getcwd(),
pid=current_pid,
started_at=started_at,

View File

@@ -70,15 +70,54 @@ def write_cmd_file(cmd_file: Path, cmd: list[str]):
pass
def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15) -> bool:
"""Kill process after validation. Returns True if killed."""
def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15, timeout: float = 3.0) -> bool:
"""
Kill process after validation, with graceful wait and SIGKILL escalation.
Returns True only if process is confirmed dead (either already dead or killed successfully).
"""
import time
import signal
if not validate_pid_file(pid_file, cmd_file):
pid_file.unlink(missing_ok=True) # Clean stale file
return False
return True # Process already dead, consider it killed
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, signal_num)
return True
except (OSError, ValueError, ProcessLookupError):
# Send initial signal (SIGTERM by default)
try:
os.kill(pid, signal_num)
except ProcessLookupError:
# Process already dead
return True
# Wait for process to terminate gracefully
start_time = time.time()
while time.time() - start_time < timeout:
try:
os.kill(pid, 0) # Check if process still exists
time.sleep(0.1)
except ProcessLookupError:
# Process terminated
return True
# Process didn't terminate, escalate to SIGKILL
try:
os.kill(pid, signal.SIGKILL)
time.sleep(0.5) # Brief wait after SIGKILL
# Verify it's dead
try:
os.kill(pid, 0)
# Process still alive after SIGKILL - this is unusual
return False
except ProcessLookupError:
# Process finally dead
return True
except ProcessLookupError:
# Process died between timeout and SIGKILL
return True
except (OSError, ValueError):
return False

View File

@@ -131,6 +131,10 @@ class Worker:
self.pid = os.getpid()
# Register this worker process in the database
self.db_process = Process.current()
# Explicitly set process_type to WORKER to prevent mis-detection
if self.db_process.process_type != Process.TypeChoices.WORKER:
self.db_process.process_type = Process.TypeChoices.WORKER
self.db_process.save(update_fields=['process_type'])
# Determine worker type for logging
worker_type_name = self.__class__.__name__
@@ -312,10 +316,12 @@ class Worker:
Process.cleanup_stale_running()
# Convert Process objects to dicts to match the expected API contract
processes = Process.get_running(process_type=Process.TypeChoices.WORKER)
# Note: worker_id is not stored on the Process model; it is generated dynamically
# We return process_id (UUID) and pid (OS process ID) instead
return [
{
'pid': p.pid,
'worker_id': p.id,
'process_id': str(p.id), # UUID of Process record
'started_at': p.started_at.isoformat() if p.started_at else None,
'status': p.status,
}
@@ -420,7 +426,7 @@ class ArchiveResultWorker(Worker):
from archivebox.machine.models import Process
if worker_id is None:
worker_id = Process.get_next_worker_id(process_type=cls.name)
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
# Use module-level function for pickling compatibility
proc = MPProcess(