remove Seed model in favor of Crawl as template

This commit is contained in:
Nick Sweeting
2025-12-25 01:52:38 -08:00
parent 28e6c5bb65
commit bb53228ebf
30 changed files with 785 additions and 690 deletions

View File

@@ -26,6 +26,9 @@ CONFIG_FILE_NAME = "supervisord.conf"
PID_FILE_NAME = "supervisord.pid"
WORKERS_DIR_NAME = "workers"
# Global reference to supervisord process for cleanup
_supervisord_proc = None
ORCHESTRATOR_WORKER = {
"name": "worker_orchestrator",
"command": "archivebox manage orchestrator", # runs forever by default
@@ -78,7 +81,7 @@ def create_supervisord_config():
config_content = f"""
[supervisord]
nodaemon = true
environment = IS_SUPERVISORD_PARENT="true"
environment = IS_SUPERVISORD_PARENT="true",COLUMNS="200"
pidfile = {PID_FILE}
logfile = {LOG_FILE}
childlogdir = {CONSTANTS.LOGS_DIR}
@@ -143,11 +146,27 @@ def get_existing_supervisord_process():
return None
def stop_existing_supervisord_process():
global _supervisord_proc
SOCK_FILE = get_sock_file()
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
try:
# if pid file exists, load PID int
# First try to stop via the global proc reference
if _supervisord_proc and _supervisord_proc.poll() is None:
try:
print(f"[🦸‍♂️] Stopping supervisord process (pid={_supervisord_proc.pid})...")
_supervisord_proc.terminate()
try:
_supervisord_proc.wait(timeout=5)
except subprocess.TimeoutExpired:
_supervisord_proc.kill()
_supervisord_proc.wait(timeout=2)
except (BaseException, BrokenPipeError, IOError, KeyboardInterrupt):
pass
_supervisord_proc = None
return
# Fallback: if pid file exists, load PID int and kill that process
try:
pid = int(PID_FILE.read_text())
except (FileNotFoundError, ValueError):
@@ -156,8 +175,25 @@ def stop_existing_supervisord_process():
try:
print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
proc = psutil.Process(pid)
# Kill the entire process group to ensure all children are stopped
children = proc.children(recursive=True)
proc.terminate()
# Also terminate all children
for child in children:
try:
child.terminate()
except psutil.NoSuchProcess:
pass
proc.wait(timeout=5)
# Kill any remaining children
for child in children:
try:
if child.is_running():
child.kill()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
except (BaseException, BrokenPipeError, IOError, KeyboardInterrupt):
pass
finally:
@@ -174,7 +210,7 @@ def start_new_supervisord_process(daemonize=False):
LOG_FILE = CONSTANTS.LOGS_DIR / LOG_FILE_NAME
CONFIG_FILE = SOCK_FILE.parent / CONFIG_FILE_NAME
PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
print(f"[🦸‍♂️] Supervisord starting{' in background' if daemonize else ''}...")
pretty_log_path = pretty_path(LOG_FILE)
print(f" > Writing supervisord logs to: {pretty_log_path}")
@@ -182,50 +218,54 @@ def start_new_supervisord_process(daemonize=False):
print(f' > Using supervisord config file: {pretty_path(CONFIG_FILE)}')
print(f" > Using supervisord UNIX socket: {pretty_path(SOCK_FILE)}")
print()
# clear out existing stale state files
shutil.rmtree(WORKERS_DIR, ignore_errors=True)
PID_FILE.unlink(missing_ok=True)
get_sock_file().unlink(missing_ok=True)
CONFIG_FILE.unlink(missing_ok=True)
# create the supervisord config file
create_supervisord_config()
# Start supervisord
# panel = Panel(f"Starting supervisord with config: {SUPERVISORD_CONFIG_FILE}")
# with Live(panel, refresh_per_second=1) as live:
subprocess.Popen(
f"supervisord --configuration={CONFIG_FILE}",
stdin=None,
shell=True,
start_new_session=daemonize,
)
# Open log file for supervisord output
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
log_handle = open(LOG_FILE, 'a')
def exit_signal_handler(signum, frame):
if signum == 2:
STDERR.print("\n[🛑] Got Ctrl+C. Terminating child processes...")
elif signum != 13:
STDERR.print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
stop_existing_supervisord_process()
raise SystemExit(0)
if daemonize:
# Start supervisord in background (daemon mode)
subprocess.Popen(
f"supervisord --configuration={CONFIG_FILE}",
stdin=None,
stdout=log_handle,
stderr=log_handle,
shell=True,
start_new_session=True,
)
time.sleep(2)
return get_existing_supervisord_process()
else:
# Start supervisord in FOREGROUND - this will block until supervisord exits
# supervisord with nodaemon=true will run in foreground and handle signals properly
# When supervisord gets SIGINT/SIGTERM, it will stop all child processes before exiting
proc = subprocess.Popen(
f"supervisord --configuration={CONFIG_FILE}",
stdin=None,
stdout=log_handle,
stderr=log_handle,
shell=True,
start_new_session=False, # Keep in same process group so signals propagate
)
# Monitor for termination signals and cleanup child processes
if not daemonize:
try:
signal.signal(signal.SIGINT, exit_signal_handler)
signal.signal(signal.SIGHUP, exit_signal_handler)
signal.signal(signal.SIGPIPE, exit_signal_handler)
signal.signal(signal.SIGTERM, exit_signal_handler)
except Exception:
# signal handlers only work in main thread
pass
# otherwise supervisord will continue in background even if the parent proc ends (aka daemon mode)
# Store the process so we can wait on it later
global _supervisord_proc
_supervisord_proc = proc
time.sleep(2)
# Wait a bit for supervisord to start up
time.sleep(2)
return get_existing_supervisord_process()
return get_existing_supervisord_process()
def get_or_create_supervisord_process(daemonize=False):
SOCK_FILE = get_sock_file()
@@ -353,9 +393,15 @@ def tail_worker_logs(log_path: str):
pass
def tail_multiple_worker_logs(log_files: list[str], follow=True):
"""Tail multiple log files simultaneously, interleaving their output."""
import select
def tail_multiple_worker_logs(log_files: list[str], follow=True, proc=None):
"""Tail multiple log files simultaneously, interleaving their output.
Args:
log_files: List of log file paths to tail
follow: Whether to keep following (True) or just read existing content (False)
proc: Optional subprocess.Popen object - stop tailing when this process exits
"""
import re
from pathlib import Path
# Convert relative paths to absolute paths
@@ -377,48 +423,53 @@ def tail_multiple_worker_logs(log_files: list[str], follow=True):
for log_path in log_paths:
try:
f = open(log_path, 'r')
# Seek to end of file if following
if follow:
f.seek(0, 2) # Seek to end
file_handles.append((log_path.name, f))
# Don't seek to end - show recent content so user sees something
# Go to end minus 4KB to show some recent logs
f.seek(0, 2) # Go to end first
file_size = f.tell()
if file_size > 4096:
f.seek(file_size - 4096)
f.readline() # Skip partial line
else:
f.seek(0) # Small file, read from start
file_handles.append((log_path, f))
print(f" [tailing {log_path.name}]")
except Exception as e:
print(f"[yellow]Warning: Could not open {log_path}: {e}[/yellow]")
sys.stderr.write(f"Warning: Could not open {log_path}: {e}\n")
if not file_handles:
print("[red]No log files could be opened[/red]")
sys.stderr.write("No log files could be opened\n")
return
# Print which logs we're tailing
log_names = [name for name, _ in file_handles]
print(f"[dim]Tailing: {', '.join(log_names)}[/dim]")
print()
try:
while follow:
# Read available lines from all files
for log_name, f in file_handles:
line = f.readline()
if line:
# Colorize based on log source
if 'orchestrator' in log_name.lower():
color = 'cyan'
elif 'daphne' in log_name.lower():
color = 'green'
else:
color = 'white'
# Check if the monitored process has exited
if proc is not None and proc.poll() is not None:
print(f"\n[server process exited with code {proc.returncode}]")
break
had_output = False
# Read ALL available lines from all files (not just one per iteration)
for log_path, f in file_handles:
while True:
line = f.readline()
if not line:
break # No more lines available in this file
had_output = True
# Strip ANSI codes if present (supervisord does this but just in case)
import re
line_clean = re.sub(r'\x1b\[[0-9;]*m', '', line.rstrip())
if line_clean:
print(f'[{color}][{log_name}][/{color}] {line_clean}')
print(line_clean)
# Small sleep to avoid busy-waiting
time.sleep(0.1)
# Small sleep to avoid busy-waiting (only when no output)
if not had_output:
time.sleep(0.05)
except (KeyboardInterrupt, BrokenPipeError, IOError):
print("\n[yellow][i] Stopped tailing logs[/i][/yellow]")
pass # Let the caller handle the cleanup message
except SystemExit:
pass
finally:
@@ -451,6 +502,8 @@ def watch_worker(supervisor, daemon_name, interval=5):
def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
global _supervisord_proc
supervisor = get_or_create_supervisord_process(daemonize=daemonize)
bg_workers = [
@@ -466,36 +519,50 @@ def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
if not daemonize:
try:
watch_worker(supervisor, "worker_daphne")
# Tail worker logs while supervisord runs
sys.stdout.write('Tailing worker logs (Ctrl+C to stop)...\n\n')
sys.stdout.flush()
tail_multiple_worker_logs(
log_files=['logs/worker_daphne.log', 'logs/worker_orchestrator.log'],
follow=True,
proc=_supervisord_proc, # Stop tailing when supervisord exits
)
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping gracefully...")
finally:
stop_worker(supervisor, "worker_daphne")
# Ensure supervisord and all children are stopped
stop_existing_supervisord_process()
time.sleep(0.5)
def start_cli_workers(watch=False):
global _supervisord_proc
supervisor = get_or_create_supervisord_process(daemonize=False)
start_worker(supervisor, ORCHESTRATOR_WORKER)
if watch:
try:
watch_worker(supervisor, ORCHESTRATOR_WORKER['name'])
# Block on supervisord process - it will handle signals and stop children
if _supervisord_proc:
_supervisord_proc.wait()
else:
# Fallback to watching worker if no proc reference
watch_worker(supervisor, ORCHESTRATOR_WORKER['name'])
except (KeyboardInterrupt, BrokenPipeError, IOError):
STDERR.print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping orchestrator gracefully...")
raise
STDERR.print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping gracefully...")
finally:
stop_worker(supervisor, ORCHESTRATOR_WORKER['name'])
# Ensure supervisord and all children are stopped
stop_existing_supervisord_process()
time.sleep(0.5)
return [ORCHESTRATOR_WORKER]