Files
ArchiveBox/archivebox/misc/progress_layout.py
2026-03-15 20:12:27 -07:00

862 lines
34 KiB
Python

"""
Rich Layout-based live progress display for ArchiveBox orchestrator.
Shows a comprehensive dashboard with:
- Top: Crawl queue status (full width)
- Middle: Crawl queue tree with hook outputs
- Bottom: Running process logs (dynamic panels)
"""
__package__ = 'archivebox.misc'
from datetime import datetime, timezone
import os
import re
from typing import List, Optional, Any
from collections import deque
from pathlib import Path
from rich import box
from rich.console import Group, RenderableType
from rich.layout import Layout
from rich.columns import Columns
from rich.panel import Panel
from rich.text import Text
from rich.table import Table
from rich.tree import Tree
from rich.cells import cell_len
from archivebox.config import VERSION
_RICH_TAG_RE = re.compile(r'\[/?[^\]]+\]')
def _strip_rich(text: str) -> str:
return _RICH_TAG_RE.sub('', text or '').strip()
class CrawlQueuePanel:
    """Display crawl queue status across full width.

    Renders as a one-row, four-column banner:
    version + UTC clock | queue depths | worker counts | orchestrator status.
    The counters are plain attributes that the owner overwrites before each
    render; this class performs no queries of its own.
    """

    # Separator between items that share a column (same glyph as " • Binaries: ").
    _SEPARATOR = " • "

    def __init__(self):
        self.orchestrator_status = "Idle"
        self.crawl_queue_count = 0
        self.crawl_workers_count = 0
        self.binary_queue_count = 0
        self.binary_workers_count = 0
        self.max_crawl_workers = 8
        self.crawl_id: Optional[str] = None

    @staticmethod
    def _count_style(count: int, active: str) -> str:
        """Return ``active`` when ``count`` is positive, otherwise muted grey."""
        return active if count > 0 else "grey53"

    def __rich__(self) -> RenderableType:
        """Build the banner panel from the current attribute values."""
        grid = Table.grid(expand=True)
        grid.add_column(justify="left", ratio=1)
        grid.add_column(justify="center", ratio=1)
        grid.add_column(justify="center", ratio=1)
        grid.add_column(justify="right", ratio=1)

        # Left: ArchiveBox version + timestamp (UTC wall clock at render time)
        clock = datetime.now(timezone.utc).strftime('%H:%M:%S')
        left_text = Text()
        left_text.append("ArchiveBox ", style="bold cyan")
        left_text.append(f"v{VERSION}", style="bold yellow")
        # Separator keeps the clock from running into the version string.
        left_text.append(f"{self._SEPARATOR}{clock}", style="grey53")

        # Center-left: Crawl + Binary queue status
        queue_style = self._count_style(self.crawl_queue_count, "yellow")
        binary_queue_style = self._count_style(self.binary_queue_count, "yellow")
        center_left_text = Text()
        center_left_text.append("Crawls: ", style="white")
        center_left_text.append(str(self.crawl_queue_count), style=f"bold {queue_style}")
        center_left_text.append(" queued", style="grey53")
        center_left_text.append(" • Binaries: ", style="white")
        center_left_text.append(str(self.binary_queue_count), style=f"bold {binary_queue_style}")
        center_left_text.append(" queued", style="grey53")

        # Center-right: Worker status
        worker_style = self._count_style(self.crawl_workers_count, "green")
        binary_worker_style = self._count_style(self.binary_workers_count, "green")
        center_right_text = Text()
        center_right_text.append("Workers: ", style="white")
        center_right_text.append(
            f"{self.crawl_workers_count}/{self.max_crawl_workers}",
            style=f"bold {worker_style}",
        )
        center_right_text.append(" crawl", style="grey53")
        # Separator keeps "crawl" from running into the binary worker count.
        center_right_text.append(self._SEPARATOR, style="grey53")
        center_right_text.append(str(self.binary_workers_count), style=f"bold {binary_worker_style}")
        center_right_text.append(" binary", style="grey53")

        # Right: Orchestrator status (coloured by whether any crawl worker is active)
        status_color = self._count_style(self.crawl_workers_count, "green")
        right_text = Text()
        right_text.append("Status: ", style="white")
        right_text.append(self.orchestrator_status, style=f"bold {status_color}")
        if self.crawl_id:
            right_text.append(f" [{self.crawl_id[:8]}]", style="grey53")

        grid.add_row(left_text, center_left_text, center_right_text, right_text)
        return Panel(grid, style="white on blue", box=box.HORIZONTALS)
class ProcessLogPanel:
    """Display logs for a running Process.

    ``process`` is duck-typed: its attributes (``status``, ``pid``, ``cmd``,
    ``pwd``, ``exit_code``, ``process_type``, ``worker_type``, ``url``,
    ``started_at``, ``timeout``) are read via ``getattr`` with defaults, and
    ``tail_stdout`` / ``tail_stderr`` are called to fetch recent output.
    """

    # Files found in a process directory that do not count as real output.
    _BOOKKEEPING_FILES = frozenset({
        'stdout.log', 'stderr.log', 'cmd.sh', 'process.pid', 'hook.pid', 'listener.pid',
    })

    def __init__(self, process: Any, max_lines: int = 8, compact: bool | None = None, bg_terminating: bool = False):
        self.process = process
        self.max_lines = max_lines
        self.compact = compact  # stored for callers; not read by any method in this class
        self.bg_terminating = bg_terminating

    # ------------------------------------------------------------------ rendering

    def __rich__(self) -> RenderableType:
        """Render either a one-line summary (exited with output) or a log panel."""
        completed_line = self._completed_output_line()
        if completed_line:
            style = "green" if self._completed_ok() else "yellow"
            return Text(completed_line, style=style)

        is_pending = self._is_pending()
        output_line = '' if is_pending else self._output_line()
        stdout_lines, stderr_lines = self._tail_logs()

        header_lines = []
        chrome_launch_line = self._chrome_launch_line(stderr_lines, stdout_lines)
        if chrome_launch_line:
            header_lines.append(Text(chrome_launch_line, style="grey53"))
        if output_line:
            header_lines.append(Text(output_line, style="grey53"))

        # stdout first, then stderr; blank lines are dropped
        log_lines = [Text(line, style="white") for line in stdout_lines if line]
        log_lines += [Text(line, style="cyan") for line in stderr_lines if line]

        # Header lines consume part of the line budget, but at least one log line is kept.
        max_body = max(1, self.max_lines - len(header_lines))
        lines = header_lines + log_lines[-max_body:]
        content = Group(*lines) if lines else Text("")

        return Panel(
            content,
            title=self._title(),
            border_style=self._border_style(is_pending=is_pending),
            box=box.HORIZONTALS,
            padding=(0, 1),
            height=2 if is_pending else None,
        )

    def plain_lines(self) -> list[str]:
        """Return the panel content as plain strings (no Rich renderables)."""
        completed_line = self._completed_output_line()
        if completed_line:
            return [completed_line]

        lines = []
        if not self._is_pending():
            output_line = self._output_line()
            if output_line:
                lines.append(output_line)

        stdout_lines, stderr_lines = self._tail_logs()
        lines.extend(line for line in stdout_lines if line)
        lines.extend(line for line in stderr_lines if line)
        return lines

    def _tail_logs(self) -> tuple[list[str], list[str]]:
        """Return ``(stdout_lines, stderr_lines)``; both are empty if either tail call fails."""
        try:
            stdout_lines = list(self.process.tail_stdout(lines=self.max_lines, follow=False))
            stderr_lines = list(self.process.tail_stderr(lines=self.max_lines, follow=False))
        except Exception:
            return [], []
        return stdout_lines, stderr_lines

    # ------------------------------------------------------------------ title

    def _hook_path(self) -> Path | None:
        """Return ``Path(cmd[1])``, or ``None`` when ``cmd`` has fewer than two entries.

        May raise if ``cmd`` is not a sequence of path-like values; callers wrap
        this in their own ``try`` and pick a fallback.
        """
        cmd = getattr(self.process, 'cmd', [])
        return Path(cmd[1]) if len(cmd) > 1 else None

    def _title(self) -> str:
        """Build the Rich-markup panel title: label plus pid / id / url / elapsed details."""
        process_type = getattr(self.process, 'process_type', 'process')
        worker_type = getattr(self.process, 'worker_type', '')
        pid = getattr(self.process, 'pid', None)

        label = process_type
        worker_suffix = ''
        if process_type == 'worker' and worker_type:
            label, worker_suffix = self._worker_label(worker_type)
        elif process_type == 'hook':
            try:
                hook_path = self._hook_path()
                hook_name = hook_path.name if hook_path else 'hook'
                plugin_name = hook_path.parent.name if hook_path and hook_path.parent.name else 'hook'
            except Exception:
                hook_name = 'hook'
                plugin_name = 'hook'
            label = f"{plugin_name}/{hook_name}"

        url = self._extract_url()
        url_suffix = f" url={self._abbrev_url(url)}" if url else ""
        time_suffix = self._elapsed_suffix()
        title_style = "grey53" if self._is_pending() else "bold white"
        head = f"[{title_style}]{label}[/{title_style}]"

        if pid:
            return f"{head} [grey53]pid={pid}{worker_suffix}{url_suffix}{time_suffix}[/grey53]"

        # Without a pid, join only the non-empty details so an empty suffix
        # cannot leave a stray leading or doubled space.
        details = ' '.join(
            part
            for part in (worker_suffix.strip(), url_suffix.strip(), time_suffix.strip())
            if part
        )
        return f"{head} [grey53]{details}[/grey53]" if details else head

    # ------------------------------------------------------------------ state predicates

    def _is_background_hook(self) -> bool:
        """Return True for hook processes whose script filename contains ``.bg.``."""
        if getattr(self.process, 'process_type', '') != 'hook':
            return False
        try:
            hook_path = self._hook_path()
            hook_name = hook_path.name if hook_path else ''
            return '.bg.' in hook_name
        except Exception:
            return False

    def _is_pending(self) -> bool:
        """Return True for queued/pending/backoff processes and for hooks without a pid."""
        status = getattr(self.process, 'status', '')
        if status in ('queued', 'pending', 'backoff'):
            return True
        is_hook = getattr(self.process, 'process_type', '') == 'hook'
        return bool(is_hook and not getattr(self.process, 'pid', None))

    def _completed_ok(self) -> bool:
        """Return True when ``exit_code`` is 0 or unset."""
        exit_code = getattr(self.process, 'exit_code', None)
        return exit_code in (0, None)

    def _completed_output_line(self) -> str:
        """Return the output-dir line for an exited process that produced files, else ''."""
        status = getattr(self.process, 'status', '')
        if status != 'exited':
            return ''
        output_line = self._output_line()
        if not output_line:
            return ''
        if not self._has_output_files():
            return ''
        return output_line

    def _has_output_files(self) -> bool:
        """Return True if ``pwd`` contains any file other than log/pid bookkeeping files."""
        pwd = getattr(self.process, 'pwd', None)
        if not pwd:
            return False
        try:
            base = Path(pwd)
            if not base.exists():
                return False
            # Stops at the first real output file found by the recursive walk.
            return any(
                path.is_file() and path.name not in self._BOOKKEEPING_FILES
                for path in base.rglob('*')
            )
        except Exception:
            return False

    def _border_style(self, is_pending: bool) -> str:
        """Pick the panel border colour from pending / exited / hook state."""
        if is_pending:
            return "grey53"
        status = getattr(self.process, 'status', '')
        if status == 'exited':
            return "green" if self._completed_ok() else "yellow"
        if getattr(self.process, 'process_type', '') == 'hook':
            if not self._is_background_hook():
                return "green"
            if self.bg_terminating:
                return "red"
        return "cyan"

    # ------------------------------------------------------------------ labels & args

    def _worker_label(self, worker_type: str) -> tuple[str, str]:
        """Return ``(label, suffix)`` for a worker process.

        For crawl and snapshot workers the suffix carries the last 8 characters
        of the id from the command line and, when the model lookup succeeds,
        the abbreviated URL(s). Lookup failures are ignored (best-effort).
        """
        cmd = getattr(self.process, 'cmd', []) or []

        if worker_type == 'crawl':
            crawl_id = self._extract_arg(cmd, '--crawl-id')
            suffix = ''
            if crawl_id:
                suffix = f" id={str(crawl_id)[-8:]}"
                try:
                    # Imported lazily, inside the best-effort try, so an import
                    # failure only drops the URL detail.
                    from archivebox.crawls.models import Crawl
                    crawl = Crawl.objects.filter(id=crawl_id).first()
                    if crawl:
                        urls = crawl.get_urls_list()
                        if urls:
                            url_list = self._abbrev_urls(urls)
                            suffix += f" urls={url_list}"
                except Exception:
                    pass
            return 'crawl', suffix

        if worker_type == 'snapshot':
            snapshot_id = self._extract_arg(cmd, '--snapshot-id')
            suffix = ''
            if snapshot_id:
                suffix = f" id={str(snapshot_id)[-8:]}"
                try:
                    from archivebox.core.models import Snapshot
                    snap = Snapshot.objects.filter(id=snapshot_id).first()
                    if snap and snap.url:
                        suffix += f" url={self._abbrev_url(snap.url, max_len=48)}"
                except Exception:
                    pass
            return 'snapshot', suffix

        return f"worker:{worker_type}", ''

    @staticmethod
    def _extract_arg(cmd: list[str], key: str) -> str | None:
        """Return the value of ``key`` from ``cmd`` (``key=value`` or ``key value``), else None."""
        for i, part in enumerate(cmd):
            if part.startswith(f'{key}='):
                return part.split('=', 1)[1]
            if part == key and i + 1 < len(cmd):
                return cmd[i + 1]
        return None

    def _abbrev_urls(self, urls: list[str], max_len: int = 48) -> str:
        """Abbreviate the first URL and append ``,+N`` for the remaining count."""
        if not urls:
            return ''
        first = self._abbrev_url(urls[0], max_len=max_len)
        if len(urls) == 1:
            return first
        return f"{first},+{len(urls) - 1}"

    def _extract_url(self) -> str:
        """Return ``process.url`` if set, else the ``--url`` command-line value, else ''."""
        url = getattr(self.process, 'url', None)
        if url:
            return str(url)
        cmd = getattr(self.process, 'cmd', []) or []
        value = self._extract_arg(cmd, '--url')
        return str(value).strip() if value is not None else ''

    def _abbrev_url(self, url: str, max_len: int = 48) -> str:
        """Return ``url`` unchanged if it fits in ``max_len``, else a ``...``-terminated prefix."""
        if not url:
            return ''
        if len(url) <= max_len:
            return url
        # Clamp so a max_len below 3 cannot turn into a negative slice bound.
        return f"{url[:max(max_len - 3, 0)]}..."

    # ------------------------------------------------------------------ header lines

    @staticmethod
    def _chrome_summary(pid: str, ws: str) -> str:
        """Format the Chrome header from whichever of ``pid`` / ``ws`` is known; '' if neither."""
        if pid and ws:
            return f"Chrome pid={pid} {ws}"
        if ws:
            return f"Chrome {ws}"
        if pid:
            return f"Chrome pid={pid}"
        return ''

    def _chrome_launch_line(self, stderr_lines: list[str], stdout_lines: list[str]) -> str:
        """Return a ``Chrome pid=… <cdp url>`` header for ``chrome_launch`` hooks, else ''.

        The pid and CDP URL are taken from the first ``PID:`` / ``CDP URL:``
        markers in the log lines; if neither is present, ``chrome.pid`` and
        ``cdp_url.txt`` in the process directory are read instead.
        """
        try:
            hook_path = self._hook_path()
            hook_name = hook_path.name if hook_path else ''
            if 'chrome_launch' not in hook_name:
                return ''

            pid = ''
            ws = ''
            for line in stderr_lines + stdout_lines:
                if not ws and 'CDP URL:' in line:
                    ws = line.split('CDP URL:', 1)[1].strip()
                if not pid and 'PID:' in line:
                    pid = line.split('PID:', 1)[1].strip()

            summary = self._chrome_summary(pid, ws)
            if summary:
                return summary

            # Nothing in the logs: fall back to the files in the process directory.
            try:
                from archivebox import DATA_DIR
                base = Path(DATA_DIR)
                pwd = getattr(self.process, 'pwd', None)
                if pwd:
                    chrome_dir = Path(pwd)
                    if not chrome_dir.is_absolute():
                        chrome_dir = (base / chrome_dir).resolve()
                    cdp_file = chrome_dir / 'cdp_url.txt'
                    pid_file = chrome_dir / 'chrome.pid'
                    if cdp_file.exists():
                        ws = cdp_file.read_text().strip()
                    if pid_file.exists():
                        pid = pid_file.read_text().strip()
                    return self._chrome_summary(pid, ws)
            except Exception:
                pass
        except Exception:
            return ''
        return ''

    def _elapsed_suffix(self) -> str:
        """Return `` [elapsed/timeout s]``, or '' when ``started_at`` or ``timeout`` is unset."""
        started_at = getattr(self.process, 'started_at', None)
        timeout = getattr(self.process, 'timeout', None)
        if not started_at or not timeout:
            return ''
        try:
            # Match the awareness of started_at so the subtraction is valid.
            now = datetime.now(timezone.utc) if started_at.tzinfo else datetime.now()
            elapsed = int((now - started_at).total_seconds())
            elapsed = max(elapsed, 0)
            return f" [{elapsed}/{int(timeout)}s]"
        except Exception:
            return ''

    def _output_line(self) -> str:
        """Return the process directory for display.

        Paths under ``DATA_DIR`` (and paths that are already relative) are shown
        as ``./relative``. An absolute path outside ``DATA_DIR`` is returned
        unchanged rather than being prefixed with ``./``. Returns '' when ``pwd``
        is unset, and the raw ``pwd`` if anything goes wrong.
        """
        pwd = getattr(self.process, 'pwd', None)
        if not pwd:
            return ''
        try:
            from archivebox import DATA_DIR
            rel = Path(pwd)
            if rel.is_absolute():
                try:
                    rel = rel.relative_to(Path(DATA_DIR))
                except ValueError:
                    # Not under DATA_DIR: "./" + "/abs/path" would be misleading.
                    return str(rel)
            rel_str = str(rel)
            return rel_str if rel_str.startswith("./") else f"./{rel_str}"
        except Exception:
            return f"{pwd}"
class WorkerLogPanel:
    """Display worker logs by tailing stdout/stderr from Process.

    Holds the most recent tail in ``log_lines`` as ``(stream, text)`` pairs and
    renders them in a cyan-bordered panel, or a placeholder message when empty.
    """

    # Style applied to each stream's lines; streams not listed fall back to white.
    _STREAM_STYLES = {'stderr': "cyan"}

    def __init__(self, title: str, empty_message: str, running_message: str, max_lines: int = 8):
        self.title = title
        self.empty_message = empty_message
        self.running_message = running_message
        self.max_lines = max_lines
        # Buffer holds up to max_lines from each of the two streams
        self.log_lines: deque = deque(maxlen=max_lines * 2)
        # File offsets; initialised here but not updated by this class
        self.last_stdout_pos = 0
        self.last_stderr_pos = 0
        self.last_process_running = False

    def update_from_process(self, process: Any):
        """Replace the buffered lines with a fresh tail of ``process`` stdout/stderr.

        A falsy ``process`` only resets the running flag. If tailing raises, the
        previously buffered lines are left as they were.
        """
        if not process:
            self.last_process_running = False
            return

        try:
            self.last_process_running = bool(getattr(process, 'is_running', False))
            out_tail = list(process.tail_stdout(lines=self.max_lines, follow=False))
            err_tail = list(process.tail_stderr(lines=self.max_lines, follow=False))
        except Exception:
            return

        self.log_lines.clear()
        # stdout entries go in before stderr entries; empty lines are skipped
        self.log_lines.extend(('stdout', text) for text in out_tail if text)
        self.log_lines.extend(('stderr', text) for text in err_tail if text)

    def __rich__(self) -> Panel:
        """Render the buffered lines, or the running/empty placeholder when there are none."""
        if self.log_lines:
            rendered = []
            for stream, message in list(self.log_lines)[-self.max_lines:]:
                entry = Text()
                entry.append(message, style=self._STREAM_STYLES.get(stream, "white"))
                rendered.append(entry)
            content = Group(*rendered)
        else:
            placeholder = self.running_message if self.last_process_running else self.empty_message
            content = Text(placeholder, style="grey53", justify="center")

        return Panel(
            content,
            title=f"[bold cyan]{self.title}",
            border_style="cyan",
            box=box.HORIZONTALS,
        )
class CrawlQueueTreePanel:
    """Display crawl queue with snapshots + hook summary in a tree view.

    ``crawls`` is a list of dicts read with ``.get``: each crawl has ``status``,
    ``label``, ``id`` and ``snapshots``; each snapshot has ``status``, ``label``,
    ``output_path`` and ``hooks``; each hook has ``status``, ``path``, ``size``,
    ``elapsed``, ``timeout``, ``is_bg``, ``is_running``, ``is_pending`` and
    ``stderr``.
    """

    def __init__(self, max_crawls: int = 8, max_snapshots: int = 16):
        self.crawls: list[dict[str, Any]] = []
        self.max_crawls = max_crawls
        self.max_snapshots = max_snapshots

    def update_crawls(self, crawls: list[dict[str, Any]]) -> None:
        """Update crawl tree data, keeping at most ``max_crawls`` entries."""
        self.crawls = crawls[:self.max_crawls]

    def __rich__(self) -> Panel:
        """Render one tree per crawl, with snapshot children and hook grandchildren."""
        if not self.crawls:
            content = Text("No active crawls", style="grey53", justify="center")
        else:
            trees = []
            for crawl in self.crawls:
                crawl_status = crawl.get('status', '')
                crawl_label = crawl.get('label', '')
                # str() + `or ''` tolerate a missing, None, or non-string id
                crawl_id = str(crawl.get('id') or '')[:8]
                crawl_text = Text(
                    f"{self._status_icon(crawl_status)} {crawl_id} {crawl_label}",
                    style="white",
                )
                crawl_tree = Tree(crawl_text, guide_style="grey53")

                # `or []` tolerates an explicit snapshots=None
                snapshots = (crawl.get('snapshots') or [])[:self.max_snapshots]
                for snap in snapshots:
                    snap_status = snap.get('status', '')
                    snap_label = snap.get('label', '')
                    snap_text = Text(f"{self._status_icon(snap_status)} {snap_label}", style="white")
                    snap_node = crawl_tree.add(snap_text)

                    output_path = snap.get('output_path', '')
                    if output_path:
                        snap_node.add(Text(output_path, style="grey53"))

                    for hook in snap.get('hooks', []) or []:
                        snap_node.add(self._hook_renderable(hook))
                trees.append(crawl_tree)
            content = Group(*trees)

        return Panel(
            content,
            title="[bold white]Crawl Queue",
            border_style="white",
            box=box.HORIZONTALS,
        )

    def _hook_renderable(self, hook: dict[str, Any]) -> RenderableType:
        """Build the tree row for one hook.

        Returns a plain ``Text`` line, or a two-column grid with the truncated
        stderr tail right-aligned when there is one that fits.
        """
        status = hook.get('status', '')
        path = hook.get('path', '')
        icon, color = self._hook_style(
            status,
            is_bg=hook.get('is_bg', False),
            is_running=hook.get('is_running', False),
            is_pending=hook.get('is_pending', False),
        )
        stats = self._hook_stats(
            size=hook.get('size', ''),
            elapsed=hook.get('elapsed', ''),
            timeout=hook.get('timeout', ''),
            status=status,
        )
        left_str = f"{icon} {path}{stats}"
        line = Text(left_str, style=color)

        stderr_tail = hook.get('stderr', '')
        if stderr_tail:
            avail = self._available_width(left_str, indent=16)
            stderr_tail = self._truncate_tail(stderr_tail, avail)
        if not stderr_tail:
            return line

        row = Table.grid(expand=True)
        row.add_column(justify="left", ratio=1)
        row.add_column(justify="right")
        row.add_row(line, Text(stderr_tail, style="grey70"))
        return row

    @staticmethod
    def _status_icon(status: str) -> str:
        """Return the icon for a crawl/snapshot status.

        NOTE(review): every branch currently returns an empty string, so the
        rendered line just starts with a space — confirm whether icon glyphs
        are meant to be here.
        """
        if status in ('queued', 'pending'):
            return ''
        if status in ('started', 'running'):
            return ''
        if status in ('sealed', 'done', 'completed'):
            return ''
        if status in ('failed', 'error'):
            return ''
        return ''

    @staticmethod
    def _hook_style(status: str, is_bg: bool = False, is_running: bool = False, is_pending: bool = False) -> tuple[str, str]:
        """Return ``(icon, color)`` for a hook.

        Terminal statuses take precedence over the pending/running flags.
        NOTE(review): several icons are empty strings — confirm that is intended.
        """
        if status == 'succeeded':
            return '', 'green'
        if status == 'failed':
            return '', 'red'
        if status == 'skipped':
            return '', 'grey53'
        if is_pending:
            return '⌛️', 'grey53'
        if is_running and is_bg:
            return '', 'cyan'
        if is_running or status == 'started':
            return '▶️', 'cyan'
        return '', 'grey53'

    @staticmethod
    def _hook_stats(size: str = '', elapsed: str = '', timeout: str = '', status: str = '') -> str:
        """Return the parenthesised stats suffix for a hook line, or '' when there is nothing to show.

        Finished hooks show ``size | elapsed`` (whichever are set). Unfinished
        hooks show ``... | elapsed/timeout`` with ``...`` standing in for the
        not-yet-known size.
        """
        if status in ('succeeded', 'failed', 'skipped'):
            parts = [part for part in (size, elapsed) if part]
            return f" ({' | '.join(parts)})" if parts else ''

        if not (elapsed or timeout):
            return ''
        if elapsed and timeout:
            return f" (... | {elapsed}/{timeout})"
        if elapsed:
            return f" (... | {elapsed})"
        # timeout known but nothing elapsed yet
        return " (...)"

    @staticmethod
    def _terminal_width() -> int:
        """Return the terminal width in columns, or 120 when it cannot be queried."""
        try:
            return os.get_terminal_size().columns
        except OSError:
            return 120

    @staticmethod
    def _truncate_to_width(text: str, max_width: int) -> str:
        """Keep the head of ``text``, truncated to ``max_width`` cells with an ellipsis."""
        if not text or max_width <= 0:
            return ''
        t = Text(text)
        t.truncate(max_width, overflow="ellipsis")
        return t.plain

    @staticmethod
    def _truncate_tail(text: str, max_width: int) -> str:
        """Keep the tail of ``text`` within ``max_width`` terminal cells.

        Text that already fits is returned unchanged. Otherwise one cell is
        spent on a leading ``…`` marker and the rest on the end of the text.
        Returns '' when nothing useful fits (``max_width <= 1``).
        """
        if not text or max_width <= 0:
            return ''
        if cell_len(text) <= max_width:
            return text
        if max_width <= 1:
            return ''
        budget = max_width - 1  # one cell is reserved for the marker
        tail = text[-budget:]
        # The slice counts code points; wide characters occupy two cells, so
        # trim from the left until the tail fits the cell budget.
        while tail and cell_len(tail) > budget:
            tail = tail[1:]
        return f"…{tail}"

    def _available_width(self, left_text: str, indent: int = 0) -> int:
        """Return the cell budget for the right-hand column next to ``left_text``.

        It is the space left after the text, the indent and a 6-cell margin,
        capped at two fifths of the terminal width and never negative.
        """
        width = self._terminal_width()
        base = max(0, width - cell_len(left_text) - indent - 6)
        cap = max(0, (width * 2) // 5)
        return max(0, min(base, cap))
class ArchiveBoxProgressLayout:
    """
    Main layout manager for ArchiveBox orchestrator progress display.

    Layout structure:
    ┌─────────────────────────────────────────────────────────────┐
    │  Crawl Queue (full width)                                   │
    ├─────────────────────────────────────────────────────────────┤
    │  Crawl Queue Tree (hooks + outputs)                         │
    ├─────────────────────────────────────────────────────────────┤
    │  Running Process Logs (dynamic panels)                      │
    └─────────────────────────────────────────────────────────────┘
    """

    def __init__(self, crawl_id: Optional[str] = None):
        self.crawl_id = crawl_id
        self.start_time = datetime.now(timezone.utc)

        # Create components
        self.crawl_queue = CrawlQueuePanel()
        self.crawl_queue.crawl_id = crawl_id
        self.process_panels: List[ProcessLogPanel] = []
        self.crawl_queue_tree = CrawlQueueTreePanel(max_crawls=8, max_snapshots=16)

        # Create layout
        self.layout = self._make_layout()

    def _make_layout(self) -> Layout:
        """Define the layout structure."""
        layout = Layout(name="root")

        # Top-level split: crawl_queue, crawl_tree, processes
        layout.split(
            Layout(name="crawl_queue", size=3),
            Layout(name="crawl_tree", size=20),
            Layout(name="processes", ratio=1),
        )

        # Assign components to layout sections
        layout["crawl_queue"].update(self.crawl_queue)
        layout["crawl_tree"].update(self.crawl_queue_tree)
        layout["processes"].update(Columns([]))
        return layout

    def update_orchestrator_status(
        self,
        status: str,
        crawl_queue_count: int = 0,
        crawl_workers_count: int = 0,
        binary_queue_count: int = 0,
        binary_workers_count: int = 0,
        max_crawl_workers: int = 8,
    ):
        """Update orchestrator status in the crawl queue panel."""
        queue = self.crawl_queue
        queue.orchestrator_status = status
        queue.crawl_queue_count = crawl_queue_count
        queue.crawl_workers_count = crawl_workers_count
        queue.binary_queue_count = binary_queue_count
        queue.binary_workers_count = binary_workers_count
        queue.max_crawl_workers = max_crawl_workers

    @staticmethod
    def _hook_name(process: Any) -> str | None:
        """Return the script filename from ``cmd[1]``.

        Returns '' when ``cmd`` has fewer than two entries and ``None`` when
        ``cmd`` cannot be parsed at all.
        """
        try:
            cmd = getattr(process, 'cmd', [])
            return Path(cmd[1]).name if len(cmd) > 1 else ''
        except Exception:
            return None

    @classmethod
    def _has_foreground_hook(cls, processes: List[Any]) -> bool:
        """Return True if any hook process in ``processes`` is not a ``.bg.`` hook.

        Hooks whose command cannot be parsed are ignored.
        """
        for process in processes:
            if getattr(process, 'process_type', '') != 'hook':
                continue
            hook_name = cls._hook_name(process)
            if hook_name is not None and '.bg.' not in hook_name:
                return True
        return False

    def update_process_panels(self, processes: List[Any], pending: Optional[List[Any]] = None) -> None:
        """Rebuild the process panels from the running and pending processes.

        Background (``.bg.``) hooks and processes with no log output get no
        panel. When no panel remains, the processes region collapses to zero
        height.
        """
        pending = list(pending or [])
        fg_running = self._has_foreground_hook(processes)
        fg_pending = self._has_foreground_hook(pending)
        # Something is still running but no foreground hook is running or queued.
        bg_terminating = bool(processes) and not fg_running and not fg_pending

        panels = []
        for process in list(processes) + pending:
            is_hook = getattr(process, 'process_type', '') == 'hook'
            if is_hook:
                hook_name = self._hook_name(process)
                if hook_name is not None and '.bg.' in hook_name:
                    continue
            if not self._has_log_lines(process):
                continue
            is_pending = (
                getattr(process, 'status', '') in ('queued', 'pending', 'backoff')
                or (is_hook and not getattr(process, 'pid', None))
            )
            # Background hooks were skipped above, so every panel built here is
            # a non-background one: 7 log lines (2 while pending), compact=False.
            max_lines = 2 if is_pending else 7
            panels.append(
                ProcessLogPanel(process, max_lines=max_lines, compact=False, bg_terminating=bg_terminating)
            )

        region = self.layout["processes"]
        self.process_panels = panels
        if not panels:
            region.size = 0
            region.update(Text(""))
            return

        region.size = None
        region.ratio = 1
        region.update(Columns(panels, equal=True, expand=True))

    def update_crawl_tree(self, crawls: list[dict[str, Any]]) -> None:
        """Update the crawl queue tree panel and size its region to the content.

        Only the crawls and snapshots the tree panel will actually render
        (after its ``max_crawls`` / ``max_snapshots`` caps) are counted, so the
        region is not taller than what is drawn.
        """
        tree = self.crawl_queue_tree
        tree.update_crawls(crawls)

        line_count = 0
        for crawl in tree.crawls:
            line_count += 1
            for snap in (crawl.get('snapshots') or [])[:tree.max_snapshots]:
                line_count += 1
                if snap.get('output_path'):
                    line_count += 1
                line_count += sum(1 for _ in snap.get('hooks', []) or [])

        # +2 for the panel's top and bottom border rows; never smaller than 4
        self.layout["crawl_tree"].size = max(4, line_count + 2)

    def log_event(self, message: str, style: str = "white") -> None:
        """Add an event to the orchestrator log (currently a no-op)."""
        return

    def get_layout(self) -> Layout:
        """Get the Rich Layout object for rendering."""
        return self.layout

    def plain_lines(self) -> list[tuple[str, str]]:
        """Return the dashboard content as ``(section, text)`` pairs.

        Sections are ``"crawl_queue"``, the markup-stripped title of each
        process panel (or ``"process"``), and ``"crawl_tree"``.
        """
        lines: list[tuple[str, str]] = []

        queue = self.crawl_queue
        queue_line = (
            f"Status: {queue.orchestrator_status} | Crawls: {queue.crawl_queue_count} queued | "
            f"Binaries: {queue.binary_queue_count} queued | Workers: {queue.crawl_workers_count}/{queue.max_crawl_workers} "
            f"crawl, {queue.binary_workers_count} binary"
        )
        lines.append(("crawl_queue", queue_line))

        for panel in self.process_panels:
            title = _strip_rich(panel._title())
            for line in panel.plain_lines():
                if line:
                    lines.append((title or "process", line))

        tree = self.crawl_queue_tree
        for crawl in tree.crawls:
            # str() + `or ''` tolerate a missing, None, or non-string id
            crawl_id = str(crawl.get('id') or '')[:8]
            crawl_line = f"{tree._status_icon(crawl.get('status', ''))} {crawl_id} {crawl.get('label', '')}".strip()
            lines.append(("crawl_tree", crawl_line))

            # `or []` tolerates an explicit snapshots=None
            for snap in crawl.get('snapshots') or []:
                snap_line = f" {tree._status_icon(snap.get('status', ''))} {snap.get('label', '')}".rstrip()
                lines.append(("crawl_tree", snap_line))

                output_path = snap.get('output_path', '')
                if output_path:
                    lines.append(("crawl_tree", f" {output_path}"))

                for hook in snap.get('hooks', []) or []:
                    status = hook.get('status', '')
                    path = hook.get('path', '')
                    icon, _ = tree._hook_style(
                        status,
                        is_bg=hook.get('is_bg', False),
                        is_running=hook.get('is_running', False),
                        is_pending=hook.get('is_pending', False),
                    )
                    stats = tree._hook_stats(
                        size=hook.get('size', ''),
                        elapsed=hook.get('elapsed', ''),
                        timeout=hook.get('timeout', ''),
                        status=status,
                    )
                    hook_line = f" {icon} {path}{stats}".strip()

                    stderr_tail = hook.get('stderr', '')
                    if stderr_tail:
                        avail = tree._available_width(hook_line, indent=16)
                        stderr_tail = tree._truncate_tail(stderr_tail, avail)
                        if stderr_tail:
                            hook_line = f"{hook_line} {stderr_tail}"
                    if hook_line:
                        lines.append(("crawl_tree", hook_line))
        return lines

    @staticmethod
    def _has_log_lines(process: Any) -> bool:
        """Return True if the last stdout or stderr line is non-blank; False on any error."""
        try:
            # stdout first; stderr is only tailed when stdout has nothing
            for tail in (process.tail_stdout, process.tail_stderr):
                if any(line.strip() for line in tail(lines=1, follow=False)):
                    return True
        except Exception:
            return False
        return False