Refactor and improve behaviour

This commit is contained in:
Alexander Wainwright
2026-02-01 20:55:56 +10:00
parent 9f7108d22a
commit 35f6137c4f

View File

@@ -1,166 +1,279 @@
from __future__ import annotations
import time
import logging
import math
import time
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import astuple, dataclass
from typing import TYPE_CHECKING
from emulsion.si_format import format_si
from emulsion.timeformat import format_time
if TYPE_CHECKING:
from blessed import Terminal
def format_eta(seconds: int) -> str:
if seconds < 0:
raise ValueError('seconds must be non-negative')
hours, remainder = divmod(seconds, 3600)
minutes, secs = divmod(remainder, 60)
if hours > 0:
return f'{hours}h {minutes}m'
if minutes > 0:
return f'{minutes}m {secs}s'
return f'{secs}s'
class SmoothingFunction(ABC):
@abstractmethod
def update(self, x: float | None) -> float | None:
pass
class EMA:
class EMA(SmoothingFunction):
def __init__(
self, alpha: float = 0.1, initial: float | None = None
self, alpha: float = 0.01, initial: float | None = None
) -> None:
if not 0 < alpha <= 1:
raise ValueError('alpha must be in (0, 1]')
self.alpha = alpha
self.value = initial
self.smooth_val = initial
def update(self, x: float) -> float:
if self.value is None:
self.value = x
else:
self.value = self.alpha * x + (1 - self.alpha) * self.value
return self.value
def update(self, x: float | None) -> float | None:
if x and self.smooth_val:
self.smooth_val = (
self.alpha * x + (1 - self.alpha) * self.smooth_val
)
elif x:
self.smooth_val = x
return self.smooth_val
class ProgressBar:
def __init__(self, term: Terminal) -> None:
self.term = term
self._last_ticks = 0
self._last_rendered_ticks = 0
self._high_res_progress = False
self._smoothing = EMA()
self.spinner_chars = '▁▂▃▄▅▆▇█▇▆▅▄▃▁'
self.tick_frames = [
'',
'',
'',
'',
'',
'',
'',
'',
]
self.empty_char = ' '
self.full_block = ''
self.n_tick_frames = len(self.tick_frames)
self._tick_deltas = deque(maxlen=self.n_tick_frames)
self.start_time = time.time()
def update(self, current: int, total: int, width: int) -> str:
"""
Renders a Braille bar with dynamic resolution scaling.
Snaps to whole blocks if advancement is too fast to prevent aliasing.
"""
t = self.term
class RollingWindow(SmoothingFunction):
def __init__(self, size: int = 10) -> None:
self._buffer = deque(maxlen=size)
def update(self, x: float | None) -> float | None:
if x is not None:
self._buffer.append(x)
if len(self._buffer) == 0:
return None
return sum(self._buffer) / len(self._buffer)
class TimeBasedEMA(SmoothingFunction):
def __init__(
self, time_window: float = 40.0, initial: float | None = None
) -> None:
if time_window <= 0:
raise ValueError('time_window must be > 0')
self.tau = time_window
self.smooth_val = initial
self.last_time: float | None = None
def update(self, x: float | None) -> float | None:
now = time.monotonic()
if x is None:
return self.smooth_val
if self.smooth_val is None or self.last_time is None:
self.smooth_val = x
self.last_time = now
return x
dt = now - self.last_time
self.last_time = now
# Guard against extremely fast updates or clock quirks
if dt <= 0:
return self.smooth_val
alpha = 1.0 - math.exp(-dt / self.tau)
self.smooth_val = alpha * x + (1.0 - alpha) * self.smooth_val
return self.smooth_val
@dataclass
class PhysicsState:
rate: float | None = None
eta: float | None = None
percent: float = 0.0
def __iter__(self) -> tuple[
float | None,
float | None,
float,
]:
return iter(astuple(self))
class Physics:
def __init__(self) -> None:
# state variables
self._state = PhysicsState()
# last run data
self._last_time: float | None = None
self._last_count: float | None = None
# other shit
# self._smoothing = RollingWindow()
self._smoothing: SmoothingFunction = TimeBasedEMA()
self._start_time = time.time()
def update(self, count: int, total: int) -> PhysicsState:
"""Calculate the rate, eta and percent"""
now = time.time()
# on updates to make if the count has not changed
if count == self._last_count:
return self._state
# percentage complete
if total == 0:
percent = 0.0
self._state.percent = 0.0
else:
percent = current / total
self._state.percent = count / total
# Spinner animation
spinner_idx = int(time.time() * 10) % len(self.spinner_chars)
spinner = t.bold(self.spinner_chars[spinner_idx])
# Stats text
elapsed = time.time() - self.start_time
rate = current / elapsed if elapsed > 0 else 0
smooth_rate = self._smoothing.update(rate)
# processing rate
rate: float | None = None
try:
eta = int((total - current) / smooth_rate)
eta_s = format_eta(eta)
if self._last_count and self._last_time:
rate = (count - self._last_count) / (now - self._last_time)
except ZeroDivisionError:
eta_s = '???'
logging.debug('zero division')
rate = None
STATS_WIDTH = 50
self._state.rate = self._smoothing.update(rate)
rate_val = format_si(smooth_rate, sigfigs=2, width=3).strip()
if self._state.rate:
self._state.eta = int((total - count) / self._state.rate)
parts = [
f'{current}/{total}',
f'{int(percent * 100)}%',
f'eta {eta_s}',
f'{rate_val} it/s'
]
logging.debug('eta: %f', self._state.eta)
separator = ""
content = separator.join(parts)
self._last_count = count
self._last_time = now
stats = f'{content: <{STATS_WIDTH}}'
return self._state
bar_area_width = max(10, width - len(stats) - 4)
total_ticks = bar_area_width * self.n_tick_frames
class BrailleBar:
"""The windy man. The long mover."""
def __init__(self) -> None:
self.tick_frames = ['', '', '', '', '', '', '', '']
self.full_block = ''
self.empty_char = ' '
self.n_tick_frames = len(self.tick_frames)
# Internal display state
self._last_width: int | None = None
self._last_ticks: float = 0.0
self._last_rendered_ticks: int = 0
self._high_res_progress: bool = False
def render(self, percent: float, width: int) -> str:
total_ticks = width * self.n_tick_frames
current_ticks_fraction = percent * total_ticks
current_ticks = int(current_ticks_fraction)
logging.debug('%d, %d', current_ticks, total_ticks)
# check if we are moving faster than the tick size
# Hysteresis / Anti-flicker logic. Check if we are moving faster than
# the tick size
if current_ticks_fraction != self._last_ticks:
delta = abs(current_ticks_fraction - self._last_ticks)
self._tick_deltas.append(delta)
logging.debug(str(self._tick_deltas))
logging.debug(f'delta: {delta}')
self._high_res_progress = delta <= 1
delta_ticks = abs(current_ticks_fraction - self._last_ticks)
self._high_res_progress = delta_ticks <= 1
self._last_ticks = current_ticks_fraction
logging.debug(f'high res {self._high_res_progress}')
# if we are not high res, let's lop off the remainder. we don't
# If we are not high res, let's lop off the remainder. We don't
# explicitly set the remainder to zero so that we can avoid flickering
# when on the cusp of high res and low res
if not self._high_res_progress:
current_ticks = current_ticks - (
# we mod my half here as we can cleanly render at half steps
# in braille
current_ticks % (-(-self.n_tick_frames // 2))
)
# we mod my half here as we can cleanly render at half steps
# in braille
step = -(-self.n_tick_frames // 2)
current_ticks -= current_ticks % step
# latch to make sure we don't go backwards if we switch from high res to
# low res
current_ticks = max(self._last_rendered_ticks, current_ticks)
# latch to make sure we don't go backwards if we switch resolution
# but release the latch if the terminal has resized
if self._last_width == width:
current_ticks = max(self._last_rendered_ticks, current_ticks)
self._last_rendered_ticks = current_ticks
self._last_width = width
remainder_idx = current_ticks % self.n_tick_frames
full_blocks_count = current_ticks // self.n_tick_frames
# String construction
partial_char = ''
if full_blocks_count < bar_area_width and remainder_idx > 0:
if full_blocks_count < width and remainder_idx > 0:
partial_char = self.tick_frames[remainder_idx]
# Fill the rest
# Note: We subtract len(partial_char) which is 0 or 1
empty_blocks_count = (
bar_area_width - full_blocks_count - len(partial_char)
)
empty_blocks_count = width - full_blocks_count - len(partial_char)
bar_segment = (
return (
(self.full_block * full_blocks_count)
+ partial_char
+ (self.empty_char * empty_blocks_count)
)
return f'{spinner}{t.bold(bar_segment)}{stats}'
class ProgressBar:
def __init__(self, term: Terminal) -> None:
self.term: Terminal = term
self.physics = Physics()
self.bar_renderer = BrailleBar()
self.spinner_chars = '▁▂▃▄▅▆▇█▇▆▅▄▃▂▁'
def _get_spinner(self, now: float) -> str:
spinner_idx = int(now * 10) % len(self.spinner_chars)
return self.spinner_chars[spinner_idx]
def _render_stats(
self,
current: int,
total: int,
rate: float,
eta: float,
percent: float,
) -> str:
eta_s = format_time(int(eta)) if eta is not None else '???'
rate_val = format_si(rate or 0, sigfigs=2, width=3).strip()
parts = [
f'{current}/{total}',
f'{int(percent * 100)}%',
f'eta {eta_s}',
f'{rate_val} it/s',
]
separator = ''
content = separator.join(parts)
STATS_WIDTH = 50
return f'{content: <{STATS_WIDTH}}'
def update(self, current: int, total: int, width: int) -> str:
"""Renders a Braille bar with dynamic resolution scaling.
Snaps to whole blocks if advancement is too fast to prevent aliasing.
Arguments:
current (int): The number of steps completed
total (int): The total number of steps
width (int): The terminal width
Returns:
str: The rendered progress bar
"""
now = time.time()
# Spinner animation
spinner = self._get_spinner(now)
# calculate rate info
rate, eta, percent = self.physics.update(current, total)
# render that section
stats = self._render_stats(current, total, rate, eta, percent)
bar_area_width = max(10, width - len(stats) - 4)
bar_segment = self.bar_renderer.render(percent, bar_area_width)
return f'{spinner}{bar_segment}{stats}'