Files
emulsion/src/emulsion/batch_runner.py
Alexander Wainwright f4aacb9b26 Initial re-write
2026-01-31 10:59:38 +10:00

98 lines
2.4 KiB
Python

from concurrent.futures import Future, ThreadPoolExecutor
from emulsion.executor import Executor, Task
class BatchRunner:
"""
Controller that manages the execution of tasks via a ThreadPoolExecutor.
It allows the UI (View) to poll for status updates, decoupling the
execution logic from the display logic.
"""
def __init__(
self,
executor: Executor,
tasks: list[Task],
dry_run: bool,
workers: int = 1,
) -> None:
self.executor = executor
self.tasks = tasks
self.dry_run = dry_run
self.workers = workers
# State
self.pending_tasks = list(tasks)
self.completed_tasks: list[
tuple[Task, bool, str]
] = [] # (Task, Success, Msg)
# Internals
self._pool: ThreadPoolExecutor | None = None
self._futures: dict[Future[tuple[bool, str]], Task] = {}
self._started = False
def start(self) -> None:
"""Starts the background workers."""
if self._started:
return
self._started = True
self._pool = ThreadPoolExecutor(max_workers=self.workers)
for task in self.tasks:
future = self._pool.submit(
self.executor.execute_task, task, self.dry_run
)
self._futures[future] = task
def update(self) -> list[tuple[Task, bool, str]]:
"""
Checks for completed tasks.
Returns a list of newly completed tasks since the last call.
"""
if not self._started or not self._futures:
return []
# Check for completed futures
# We use a quick check logic. Since as_completed is blocking-ish or
# iterator based, we might just want to check `done()` on known futures
# to be non-blocking for the TUI loop.
newly_completed = []
done_futures = []
for future, task in self._futures.items():
if future.done():
try:
success, msg = future.result()
except Exception as e:
success = False
msg = str(e)
self.completed_tasks.append((task, success, msg))
newly_completed.append((task, success, msg))
if task in self.pending_tasks:
self.pending_tasks.remove(task)
done_futures.append(future)
# Clean up processed futures
for f in done_futures:
del self._futures[f]
return newly_completed
def is_done(self) -> bool:
return self._started and len(self.pending_tasks) == 0
def shutdown(self) -> None:
if self._pool:
self._pool.shutdown(wait=False, cancel_futures=True)
@property
def progress(self) -> tuple[int, int]:
"""Returns (completed_count, total_count)"""
return len(self.completed_tasks), len(self.tasks)