diff --git a/archivebox/cli/archivebox_config.py b/archivebox/cli/archivebox_config.py index c96c0bde..aa576658 100644 --- a/archivebox/cli/archivebox_config.py +++ b/archivebox/cli/archivebox_config.py @@ -69,8 +69,9 @@ def config(*keys, # Display core config sections for config_section in CONFIGS.values(): - if hasattr(config_section, 'toml_section_header'): - print(f'[grey53]\\[{config_section.toml_section_header}][/grey53]') + section_header = getattr(config_section, 'toml_section_header', '') + if isinstance(section_header, str) and section_header: + print(f'[grey53]\\[{section_header}][/grey53]') else: print('[grey53]\\[CONSTANTS] # (read-only)[/grey53]') diff --git a/archivebox/cli/archivebox_init.py b/archivebox/cli/archivebox_init.py index 5cb6b283..90a50fa5 100755 --- a/archivebox/cli/archivebox_init.py +++ b/archivebox/cli/archivebox_init.py @@ -33,7 +33,7 @@ def init(force: bool=False, quick: bool=False, install: bool=False) -> None: from archivebox.config import CONSTANTS, VERSION, DATA_DIR from archivebox.config.common import SERVER_CONFIG from archivebox.config.collection import write_config_file - from archivebox.misc.legacy import parse_json_main_index, parse_json_links_details, SnapshotDict + from archivebox.misc.legacy import parse_json_main_index, parse_json_links_details from archivebox.misc.db import apply_migrations # if os.access(out_dir / CONSTANTS.JSON_INDEX_FILENAME, os.F_OK): diff --git a/archivebox/cli/archivebox_server.py b/archivebox/cli/archivebox_server.py index d3a31a3c..62c5bb95 100644 --- a/archivebox/cli/archivebox_server.py +++ b/archivebox/cli/archivebox_server.py @@ -121,13 +121,15 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,), supervisor = get_existing_supervisord_process() if supervisor: daphne_proc = get_worker(supervisor, 'worker_daphne') + daphne_state = daphne_proc.get('statename') if isinstance(daphne_proc, dict) else None # If daphne is already running, error out - if daphne_proc and daphne_proc.get('statename') == 'RUNNING': + if daphne_state == 'RUNNING': orchestrator_proc = get_worker(supervisor, 'worker_orchestrator') + orchestrator_state = orchestrator_proc.get('statename') if isinstance(orchestrator_proc, dict) else None print('[red][X] Error: ArchiveBox server is already running[/red]') print(f' [green]√[/green] Web server (worker_daphne) is RUNNING on [deep_sky_blue4][link=http://{host}:{port}]http://{host}:{port}[/link][/deep_sky_blue4]') - if orchestrator_proc and orchestrator_proc.get('statename') == 'RUNNING': + if orchestrator_state == 'RUNNING': print(' [green]√[/green] Background worker (worker_orchestrator) is RUNNING') print() print('[yellow]To stop the existing server, run:[/yellow]') diff --git a/archivebox/cli/archivebox_version.py b/archivebox/cli/archivebox_version.py index c89298f9..1015111d 100755 --- a/archivebox/cli/archivebox_version.py +++ b/archivebox/cli/archivebox_version.py @@ -152,7 +152,7 @@ def version(quiet: bool=False, prnt('[deep_sky_blue3][i] Code locations:[/deep_sky_blue3]') try: for name, path in get_code_locations().items(): - if isinstance(path, dict): + if isinstance(name, str) and isinstance(path, dict): prnt(printable_folder_status(name, path), overflow='ignore', crop=False) except Exception as e: prnt(f' [red]Error getting code locations: {e}[/red]') @@ -162,7 +162,7 @@ def version(quiet: bool=False, prnt('[bright_yellow][i] Data locations:[/bright_yellow]') try: for name, path in get_data_locations().items(): - if isinstance(path, dict): + if isinstance(name, str) and isinstance(path, dict): prnt(printable_folder_status(name, path), overflow='ignore', crop=False) except Exception as e: prnt(f' [red]Error getting data locations: {e}[/red]') diff --git a/archivebox/workers/models.py b/archivebox/workers/models.py index e8edf0ec..ec78531a 100644 --- a/archivebox/workers/models.py +++ b/archivebox/workers/models.py @@ -28,18 +28,18 @@ ObjectStateList = Iterable[ObjectState] class BaseModelWithStateMachine(models.Model, MachineMixin): id: models.UUIDField - StatusChoices: ClassVar[Type[models.TextChoices]] + StatusChoices: ClassVar[Type[DefaultStatusChoices]] # status: models.CharField # retry_at: models.DateTimeField - state_machine_name: ClassVar[str] - state_field_name: ClassVar[str] - state_machine_attr: ClassVar[str] = 'sm' - bind_events_as_methods: ClassVar[bool] = True + state_machine_name: str | None + state_field_name: str + state_machine_attr: str = 'sm' + bind_events_as_methods: bool = True - active_state: ClassVar[ObjectState] - retry_at_field_name: ClassVar[str] + active_state: ObjectState + retry_at_field_name: str class Meta: app_label = 'workers' @@ -229,7 +229,7 @@ class BaseModelWithStateMachine(models.Model, MachineMixin): """ updated = cls.objects.filter( pk=obj.pk, - retry_at=obj.retry_at, + retry_at=obj.RETRY_AT, retry_at__lte=timezone.now(), ).update( retry_at=timezone.now() + timedelta(seconds=lock_seconds) @@ -271,7 +271,10 @@ class BaseModelWithStateMachine(models.Model, MachineMixin): if not self.claim_processing_lock(lock_seconds=lock_seconds): return False - self.sm.tick() + tick = getattr(getattr(self, self.state_machine_attr, None), 'tick', None) + if not callable(tick): + raise TypeError(f'{type(self).__name__}.{self.state_machine_attr}.tick() must be callable') + tick() self.refresh_from_db() return True @@ -281,7 +284,10 @@ class BaseModelWithStateMachine(models.Model, MachineMixin): @classproperty def INITIAL_STATE(cls) -> str: - return cls._state_to_str(cls.StateMachineClass.initial_state) + initial_state = cls.StateMachineClass.initial_state + if initial_state is None: + raise ValueError('StateMachineClass.initial_state must not be None') + return cls._state_to_str(initial_state) @classproperty def FINAL_STATES(cls) -> list[str]: @@ -311,7 +317,9 @@ class BaseModelWithStateMachine(models.Model, MachineMixin): joined[item[0]] = item[1] for item in extra_choices.choices: joined[item[0]] = item[1] - return models.TextChoices('StatusChoices', joined) + joined_choices = models.TextChoices('StatusChoices', joined) + assert isinstance(joined_choices, type) + return joined_choices return wrapper @classmethod @@ -359,27 +367,22 @@ class BaseModelWithStateMachine(models.Model, MachineMixin): StateMachineCls = registry.get_machine_cls(model_state_machine_name) assert issubclass(StateMachineCls, StateMachine) return StateMachineCls - raise NotImplementedError(f'ActorType[{cls.__name__}] must define .state_machine_name: str that points to a valid StateMachine') + raise NotImplementedError('ActorType must define .state_machine_name that points to a valid StateMachine') class ModelWithStateMachine(BaseModelWithStateMachine): - StatusChoices: ClassVar[Type[DefaultStatusChoices]] = DefaultStatusChoices + StatusChoices = DefaultStatusChoices status: models.CharField = BaseModelWithStateMachine.StatusField() retry_at: models.DateTimeField = BaseModelWithStateMachine.RetryAtField() - state_machine_name: ClassVar[str] # e.g. 'core.models.ArchiveResultMachine' - state_field_name: ClassVar[str] = 'status' - state_machine_attr: ClassVar[str] = 'sm' - bind_events_as_methods: ClassVar[bool] = True - - active_state: ClassVar[str] = StatusChoices.STARTED - retry_at_field_name: ClassVar[str] = 'retry_at' - - class Meta: - app_label = 'workers' - abstract = True + state_machine_name: str | None # e.g. 'core.models.ArchiveResultMachine' + state_field_name: str = 'status' + state_machine_attr: str = 'sm' + bind_events_as_methods: bool = True + active_state = StatusChoices.STARTED + retry_at_field_name: str = 'retry_at' class BaseStateMachine(StateMachine): """ diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index 36d4ac83..713ad923 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -26,6 +26,8 @@ from rich import print from archivebox.misc.logging_util import log_worker_event if TYPE_CHECKING: + from archivebox.crawls.models import Crawl + from archivebox.core.models import Snapshot from archivebox.machine.models import Process @@ -85,6 +87,10 @@ class Worker: """Get the Django model class. Subclasses must override this.""" raise NotImplementedError("Subclasses must implement get_model()") + def runloop(self) -> None: + """Execute the worker loop.""" + raise NotImplementedError("Subclasses must implement runloop()") + def on_startup(self) -> None: """Called when worker starts.""" from archivebox.machine.models import Process @@ -136,13 +142,14 @@ class Worker: if 'Snapshot' in worker_type_name: indent_level = 2 + log_error = error if isinstance(error, Exception) and not isinstance(error, KeyboardInterrupt) else None log_worker_event( worker_type=worker_type_name, event='Shutting down', indent_level=indent_level, pid=self.pid, worker_id=str(self.worker_id), - error=error if error and not isinstance(error, KeyboardInterrupt) else None, + error=log_error, ) def _terminate_background_hooks( @@ -494,9 +501,19 @@ class CrawlWorker(Worker): def __init__(self, crawl_id: str, **kwargs: Any): super().__init__(**kwargs) self.crawl_id = crawl_id - self.crawl = None + self._crawl: Crawl | None = None self.crawl_config = None + @property + def crawl(self) -> 'Crawl': + if self._crawl is None: + raise RuntimeError('CrawlWorker.crawl accessed before on_startup()') + return self._crawl + + @crawl.setter + def crawl(self, value: 'Crawl | None') -> None: + self._crawl = value + def get_model(self): from archivebox.crawls.models import Crawl return Crawl @@ -530,7 +547,10 @@ class CrawlWorker(Worker): # Advance state machine: QUEUED → STARTED (triggers run() via @started.enter) try: - self.crawl.sm.tick() + tick = getattr(getattr(self.crawl, 'sm', None), 'tick', None) + if not callable(tick): + raise RuntimeError('Crawl.sm.tick() is unavailable') + tick() except TransitionNotAllowed: if self.crawl.status == Crawl.StatusChoices.SEALED: print( @@ -555,7 +575,10 @@ class CrawlWorker(Worker): # Check if crawl is done if self._is_crawl_finished(): print('🔄 Crawl finished, sealing...', file=sys.stderr) - self.crawl.sm.seal() + seal = getattr(getattr(self.crawl, 'sm', None), 'seal', None) + if not callable(seal): + raise RuntimeError('Crawl.sm.seal() is unavailable') + seal() break # Spawn workers for queued snapshots @@ -662,9 +685,10 @@ class CrawlWorker(Worker): # Get the Process record that was just created worker_process = Process.objects.filter(pid=pid).first() if worker_process: + process_for_pipe = worker_process # Pipe stderr in background thread so it doesn't block - def pipe_worker_stderr(): - for line in worker_process.tail_stderr(lines=0, follow=True): + def pipe_worker_stderr() -> None: + for line in process_for_pipe.tail_stderr(lines=0, follow=True): print(f' [SnapshotWorker] {line}', file=sys.stderr, flush=True) thread = threading.Thread(target=pipe_worker_stderr, daemon=True) @@ -766,9 +790,19 @@ class SnapshotWorker(Worker): def __init__(self, snapshot_id: str, **kwargs: Any): super().__init__(**kwargs) self.snapshot_id = snapshot_id - self.snapshot = None + self._snapshot: Snapshot | None = None self.background_processes: dict[str, Any] = {} # hook_name -> Process + @property + def snapshot(self) -> 'Snapshot': + if self._snapshot is None: + raise RuntimeError('SnapshotWorker.snapshot accessed before on_startup()') + return self._snapshot + + @snapshot.setter + def snapshot(self, value: 'Snapshot | None') -> None: + self._snapshot = value + def get_model(self): """Not used - SnapshotWorker doesn't poll queues.""" from archivebox.core.models import Snapshot @@ -785,7 +819,10 @@ class SnapshotWorker(Worker): return # Use state machine to transition queued -> started (triggers enter_started()) - self.snapshot.sm.tick() + tick = getattr(getattr(self.snapshot, 'sm', None), 'tick', None) + if not callable(tick): + raise RuntimeError('Snapshot.sm.tick() is unavailable') + tick() self.snapshot.refresh_from_db() self.snapshot_started_at = self.snapshot.modified_at or self.snapshot.created_at @@ -881,13 +918,19 @@ class SnapshotWorker(Worker): self._retry_failed_empty_foreground_hooks(foreground_hooks, config) if self.snapshot.status != Snapshot.StatusChoices.SEALED: # This triggers enter_sealed() which calls cleanup() and checks parent crawl sealing - self.snapshot.sm.seal() + seal = getattr(getattr(self.snapshot, 'sm', None), 'seal', None) + if not callable(seal): + raise RuntimeError('Snapshot.sm.seal() is unavailable') + seal() self.snapshot.refresh_from_db() except Exception: # Mark snapshot as sealed even on error (still triggers cleanup) self._finalize_background_hooks() - self.snapshot.sm.seal() + seal = getattr(getattr(self.snapshot, 'sm', None), 'seal', None) + if not callable(seal): + raise RuntimeError('Snapshot.sm.seal() is unavailable') + seal() self.snapshot.refresh_from_db() raise finally: @@ -926,7 +969,7 @@ class SnapshotWorker(Worker): parent=self.db_process, url=str(self.snapshot.url), snapshot_id=str(self.snapshot.id), - _crawl_id=str(self.snapshot.crawl_id) if self.snapshot.crawl_id else None, + _crawl_id=str(self.snapshot.crawl.id), ) # Link ArchiveResult to Process for tracking @@ -1150,7 +1193,7 @@ class BinaryWorker(Worker): MAX_CONCURRENT_TASKS: ClassVar[int] = 1 # One binary per worker POLL_INTERVAL: ClassVar[float] = 0.5 # Check every 500ms (daemon mode only) - def __init__(self, binary_id: str = None, worker_id: int = 0): + def __init__(self, binary_id: str | None = None, worker_id: int = 0): self.binary_id = binary_id # Optional - None means daemon mode super().__init__(worker_id=worker_id) @@ -1158,24 +1201,27 @@ class BinaryWorker(Worker): from archivebox.machine.models import Binary return Binary - def get_next_item(self): - """Get binary to install (specific or next queued).""" - from archivebox.machine.models import Binary, Machine + def _get_binary(self): + """Get a specific binary in one-shot mode.""" + from archivebox.machine.models import Binary if self.binary_id: - # Specific binary mode try: return Binary.objects.get(id=self.binary_id) except Binary.DoesNotExist: return None - else: - # Daemon mode - get all queued binaries for current machine - machine = Machine.current() - return Binary.objects.filter( - machine=machine, - status=Binary.StatusChoices.QUEUED, - retry_at__lte=timezone.now() - ).order_by('retry_at', 'created_at', 'name') + return None + + def _get_pending_binaries(self): + """Get all queued binaries for the current machine.""" + from archivebox.machine.models import Binary, Machine + + machine = Machine.current() + return Binary.objects.filter( + machine=machine, + status=Binary.StatusChoices.QUEUED, + retry_at__lte=timezone.now() + ).order_by('retry_at', 'created_at', 'name') def runloop(self) -> None: """Install binary(ies).""" @@ -1196,7 +1242,7 @@ class BinaryWorker(Worker): import sys try: - binary = self.get_next_item() + binary = self._get_binary() if not binary: log_worker_event( @@ -1251,7 +1297,7 @@ class BinaryWorker(Worker): try: while True: # Get all pending binaries - pending_binaries = list(self.get_next_item()) + pending_binaries = list(self._get_pending_binaries()) if not pending_binaries: idle_count += 1 diff --git a/bin/lint.sh b/bin/lint.sh index 6797b6d3..2a1d6d54 100755 --- a/bin/lint.sh +++ b/bin/lint.sh @@ -14,11 +14,20 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && cd .. && pwd )" source "$DIR/.venv/bin/activate" -echo "[*] Running flake8..." -cd "$DIR/archivebox" -flake8 . && echo "√ No errors found." +cd "$DIR" + +echo "[*] Running ruff..." +ruff check archivebox +echo "√ No errors found." echo -echo "[*] Running mypy..." -echo "(skipping for now, run 'mypy archivebox' to run it manually)" +echo "[*] Running pyright..." +pyright +echo "√ No errors found." + +echo + +echo "[*] Running ty..." +ty check archivebox +echo "√ No errors found." diff --git a/pyproject.toml b/pyproject.toml index 50c9132d..f0f2f779 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -151,8 +151,8 @@ dev = [ "bottle>=0.13.1", ### LINTING "ruff>=0.6.6", - "flake8>=7.1.1", - "mypy>=1.11.2", + "pyright>=1.1.406", + "ty>=0.0.1a19", ] [tool.uv]