From 35f6137c4ff23d231d9394789bf5e1daf2dab141 Mon Sep 17 00:00:00 2001 From: Alexander Wainwright Date: Sun, 1 Feb 2026 20:55:56 +1000 Subject: [PATCH] Refactor and improve behaviour --- src/emulsion/progressbar.py | 323 ++++++++++++++++++++++++------------ 1 file changed, 218 insertions(+), 105 deletions(-) diff --git a/src/emulsion/progressbar.py b/src/emulsion/progressbar.py index 9db7658..87774b2 100644 --- a/src/emulsion/progressbar.py +++ b/src/emulsion/progressbar.py @@ -1,166 +1,279 @@ from __future__ import annotations -import time import logging - +import math +import time +from abc import ABC, abstractmethod from collections import deque +from dataclasses import astuple, dataclass from typing import TYPE_CHECKING from emulsion.si_format import format_si +from emulsion.timeformat import format_time if TYPE_CHECKING: from blessed import Terminal -def format_eta(seconds: int) -> str: - if seconds < 0: - raise ValueError('seconds must be non-negative') - - hours, remainder = divmod(seconds, 3600) - minutes, secs = divmod(remainder, 60) - - if hours > 0: - return f'{hours}h {minutes}m' - if minutes > 0: - return f'{minutes}m {secs}s' - return f'{secs}s' +class SmoothingFunction(ABC): + @abstractmethod + def update(self, x: float | None) -> float | None: + pass -class EMA: +class EMA(SmoothingFunction): def __init__( - self, alpha: float = 0.1, initial: float | None = None + self, alpha: float = 0.01, initial: float | None = None ) -> None: if not 0 < alpha <= 1: raise ValueError('alpha must be in (0, 1]') self.alpha = alpha - self.value = initial + self.smooth_val = initial - def update(self, x: float) -> float: - if self.value is None: - self.value = x - else: - self.value = self.alpha * x + (1 - self.alpha) * self.value - return self.value + def update(self, x: float | None) -> float | None: + if x and self.smooth_val: + self.smooth_val = ( + self.alpha * x + (1 - self.alpha) * self.smooth_val + ) + elif x: + self.smooth_val = x + return self.smooth_val -class ProgressBar: - def __init__(self, term: Terminal) -> None: - self.term = term - self._last_ticks = 0 - self._last_rendered_ticks = 0 - self._high_res_progress = False - self._smoothing = EMA() - self.spinner_chars = '▁▂▃▄▅▆▇█▇▆▅▄▃▁' - self.tick_frames = [ - '⠀', - '⡀', - '⡄', - '⡆', - '⡇', - '⡏', - '⡟', - '⡿', - ] - self.empty_char = ' ' - self.full_block = '⣿' - self.n_tick_frames = len(self.tick_frames) - self._tick_deltas = deque(maxlen=self.n_tick_frames) - self.start_time = time.time() - def update(self, current: int, total: int, width: int) -> str: - """ - Renders a Braille bar with dynamic resolution scaling. - Snaps to whole blocks if advancement is too fast to prevent aliasing. - """ - t = self.term +class RollingWindow(SmoothingFunction): + def __init__(self, size: int = 10) -> None: + self._buffer = deque(maxlen=size) + + def update(self, x: float | None) -> float | None: + if x is not None: + self._buffer.append(x) + if len(self._buffer) == 0: + return None + return sum(self._buffer) / len(self._buffer) + + +class TimeBasedEMA(SmoothingFunction): + def __init__( + self, time_window: float = 40.0, initial: float | None = None + ) -> None: + if time_window <= 0: + raise ValueError('time_window must be > 0') + self.tau = time_window + self.smooth_val = initial + self.last_time: float | None = None + + def update(self, x: float | None) -> float | None: + now = time.monotonic() + + if x is None: + return self.smooth_val + + if self.smooth_val is None or self.last_time is None: + self.smooth_val = x + self.last_time = now + return x + + dt = now - self.last_time + self.last_time = now + + # Guard against extremely fast updates or clock quirks + if dt <= 0: + return self.smooth_val + + alpha = 1.0 - math.exp(-dt / self.tau) + self.smooth_val = alpha * x + (1.0 - alpha) * self.smooth_val + + return self.smooth_val + + +@dataclass +class PhysicsState: + rate: float | None = None + eta: float | None = None + percent: float = 0.0 + + def __iter__(self) -> tuple[ + float | None, + float | None, + float, + ]: + return iter(astuple(self)) + + +class Physics: + def __init__(self) -> None: + # state variables + self._state = PhysicsState() + + # last run data + self._last_time: float | None = None + self._last_count: float | None = None + + # other shit + # self._smoothing = RollingWindow() + self._smoothing: SmoothingFunction = TimeBasedEMA() + self._start_time = time.time() + + def update(self, count: int, total: int) -> PhysicsState: + """Calculate the rate, eta and percent""" + now = time.time() + + # on updates to make if the count has not changed + if count == self._last_count: + return self._state + + # percentage complete if total == 0: - percent = 0.0 + self._state.percent = 0.0 else: - percent = current / total + self._state.percent = count / total - # Spinner animation - spinner_idx = int(time.time() * 10) % len(self.spinner_chars) - spinner = t.bold(self.spinner_chars[spinner_idx]) - - # Stats text - elapsed = time.time() - self.start_time - rate = current / elapsed if elapsed > 0 else 0 - smooth_rate = self._smoothing.update(rate) + # processing rate + rate: float | None = None try: - eta = int((total - current) / smooth_rate) - eta_s = format_eta(eta) + if self._last_count and self._last_time: + rate = (count - self._last_count) / (now - self._last_time) except ZeroDivisionError: - eta_s = '???' + logging.debug('zero division') + rate = None - STATS_WIDTH = 50 + self._state.rate = self._smoothing.update(rate) - rate_val = format_si(smooth_rate, sigfigs=2, width=3).strip() + if self._state.rate: + self._state.eta = int((total - count) / self._state.rate) - parts = [ - f'{current}/{total}', - f'{int(percent * 100)}%', - f'eta {eta_s}', - f'{rate_val} it/s' - ] + logging.debug('eta: %f', self._state.eta) - separator = " • " - content = separator.join(parts) + self._last_count = count + self._last_time = now - stats = f'{content: <{STATS_WIDTH}}' + return self._state - bar_area_width = max(10, width - len(stats) - 4) - total_ticks = bar_area_width * self.n_tick_frames +class BrailleBar: + """The windy man. The long mover.""" + + def __init__(self) -> None: + self.tick_frames = ['⠀', '⡀', '⡄', '⡆', '⡇', '⡏', '⡟', '⡿'] + self.full_block = '⣿' + self.empty_char = ' ' + self.n_tick_frames = len(self.tick_frames) + + # Internal display state + self._last_width: int | None = None + self._last_ticks: float = 0.0 + self._last_rendered_ticks: int = 0 + self._high_res_progress: bool = False + + def render(self, percent: float, width: int) -> str: + total_ticks = width * self.n_tick_frames current_ticks_fraction = percent * total_ticks current_ticks = int(current_ticks_fraction) - logging.debug('%d, %d', current_ticks, total_ticks) - # check if we are moving faster than the tick size + # Hysteresis / Anti-flicker logic. Check if we are moving faster than + # the tick size if current_ticks_fraction != self._last_ticks: - delta = abs(current_ticks_fraction - self._last_ticks) - self._tick_deltas.append(delta) - logging.debug(str(self._tick_deltas)) - - logging.debug(f'delta: {delta}') - - self._high_res_progress = delta <= 1 - + delta_ticks = abs(current_ticks_fraction - self._last_ticks) + self._high_res_progress = delta_ticks <= 1 self._last_ticks = current_ticks_fraction - logging.debug(f'high res {self._high_res_progress}') - - # if we are not high res, let's lop off the remainder. we don't + # If we are not high res, let's lop off the remainder. We don't # explicitly set the remainder to zero so that we can avoid flickering # when on the cusp of high res and low res if not self._high_res_progress: - current_ticks = current_ticks - ( - # we mod my half here as we can cleanly render at half steps - # in braille - current_ticks % (-(-self.n_tick_frames // 2)) - ) + # we mod my half here as we can cleanly render at half steps + # in braille + step = -(-self.n_tick_frames // 2) + current_ticks -= current_ticks % step - # latch to make sure we don't go backwards if we switch from high res to - # low res - current_ticks = max(self._last_rendered_ticks, current_ticks) + # latch to make sure we don't go backwards if we switch resolution + # but release the latch if the terminal has resized + if self._last_width == width: + current_ticks = max(self._last_rendered_ticks, current_ticks) self._last_rendered_ticks = current_ticks + self._last_width = width remainder_idx = current_ticks % self.n_tick_frames full_blocks_count = current_ticks // self.n_tick_frames + # String construction partial_char = '' - if full_blocks_count < bar_area_width and remainder_idx > 0: + if full_blocks_count < width and remainder_idx > 0: partial_char = self.tick_frames[remainder_idx] # Fill the rest # Note: We subtract len(partial_char) which is 0 or 1 - empty_blocks_count = ( - bar_area_width - full_blocks_count - len(partial_char) - ) + empty_blocks_count = width - full_blocks_count - len(partial_char) - bar_segment = ( + return ( (self.full_block * full_blocks_count) + partial_char + (self.empty_char * empty_blocks_count) ) - return f'{spinner} │{t.bold(bar_segment)}│ {stats}' + +class ProgressBar: + def __init__(self, term: Terminal) -> None: + self.term: Terminal = term + self.physics = Physics() + self.bar_renderer = BrailleBar() + self.spinner_chars = '▁▂▃▄▅▆▇█▇▆▅▄▃▂▁' + + def _get_spinner(self, now: float) -> str: + spinner_idx = int(now * 10) % len(self.spinner_chars) + return self.spinner_chars[spinner_idx] + + def _render_stats( + self, + current: int, + total: int, + rate: float, + eta: float, + percent: float, + ) -> str: + eta_s = format_time(int(eta)) if eta is not None else '???' + + rate_val = format_si(rate or 0, sigfigs=2, width=3).strip() + + parts = [ + f'{current}/{total}', + f'{int(percent * 100)}%', + f'eta {eta_s}', + f'{rate_val} it/s', + ] + + separator = ' • ' + content = separator.join(parts) + + STATS_WIDTH = 50 + return f'{content: <{STATS_WIDTH}}' + + def update(self, current: int, total: int, width: int) -> str: + """Renders a Braille bar with dynamic resolution scaling. + + Snaps to whole blocks if advancement is too fast to prevent aliasing. + + Arguments: + current (int): The number of steps completed + total (int): The total number of steps + width (int): The terminal width + + Returns: + str: The rendered progress bar + """ + now = time.time() + + # Spinner animation + spinner = self._get_spinner(now) + + # calculate rate info + rate, eta, percent = self.physics.update(current, total) + # render that section + stats = self._render_stats(current, total, rate, eta, percent) + + bar_area_width = max(10, width - len(stats) - 4) + + bar_segment = self.bar_renderer.render(percent, bar_area_width) + + return f'{spinner} │{bar_segment}│ {stats}'