better tui

This commit is contained in:
Nick Sweeting
2026-01-19 01:53:32 -08:00
parent 1cb2d5070e
commit b5bbc3b549
9 changed files with 690 additions and 109 deletions

View File

@@ -30,6 +30,7 @@ import time
from typing import Type
from datetime import timedelta
from multiprocessing import Process as MPProcess
from pathlib import Path
from django.utils import timezone
@@ -457,12 +458,14 @@ class Orchestrator:
# Enable progress layout only in TTY + foreground mode
show_progress = IS_TTY and self.exit_on_idle
plain_output = not IS_TTY
self.on_startup()
if not show_progress:
# No progress layout - just run normally
self._run_orchestrator_loop(None)
# No progress layout - optionally emit plain lines for non-TTY output
progress_layout = ArchiveBoxProgressLayout(crawl_id=self.crawl_id) if plain_output else None
self._run_orchestrator_loop(progress_layout, plain_output=plain_output)
else:
# Redirect worker subprocess output to /dev/null
devnull_fd = os.open(os.devnull, os.O_WRONLY)
@@ -497,7 +500,7 @@ class Orchestrator:
screen=True,
console=orchestrator_console,
):
self._run_orchestrator_loop(progress_layout)
self._run_orchestrator_loop(progress_layout, plain_output=False)
# Restore original console
logging_module.CONSOLE = original_console
@@ -515,11 +518,12 @@ class Orchestrator:
pass
# stdout_for_console is closed by orchestrator_console
def _run_orchestrator_loop(self, progress_layout):
def _run_orchestrator_loop(self, progress_layout, plain_output: bool = False):
"""Run the main orchestrator loop with optional progress display."""
last_queue_sizes = {}
last_snapshot_count = None
tick_count = 0
last_plain_lines: set[tuple[str, str]] = set()
# Track snapshot progress to detect changes
snapshot_progress = {} # snapshot_id -> (total, completed, current_plugin)
@@ -591,6 +595,22 @@ class Orchestrator:
def _abbrev(text: str, max_len: int = 80) -> str:
return text if len(text) <= max_len else f"{text[:max_len - 3]}..."
def _format_size(num_bytes: int | None) -> str:
if not num_bytes:
return ''
size = float(num_bytes)
for unit in ('b', 'kb', 'mb', 'gb', 'tb'):
if size < 1024 or unit == 'tb':
return f"{size:.1f}{unit}"
size /= 1024
return ''
def _format_seconds(total_seconds: float | None) -> str:
if total_seconds is None:
return ''
seconds = max(0.0, float(total_seconds))
return f"{seconds:.1f}s"
tree_data: list[dict] = []
for crawl in crawls:
urls = crawl.get_urls_list()
@@ -614,28 +634,174 @@ class Orchestrator:
active_snaps.append(s)
for snap in active_snaps:
total = snap.archiveresult_set.count()
completed = snap.archiveresult_set.filter(status__in=[
ArchiveResult.StatusChoices.SUCCEEDED,
ArchiveResult.StatusChoices.SKIPPED,
ArchiveResult.StatusChoices.FAILED,
]).count()
running = snap.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED).count()
try:
from archivebox.config.configset import get_config
from archivebox.hooks import discover_hooks
hooks_list = discover_hooks('Snapshot', config=get_config(snapshot=snap))
total_hooks = len(hooks_list)
snap_config = get_config(snapshot=snap)
hooks_list = discover_hooks('Snapshot', config=snap_config)
hooks_by_snapshot[str(snap.id)] = hooks_list
from archivebox.hooks import get_plugin_special_config
hook_timeouts = {}
for hook_path in hooks_list:
plugin_name = hook_path.parent.name
try:
hook_timeouts[hook_path.name] = int(get_plugin_special_config(plugin_name, snap_config)['timeout'])
except Exception:
pass
except Exception:
total_hooks = total
pending = max(total_hooks - completed - running, 0)
snap_label = _abbrev(snap.url or str(snap.id), max_len=60)
hooks_list = []
hook_timeouts = {}
try:
from archivebox import DATA_DIR
data_dir = Path(DATA_DIR)
snap_path = snap.output_dir
try:
rel = Path(snap_path)
if rel.is_absolute():
rel = rel.relative_to(data_dir)
snap_path = f"./{rel}" if not str(rel).startswith("./") else str(rel)
except Exception:
snap_path = str(snap_path)
ars = list(
snap.archiveresult_set.select_related('process').order_by('start_ts')
)
ar_by_hook = {ar.hook_name: ar for ar in ars if ar.hook_name}
except Exception:
snap_path = ''
ar_by_hook = {}
plugin_hooks: dict[str, list[dict]] = {}
now = timezone.now()
for hook_path in hooks_list:
hook_name = hook_path.name
is_bg = '.bg.' in hook_name
ar = ar_by_hook.get(hook_name)
status = 'pending'
is_running = False
is_pending = True
elapsed = ''
timeout = ''
size = ''
if ar:
if ar.status == ArchiveResult.StatusChoices.STARTED:
status = 'started'
is_running = True
is_pending = False
start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None)
if start_ts:
elapsed = _format_seconds((now - start_ts).total_seconds())
hook_timeout = None
if ar.process_id and ar.process and ar.process.timeout:
hook_timeout = ar.process.timeout
hook_timeout = hook_timeout or hook_timeouts.get(hook_name)
if hook_timeout:
timeout = _format_seconds(hook_timeout)
else:
status = ar.status
is_pending = False
start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None)
end_ts = ar.end_ts or (ar.process.ended_at if ar.process_id and ar.process else None)
if start_ts and end_ts:
elapsed = _format_seconds((end_ts - start_ts).total_seconds())
size = _format_size(getattr(ar, 'output_size', None))
else:
hook_timeout = hook_timeouts.get(hook_name)
if hook_timeout:
timeout = _format_seconds(hook_timeout)
elapsed = _format_seconds(0)
plugin_name = hook_path.parent.name
if plugin_name in ('plugins', '.'):
plugin_name = hook_name.split('__')[-1].split('.')[0]
plugin_hooks.setdefault(plugin_name, []).append({
'status': status,
'size': size,
'elapsed': elapsed,
'timeout': timeout,
'is_bg': is_bg,
'is_running': is_running,
'is_pending': is_pending,
'hook_name': hook_name,
})
hooks = []
for plugin_name, hook_entries in plugin_hooks.items():
running = next((h for h in hook_entries if h['is_running']), None)
pending = next((h for h in hook_entries if h['is_pending']), None)
any_failed = any(h['status'] == ArchiveResult.StatusChoices.FAILED for h in hook_entries)
any_succeeded = any(h['status'] == ArchiveResult.StatusChoices.SUCCEEDED for h in hook_entries)
any_skipped = any(h['status'] == ArchiveResult.StatusChoices.SKIPPED for h in hook_entries)
if running:
status = 'started'
is_running = True
is_pending = False
is_bg = running['is_bg']
elapsed = running.get('elapsed', '')
timeout = running.get('timeout', '')
size = ''
elif pending:
status = 'pending'
is_running = False
is_pending = True
is_bg = pending['is_bg']
elapsed = pending.get('elapsed', '') or _format_seconds(0)
timeout = pending.get('timeout', '')
size = ''
else:
is_running = False
is_pending = False
is_bg = any(h['is_bg'] for h in hook_entries)
if any_failed:
status = 'failed'
elif any_succeeded:
status = 'succeeded'
elif any_skipped:
status = 'skipped'
else:
status = 'skipped'
total_elapsed = 0.0
has_elapsed = False
for h in hook_entries:
if h.get('elapsed'):
try:
total_elapsed += float(h['elapsed'].rstrip('s'))
has_elapsed = True
except Exception:
pass
elapsed = _format_seconds(total_elapsed) if has_elapsed else ''
max_output = 0
# Use the largest output_size we already computed on ArchiveResult
ar_sizes = [
ar_by_hook[h['hook_name']].output_size
for h in hook_entries
if h.get('hook_name') in ar_by_hook and getattr(ar_by_hook[h['hook_name']], 'output_size', 0)
]
if ar_sizes:
max_output = max(ar_sizes)
size = _format_size(max_output) if max_output else ''
timeout = ''
hooks.append({
'status': status,
'path': f"./{plugin_name}",
'size': size,
'elapsed': elapsed,
'timeout': timeout,
'is_bg': is_bg,
'is_running': is_running,
'is_pending': is_pending,
})
snap_label = _abbrev(f"{str(snap.id)[-8:]} {snap.url or ''}".strip(), max_len=80)
snapshots.append({
'id': str(snap.id),
'status': snap.status,
'label': snap_label,
'hooks': {'completed': completed, 'running': running, 'pending': pending} if total else {},
'output_path': snap_path,
'hooks': hooks,
})
pending_snapshot_candidates.append(snap)
@@ -837,6 +1003,16 @@ class Orchestrator:
if snapshot_id in snapshot_progress:
del snapshot_progress[snapshot_id]
if plain_output:
plain_lines = progress_layout.plain_lines()
new_lines = [line for line in plain_lines if line not in last_plain_lines]
if new_lines:
ts = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
for panel, line in new_lines:
if line:
print(f"[{ts}] [{panel}] {line}")
last_plain_lines = set(plain_lines)
# Track idle state
has_pending = self.has_pending_work(queue_sizes)
has_running = self.has_running_workers()

View File

@@ -254,8 +254,7 @@ def start_new_supervisord_process(daemonize=False):
shell=True,
start_new_session=True,
)
time.sleep(2)
return get_existing_supervisord_process()
return wait_for_supervisord_ready()
else:
# Start supervisord in FOREGROUND - this will block until supervisord exits
# supervisord with nodaemon=true will run in foreground and handle signals properly
@@ -273,10 +272,19 @@ def start_new_supervisord_process(daemonize=False):
global _supervisord_proc
_supervisord_proc = proc
# Wait a bit for supervisord to start up
time.sleep(2)
return wait_for_supervisord_ready()
return get_existing_supervisord_process()
def wait_for_supervisord_ready(max_wait_sec: float = 5.0, interval_sec: float = 0.1):
    """Poll for supervisord readiness without a fixed startup sleep.

    Repeatedly calls get_existing_supervisord_process() until it returns a
    non-None handle or max_wait_sec elapses.

    Fixes two edge cases in the original loop: it now always polls at least
    once (the original returned None immediately when max_wait_sec <= 0 or
    the deadline had already passed), and it no longer sleeps a full
    interval_sec after the final failed poll before giving up.

    Args:
        max_wait_sec: Maximum total time to wait, in seconds.
        interval_sec: Delay between polls, in seconds.

    Returns:
        The supervisor handle, or None if supervisord never became ready
        within max_wait_sec.
    """
    deadline = time.monotonic() + max_wait_sec
    while True:
        supervisor = get_existing_supervisord_process()
        if supervisor is not None:
            return supervisor
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval_sec)
def get_or_create_supervisord_process(daemonize=False):
@@ -287,17 +295,16 @@ def get_or_create_supervisord_process(daemonize=False):
if supervisor is None:
stop_existing_supervisord_process()
supervisor = start_new_supervisord_process(daemonize=daemonize)
time.sleep(0.5)
# wait up to 5s in case supervisord is slow to start
if not supervisor:
for _ in range(10):
for _ in range(50):
if supervisor is not None:
print()
break
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(0.5)
time.sleep(0.1)
supervisor = get_existing_supervisord_process()
else:
print()
@@ -328,9 +335,7 @@ def start_worker(supervisor, daemon, lazy=False):
for added in added:
supervisor.addProcessGroup(added)
time.sleep(1)
for _ in range(10):
for _ in range(25):
procs = supervisor.getAllProcessInfo()
for proc in procs:
if proc['name'] == daemon["name"]:
@@ -345,8 +350,8 @@ def start_worker(supervisor, daemon, lazy=False):
print(f" - Worker {daemon['name']}: started {proc['statename']} ({proc['description']})")
return proc
# retry in a second in case it's slow to launch
time.sleep(0.5)
# retry in a moment in case it's slow to launch
time.sleep(0.2)
raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")