From bc21d4bfdb358923a24898895c2c8734c701569b Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Sun, 15 Mar 2026 20:12:27 -0700 Subject: [PATCH] type and test fixes --- README.md | 4 +- archivebox/__init__.py | 3 - archivebox/cli/archivebox_search.py | 7 +- archivebox/cli/tests_piping.py | 47 ++--- archivebox/config/constants.py | 12 +- archivebox/core/admin_tags.py | 8 +- archivebox/core/admin_users.py | 5 +- archivebox/core/forms.py | 49 +++-- .../core/management/commands/archivebox.py | 6 +- archivebox/core/middleware.py | 2 + ...options_alter_snapshot_options_and_more.py | 7 +- .../migrations/0030_alter_archiveresult_id.py | 5 +- archivebox/machine/migrations/0006_process.py | 5 +- .../machine/tests/test_machine_models.py | 12 +- archivebox/mcp/server.py | 35 ++-- archivebox/misc/db.py | 6 +- archivebox/misc/logging_util.py | 5 +- archivebox/misc/monkey_patches.py | 2 +- archivebox/misc/paginators.py | 13 +- archivebox/misc/progress_layout.py | 6 +- archivebox/misc/serve_static.py | 2 +- archivebox/misc/system.py | 14 +- archivebox/misc/toml_util.py | 10 +- archivebox/misc/util.py | 71 ++++--- archivebox/mypy.ini | 3 - .../migrations/0002_alter_persona_id.py | 5 +- archivebox/tests/conftest.py | 6 +- archivebox/tests/migrations_helpers.py | 2 +- archivebox/tests/test_add.py | 166 --------------- archivebox/tests/test_admin_views.py | 4 +- archivebox/tests/test_cli_add.py | 77 ++++++- archivebox/tests/test_cli_init.py | 21 ++ archivebox/tests/test_cli_install.py | 53 +++++ archivebox/tests/test_cli_list.py | 146 +++++++++++++ archivebox/tests/test_cli_remove.py | 63 ++++-- archivebox/tests/test_cli_search.py | 143 +++++++++++++ archivebox/tests/test_cli_status.py | 21 ++ archivebox/tests/test_cli_version.py | 28 +++ archivebox/tests/test_init.py | 94 --------- archivebox/tests/test_install.py | 128 ------------ archivebox/tests/test_list.py | 98 --------- archivebox/tests/test_migrations_04_to_09.py | 3 +- archivebox/tests/test_remove.py | 89 -------- archivebox/tests/test_search.py | 142 ------------- archivebox/tests/test_status.py | 195 ------------------ archivebox/tests/test_version.py | 158 -------------- archivebox/uuid_compat.py | 35 +--- archivebox/workers/orchestrator.py | 22 +- archivebox/workers/supervisord_util.py | 1 + .../workers/tests/test_scheduled_crawls.py | 9 +- .../workers/tests/test_snapshot_worker.py | 13 +- pyproject.toml | 18 +- 52 files changed, 762 insertions(+), 1317 deletions(-) delete mode 100644 archivebox/mypy.ini delete mode 100644 archivebox/tests/test_add.py create mode 100644 archivebox/tests/test_cli_list.py delete mode 100644 archivebox/tests/test_init.py delete mode 100644 archivebox/tests/test_install.py delete mode 100644 archivebox/tests/test_list.py delete mode 100644 archivebox/tests/test_remove.py delete mode 100644 archivebox/tests/test_search.py delete mode 100644 archivebox/tests/test_status.py delete mode 100644 archivebox/tests/test_version.py diff --git a/README.md b/README.md index 8f5db3cd..ea9e84e1 100644 --- a/README.md +++ b/README.md @@ -795,7 +795,7 @@ ArchiveBox bundles industry-standard tools like [Google Chrome](https://github.c
  • Web Server: Django + daphne (ASGI)
  • Database: Django ORM saving to SQLite3 ./data/index.sqlite3
  • Job Queue: Custom orchestrator using supervisord for worker management
  • -
  • Build/test/lint: uv / mypy+pyright+pytest / ruff
  • +
  • Build/test/lint: uv / pyright+ty+pytest / ruff
  • Subdependencies: abx-pkg installs apt/brew/pip/npm pkgs at runtime (e.g. yt-dlp, singlefile, readability, git)
  • @@ -1464,7 +1464,7 @@ archivebox install ./bin/lint.sh ./bin/test.sh ``` -(uses `flake8`, `mypy`, and `pytest -s`) +(uses `ruff`, `pyright`, `ty`, and `pytest -s`) diff --git a/archivebox/__init__.py b/archivebox/__init__.py index af73f6e2..3bfd3dde 100755 --- a/archivebox/__init__.py +++ b/archivebox/__init__.py @@ -16,9 +16,6 @@ import sys from pathlib import Path from typing import Protocol, cast -# Import uuid_compat early to monkey-patch uuid.uuid7 before Django loads migrations -# This fixes migrations generated on Python 3.14+ that reference uuid.uuid7 directly -from archivebox import uuid_compat # noqa: F401 from abx_plugins import get_plugins_dir diff --git a/archivebox/cli/archivebox_search.py b/archivebox/cli/archivebox_search.py index fa7b9405..6a421138 100644 --- a/archivebox/cli/archivebox_search.py +++ b/archivebox/cli/archivebox_search.py @@ -8,7 +8,6 @@ from pathlib import Path from typing import TYPE_CHECKING, Callable import rich_click as click -from rich import print from django.db.models import Q, QuerySet @@ -212,7 +211,11 @@ def search(filter_patterns: list[str] | None=None, folders: dict[str, Snapshot | None] = {snapshot.output_dir: snapshot for snapshot in snapshots} output = printable_folders(folders, with_headers) - print(output) + # Structured exports must be written directly to stdout. + # rich.print() reflows long lines to console width, which corrupts JSON/CSV/HTML output. + sys.stdout.write(output) + if not output.endswith('\n'): + sys.stdout.write('\n') return output diff --git a/archivebox/cli/tests_piping.py b/archivebox/cli/tests_piping.py index 623c2567..c36ae8e4 100644 --- a/archivebox/cli/tests_piping.py +++ b/archivebox/cli/tests_piping.py @@ -29,6 +29,7 @@ import tempfile import unittest from io import StringIO from pathlib import Path +from typing import TypeVar # Test configuration - disable slow extractors TEST_CONFIG = { @@ -58,6 +59,14 @@ TEST_CONFIG = { os.environ.update(TEST_CONFIG) +T = TypeVar('T') + + +def require(value: T | None) -> T: + if value is None: + raise AssertionError('Expected value to be present') + return value + # ============================================================================= # JSONL Utility Tests @@ -70,8 +79,7 @@ class TestJSONLParsing(unittest.TestCase): """Plain URLs should be parsed as Snapshot records.""" from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT - result = parse_line('https://example.com') - self.assertIsNotNone(result) + result = require(parse_line('https://example.com')) self.assertEqual(result['type'], TYPE_SNAPSHOT) self.assertEqual(result['url'], 'https://example.com') @@ -80,8 +88,7 @@ class TestJSONLParsing(unittest.TestCase): from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT line = '{"type": "Snapshot", "url": "https://example.com", "tags": "test,demo"}' - result = parse_line(line) - self.assertIsNotNone(result) + result = require(parse_line(line)) self.assertEqual(result['type'], TYPE_SNAPSHOT) self.assertEqual(result['url'], 'https://example.com') self.assertEqual(result['tags'], 'test,demo') @@ -91,8 +98,7 @@ class TestJSONLParsing(unittest.TestCase): from archivebox.misc.jsonl import parse_line, TYPE_CRAWL line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}' - result = parse_line(line) - self.assertIsNotNone(result) + result = require(parse_line(line)) self.assertEqual(result['type'], TYPE_CRAWL) self.assertEqual(result['id'], 'abc123') self.assertEqual(result['urls'], 'https://example.com') @@ -103,8 +109,7 @@ class TestJSONLParsing(unittest.TestCase): from archivebox.misc.jsonl import parse_line line = '{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}' - result = parse_line(line) - self.assertIsNotNone(result) + result = require(parse_line(line)) self.assertEqual(result['id'], 'abc123') self.assertEqual(result['url'], 'https://example.com') @@ -113,8 +118,7 @@ class TestJSONLParsing(unittest.TestCase): from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT uuid = '01234567-89ab-cdef-0123-456789abcdef' - result = parse_line(uuid) - self.assertIsNotNone(result) + result = require(parse_line(uuid)) self.assertEqual(result['type'], TYPE_SNAPSHOT) self.assertEqual(result['id'], uuid) @@ -144,8 +148,7 @@ class TestJSONLParsing(unittest.TestCase): """file:// URLs should be parsed.""" from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT - result = parse_line('file:///path/to/file.txt') - self.assertIsNotNone(result) + result = require(parse_line('file:///path/to/file.txt')) self.assertEqual(result['type'], TYPE_SNAPSHOT) self.assertEqual(result['url'], 'file:///path/to/file.txt') @@ -501,9 +504,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): # Create crawl with multiple URLs (as newline-separated string) urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com' - crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id}) - - self.assertIsNotNone(crawl) + crawl = require(Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})) self.assertIsNotNone(crawl.id) self.assertEqual(crawl.urls, urls) self.assertEqual(crawl.status, 'queued') @@ -538,7 +539,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): # Step 1: Create crawl (simulating 'archivebox crawl') urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com' - crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id}) + crawl = require(Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})) crawl_output = crawl.to_json() # Step 2: Parse crawl output as snapshot input @@ -590,7 +591,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): # Create snapshot overrides = {'created_by_id': created_by_id} - snapshot = Snapshot.from_json(records[0], overrides=overrides) + snapshot = require(Snapshot.from_json(records[0], overrides=overrides)) self.assertIsNotNone(snapshot.id) self.assertEqual(snapshot.url, url) @@ -618,7 +619,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): # Step 1: Create snapshot (simulating 'archivebox snapshot') url = 'https://test-extract-1.example.com' overrides = {'created_by_id': created_by_id} - snapshot = Snapshot.from_json({'url': url}, overrides=overrides) + snapshot = require(Snapshot.from_json({'url': url}, overrides=overrides)) snapshot_output = snapshot.to_json() # Step 2: Parse snapshot output as extract input @@ -657,7 +658,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): # === archivebox crawl https://example.com === url = 'https://test-pipeline-full.example.com' - crawl = Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id}) + crawl = require(Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id})) crawl_jsonl = json.dumps(crawl.to_json()) # === | archivebox snapshot === @@ -728,12 +729,12 @@ class TestDepthWorkflows(unittest.TestCase): # Create crawl with depth 0 url = 'https://depth0-test.example.com' - crawl = Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id}) + crawl = require(Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})) self.assertEqual(crawl.max_depth, 0) # Create snapshot - snapshot = Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id}) + snapshot = require(Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id})) self.assertEqual(snapshot.url, url) def test_depth_metadata_in_crawl(self): @@ -744,10 +745,10 @@ class TestDepthWorkflows(unittest.TestCase): created_by_id = get_or_create_system_user_pk() # Create crawl with depth - crawl = Crawl.from_json( + crawl = require(Crawl.from_json( {'url': 'https://depth-meta-test.example.com', 'max_depth': 2}, overrides={'created_by_id': created_by_id} - ) + )) self.assertEqual(crawl.max_depth, 2) diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py index 2a4ceb49..ffcaf775 100644 --- a/archivebox/config/constants.py +++ b/archivebox/config/constants.py @@ -17,7 +17,6 @@ import sys from typing import Dict from pathlib import Path -from collections.abc import Mapping from benedict import benedict @@ -46,7 +45,7 @@ from .version import detect_installed_version ###################### Config ########################## -class ConstantsDict(Mapping): +class ConstantsDict: PACKAGE_DIR: Path = PACKAGE_DIR DATA_DIR: Path = DATA_DIR ARCHIVE_DIR: Path = ARCHIVE_DIR @@ -225,16 +224,9 @@ class ConstantsDict(Mapping): def __benedict__(cls): # when casting to benedict, only include uppercase keys that don't start with an underscore return benedict({key: value for key, value in cls.__dict__.items() if key.isupper() and not key.startswith('_')}) - - @classmethod - def __len__(cls): - return len(cls.__benedict__()) - @classmethod - def __iter__(cls): - return iter(cls.__benedict__()) -CONSTANTS = ConstantsDict() +CONSTANTS = ConstantsDict CONSTANTS_CONFIG = CONSTANTS.__benedict__() # add all key: values to globals() for easier importing, e.g.: diff --git a/archivebox/core/admin_tags.py b/archivebox/core/admin_tags.py index 09c616db..87396ad8 100644 --- a/archivebox/core/admin_tags.py +++ b/archivebox/core/admin_tags.py @@ -1,16 +1,17 @@ __package__ = 'archivebox.core' from django.contrib import admin -from django.utils.html import format_html, mark_safe +from django.utils.html import format_html +from django.utils.safestring import mark_safe from archivebox.misc.paginators import AccelleratedPaginator from archivebox.base_models.admin import BaseModelAdmin -from archivebox.core.models import Tag +from archivebox.core.models import SnapshotTag, Tag class TagInline(admin.TabularInline): - model = Tag.snapshot_set.through # type: ignore + model = SnapshotTag # fk_name = 'snapshot' fields = ('id', 'tag') extra = 1 @@ -173,4 +174,3 @@ class TagAdmin(BaseModelAdmin): def register_admin(admin_site): admin_site.register(Tag, TagAdmin) - diff --git a/archivebox/core/admin_users.py b/archivebox/core/admin_users.py index 92c9c1cb..371317f3 100644 --- a/archivebox/core/admin_users.py +++ b/archivebox/core/admin_users.py @@ -2,8 +2,9 @@ __package__ = 'archivebox.core' from django.contrib import admin from django.contrib.auth.admin import UserAdmin -from django.utils.html import format_html, mark_safe from django.contrib.auth import get_user_model +from django.utils.html import format_html +from django.utils.safestring import mark_safe class CustomUserAdmin(UserAdmin): @@ -16,7 +17,7 @@ class CustomUserAdmin(UserAdmin): add_fieldsets = UserAdmin.add_fieldsets # Extend fieldsets for change form only (not user creation) - fieldsets = [*UserAdmin.fieldsets, ('Data', {'fields': readonly_fields})] + fieldsets = [*(UserAdmin.fieldsets or ()), ('Data', {'fields': readonly_fields})] @admin.display(description='Snapshots') def snapshot_set(self, obj): diff --git a/archivebox/core/forms.py b/archivebox/core/forms.py index cc4f62b3..8589563a 100644 --- a/archivebox/core/forms.py +++ b/archivebox/core/forms.py @@ -22,12 +22,19 @@ def get_plugin_choices(): return [(name, name) for name in get_plugins()] +def get_choice_field(form: forms.Form, name: str) -> forms.ChoiceField: + field = form.fields[name] + if not isinstance(field, forms.ChoiceField): + raise TypeError(f'{name} must be a ChoiceField') + return field + + class AddLinkForm(forms.Form): # Basic fields url = forms.RegexField( label="URLs (one per line)", regex=URL_REGEX, - min_length='6', + min_length=6, strip=True, widget=forms.Textarea, required=True @@ -162,22 +169,22 @@ class AddLinkForm(forms.Form): extensions = {'twocaptcha', 'istilldontcareaboutcookies', 'ublock'} # Populate plugin field choices - self.fields['chrome_plugins'].choices = [ + get_choice_field(self, 'chrome_plugins').choices = [ (p, p) for p in sorted(all_plugins) if p in chrome_dependent ] - self.fields['archiving_plugins'].choices = [ + get_choice_field(self, 'archiving_plugins').choices = [ (p, p) for p in sorted(all_plugins) if p in archiving ] - self.fields['parsing_plugins'].choices = [ + get_choice_field(self, 'parsing_plugins').choices = [ (p, p) for p in sorted(all_plugins) if p in parsing ] - self.fields['search_plugins'].choices = [ + get_choice_field(self, 'search_plugins').choices = [ (p, p) for p in sorted(all_plugins) if p in search ] - self.fields['binary_plugins'].choices = [ + get_choice_field(self, 'binary_plugins').choices = [ (p, p) for p in sorted(all_plugins) if p in binary ] - self.fields['extension_plugins'].choices = [ + get_choice_field(self, 'extension_plugins').choices = [ (p, p) for p in sorted(all_plugins) if p in extensions ] @@ -185,13 +192,15 @@ class AddLinkForm(forms.Form): self.fields['update'].initial = not ARCHIVING_CONFIG.ONLY_NEW def clean(self): - cleaned_data = super().clean() + cleaned_data = super().clean() or {} # Combine all plugin groups into single list all_selected_plugins = [] for field in ['chrome_plugins', 'archiving_plugins', 'parsing_plugins', 'search_plugins', 'binary_plugins', 'extension_plugins']: - all_selected_plugins.extend(cleaned_data.get(field, [])) + selected = cleaned_data.get(field) + if isinstance(selected, list): + all_selected_plugins.extend(selected) # Store combined list for easy access cleaned_data['plugins'] = all_selected_plugins @@ -211,17 +220,13 @@ class AddLinkForm(forms.Form): return schedule -class TagWidgetMixin: +class TagWidget(forms.TextInput): def format_value(self, value): if value is not None and not isinstance(value, str): value = edit_string_for_tags(value) return super().format_value(value) -class TagWidget(TagWidgetMixin, forms.TextInput): - pass - - class TagField(forms.CharField): widget = TagWidget @@ -234,21 +239,21 @@ class TagField(forms.CharField): "Please provide a comma-separated list of tags." ) - def has_changed(self, initial_value, data_value): + def has_changed(self, initial, data): # Always return False if the field is disabled since self.bound_data # always uses the initial value in this case. if self.disabled: return False try: - data_value = self.clean(data_value) + cleaned_data = self.clean(data) except forms.ValidationError: - pass + cleaned_data = data - if initial_value is None: - initial_value = [] + initial_value = [] if initial is None else initial - initial_value = [tag.name for tag in initial_value] - initial_value.sort() + if not isinstance(initial_value, list): + initial_value = list(initial_value) - return initial_value != data_value + normalized_initial = sorted(tag.name for tag in initial_value) + return normalized_initial != cleaned_data diff --git a/archivebox/core/management/commands/archivebox.py b/archivebox/core/management/commands/archivebox.py index a68b5d94..582ef344 100644 --- a/archivebox/core/management/commands/archivebox.py +++ b/archivebox/core/management/commands/archivebox.py @@ -2,8 +2,7 @@ __package__ = 'archivebox' from django.core.management.base import BaseCommand - -from .cli import run_subcommand +from archivebox.cli import main as run_cli class Command(BaseCommand): @@ -15,4 +14,5 @@ class Command(BaseCommand): def handle(self, *args, **kwargs): - run_subcommand(kwargs['subcommand'], args=kwargs['command_args']) + command_args = [kwargs['subcommand'], *kwargs['command_args']] + run_cli(args=command_args) diff --git a/archivebox/core/middleware.py b/archivebox/core/middleware.py index 7594eb8d..62accedc 100644 --- a/archivebox/core/middleware.py +++ b/archivebox/core/middleware.py @@ -165,6 +165,8 @@ class ReverseProxyAuthMiddleware(RemoteUserMiddleware): return ip = request.META.get('REMOTE_ADDR') + if not isinstance(ip, str): + return for cidr in SERVER_CONFIG.REVERSE_PROXY_WHITELIST.split(','): try: diff --git a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py index d53670c8..93cca140 100644 --- a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py +++ b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py @@ -3,10 +3,11 @@ import archivebox.base_models.models import django.db.models.deletion import django.utils.timezone -import uuid from django.conf import settings from django.db import migrations, models, connection +from archivebox.uuid_compat import uuid7 + def copy_old_fields_to_new(apps, schema_editor): """Copy data from old field names to new field names after AddField operations.""" @@ -236,7 +237,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='archiveresult', name='uuid', - field=models.UUIDField(blank=True, db_index=True, default=uuid.uuid7, null=True), + field=models.UUIDField(blank=True, db_index=True, default=uuid7, null=True), ), migrations.AlterField( model_name='snapshot', @@ -246,7 +247,7 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='snapshot', name='id', - field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True), + field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True), ), migrations.AlterField( model_name='snapshot', diff --git a/archivebox/core/migrations/0030_alter_archiveresult_id.py b/archivebox/core/migrations/0030_alter_archiveresult_id.py index 0c5e54b0..80ce097c 100644 --- a/archivebox/core/migrations/0030_alter_archiveresult_id.py +++ b/archivebox/core/migrations/0030_alter_archiveresult_id.py @@ -1,8 +1,9 @@ # Generated by Django 6.0 on 2026-01-02 10:02 -import uuid from django.db import migrations, models +from archivebox.uuid_compat import uuid7 + class Migration(migrations.Migration): @@ -14,6 +15,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='archiveresult', name='id', - field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True), + field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True), ), ] diff --git a/archivebox/machine/migrations/0006_process.py b/archivebox/machine/migrations/0006_process.py index 6a2139f0..b989d482 100644 --- a/archivebox/machine/migrations/0006_process.py +++ b/archivebox/machine/migrations/0006_process.py @@ -2,9 +2,10 @@ import django.db.models.deletion import django.utils.timezone -import uuid from django.db import migrations, models +from archivebox.uuid_compat import uuid7 + class Migration(migrations.Migration): @@ -16,7 +17,7 @@ class Migration(migrations.Migration): migrations.CreateModel( name='Process', fields=[ - ('id', models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True)), + ('id', models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True)), ('created_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)), ('modified_at', models.DateTimeField(auto_now=True)), ('pwd', models.CharField(blank=True, default='', help_text='Working directory for process execution', max_length=512)), diff --git a/archivebox/machine/tests/test_machine_models.py b/archivebox/machine/tests/test_machine_models.py index 983770d4..e0638285 100644 --- a/archivebox/machine/tests/test_machine_models.py +++ b/archivebox/machine/tests/test_machine_models.py @@ -84,6 +84,7 @@ class TestMachineModel(TestCase): result = Machine.from_json(record) self.assertIsNotNone(result) + assert result is not None self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget') def test_machine_from_jsonl_invalid(self): @@ -179,6 +180,7 @@ class TestBinaryModel(TestCase): result = Binary.objects.get_valid_binary('wget') self.assertIsNotNone(result) + assert result is not None self.assertEqual(result.abspath, '/usr/bin/wget') def test_binary_update_and_requeue(self): @@ -209,6 +211,8 @@ class TestBinaryModel(TestCase): 'overrides': overrides, }) + self.assertIsNotNone(binary) + assert binary is not None self.assertEqual(binary.overrides, overrides) def test_binary_from_json_does_not_coerce_legacy_override_shapes(self): @@ -224,6 +228,8 @@ class TestBinaryModel(TestCase): 'overrides': overrides, }) + self.assertIsNotNone(binary) + assert binary is not None self.assertEqual(binary.overrides, overrides) def test_binary_from_json_prefers_published_readability_package(self): @@ -238,6 +244,8 @@ class TestBinaryModel(TestCase): }, }) + self.assertIsNotNone(binary) + assert binary is not None self.assertEqual( binary.overrides, { @@ -265,7 +273,7 @@ class TestBinaryStateMachine(TestCase): def test_binary_state_machine_initial_state(self): """BinaryMachine should start in queued state.""" sm = BinaryMachine(self.binary) - self.assertEqual(sm.current_state.value, Binary.StatusChoices.QUEUED) + self.assertEqual(sm.current_state_value, Binary.StatusChoices.QUEUED) def test_binary_state_machine_can_start(self): """BinaryMachine.can_start() should check name and binproviders.""" @@ -604,7 +612,7 @@ class TestProcessStateMachine(TestCase): def test_process_state_machine_initial_state(self): """ProcessMachine should start in queued state.""" sm = ProcessMachine(self.process) - self.assertEqual(sm.current_state.value, Process.StatusChoices.QUEUED) + self.assertEqual(sm.current_state_value, Process.StatusChoices.QUEUED) def test_process_state_machine_can_start(self): """ProcessMachine.can_start() should check cmd and machine.""" diff --git a/archivebox/mcp/server.py b/archivebox/mcp/server.py index 025c3eee..19e31b7e 100644 --- a/archivebox/mcp/server.py +++ b/archivebox/mcp/server.py @@ -8,7 +8,7 @@ Click command metadata. Handles JSON-RPC 2.0 requests over stdio transport. import sys import json import traceback -from typing import Optional +from typing import Any, Optional import click from click.testing import CliRunner @@ -19,25 +19,25 @@ from archivebox.config.version import VERSION class MCPJSONEncoder(json.JSONEncoder): """Custom JSON encoder that handles Click sentinel values and other special types""" - def default(self, obj): + def default(self, o): # Handle Click's sentinel values - if hasattr(click, 'core') and hasattr(click.core, '_SentinelClass'): - if isinstance(obj, click.core._SentinelClass): + sentinel_type = getattr(click.core, '_SentinelClass', None) + if isinstance(sentinel_type, type) and isinstance(o, sentinel_type): return None # Handle tuples (convert to lists) - if isinstance(obj, tuple): - return list(obj) + if isinstance(o, tuple): + return list(o) # Handle any other non-serializable objects try: - return super().default(obj) + return super().default(o) except TypeError: - return str(obj) + return str(o) # Type mapping from Click types to JSON Schema types -def click_type_to_json_schema_type(click_type) -> dict: +def click_type_to_json_schema_type(click_type: click.ParamType) -> dict[str, Any]: """Convert a Click parameter type to JSON Schema type definition""" if isinstance(click_type, click.types.StringParamType): @@ -49,7 +49,7 @@ def click_type_to_json_schema_type(click_type) -> dict: elif isinstance(click_type, click.types.BoolParamType): return {"type": "boolean"} elif isinstance(click_type, click.types.Choice): - return {"type": "string", "enum": click_type.choices} + return {"type": "string", "enum": list(click_type.choices)} elif isinstance(click_type, click.types.Path): return {"type": "string", "description": "File or directory path"} elif isinstance(click_type, click.types.File): @@ -62,7 +62,7 @@ def click_type_to_json_schema_type(click_type) -> dict: return {"type": "string"} -def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict: +def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict[str, Any]: """ Convert a Click command to an MCP tool definition with JSON Schema. @@ -70,20 +70,21 @@ def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> di the input schema without manual definition. """ - properties = {} - required = [] + properties: dict[str, dict[str, Any]] = {} + required: list[str] = [] # Extract parameters from Click command for param in click_command.params: # Skip internal parameters - if param.name in ('help', 'version'): + if param.name is None or param.name in ('help', 'version'): continue param_schema = click_type_to_json_schema_type(param.type) # Add description from Click help text - if param.help: - param_schema["description"] = param.help + help_text = getattr(param, 'help', None) + if help_text: + param_schema["description"] = help_text # Handle default values if param.default is not None and param.default != (): @@ -248,7 +249,7 @@ class MCPServer: if cmd_name not in self._tool_cache: if cmd_name not in self.cli_group.all_subcommands: return None - self._tool_cache[cmd_name] = self.cli_group.get_command(None, cmd_name) + self._tool_cache[cmd_name] = self.cli_group.get_command(click.Context(self.cli_group), cmd_name) return self._tool_cache[cmd_name] def handle_initialize(self, params: dict) -> dict: diff --git a/archivebox/misc/db.py b/archivebox/misc/db.py index 7f2c7247..c438df53 100644 --- a/archivebox/misc/db.py +++ b/archivebox/misc/db.py @@ -6,7 +6,7 @@ __package__ = 'archivebox.misc' from io import StringIO from pathlib import Path -from typing import List, Tuple +from typing import Any, List, Tuple from archivebox.config import DATA_DIR from archivebox.misc.util import enforce_types @@ -48,8 +48,8 @@ def apply_migrations(out_dir: Path = DATA_DIR) -> List[str]: @enforce_types -def get_admins(out_dir: Path = DATA_DIR) -> List: +def get_admins(out_dir: Path = DATA_DIR) -> List[Any]: """Get list of superuser accounts""" from django.contrib.auth.models import User - return User.objects.filter(is_superuser=True).exclude(username='system') + return list(User.objects.filter(is_superuser=True).exclude(username='system')) diff --git a/archivebox/misc/logging_util.py b/archivebox/misc/logging_util.py index c00071f6..885aec4d 100644 --- a/archivebox/misc/logging_util.py +++ b/archivebox/misc/logging_util.py @@ -14,7 +14,7 @@ from pathlib import Path from datetime import datetime, timezone from dataclasses import dataclass -from typing import Any, Optional, List, Dict, Union, Iterable, IO, TYPE_CHECKING +from typing import Any, Optional, List, Dict, Union, Iterable, IO, TYPE_CHECKING, cast if TYPE_CHECKING: from archivebox.core.models import Snapshot @@ -397,7 +397,8 @@ def log_list_finished(snapshots): from archivebox.core.models import Snapshot print() print('---------------------------------------------------------------------------------------------------') - print(Snapshot.objects.filter(pk__in=[s.pk for s in snapshots]).to_csv(cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | ')) + csv_queryset = cast(Any, Snapshot.objects.filter(pk__in=[s.pk for s in snapshots])) + print(csv_queryset.to_csv(cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | ')) print('---------------------------------------------------------------------------------------------------') print() diff --git a/archivebox/misc/monkey_patches.py b/archivebox/misc/monkey_patches.py index 9ee755c4..2f4bb146 100644 --- a/archivebox/misc/monkey_patches.py +++ b/archivebox/misc/monkey_patches.py @@ -13,7 +13,7 @@ django_stubs_ext.monkeypatch() # monkey patch django timezone to add back utc (it was removed in Django 5.0) -timezone.utc = datetime.timezone.utc +setattr(timezone, 'utc', datetime.timezone.utc) # monkey patch django-signals-webhooks to change how it shows up in Admin UI # from signal_webhooks.apps import DjangoSignalWebhooksConfig diff --git a/archivebox/misc/paginators.py b/archivebox/misc/paginators.py index 2e623a65..fa8c6cdb 100644 --- a/archivebox/misc/paginators.py +++ b/archivebox/misc/paginators.py @@ -13,12 +13,17 @@ class AccelleratedPaginator(Paginator): @cached_property def count(self): - if self.object_list._has_filters(): # type: ignore + has_filters = getattr(self.object_list, '_has_filters', None) + if callable(has_filters) and has_filters(): # fallback to normal count method on filtered queryset return super().count - else: - # otherwise count total rows in a separate fast query - return self.object_list.model.objects.count() + + model = getattr(self.object_list, 'model', None) + if model is None: + return super().count + + # otherwise count total rows in a separate fast query + return model.objects.count() # Alternative approach for PostgreSQL: fallback count takes > 200ms # from django.db import connection, transaction, OperationalError diff --git a/archivebox/misc/progress_layout.py b/archivebox/misc/progress_layout.py index 1263856b..537db5b9 100644 --- a/archivebox/misc/progress_layout.py +++ b/archivebox/misc/progress_layout.py @@ -17,7 +17,7 @@ from collections import deque from pathlib import Path from rich import box -from rich.console import Group +from rich.console import Group, RenderableType from rich.layout import Layout from rich.columns import Columns from rich.panel import Panel @@ -48,7 +48,7 @@ class CrawlQueuePanel: self.max_crawl_workers = 8 self.crawl_id: Optional[str] = None - def __rich__(self) -> Panel: + def __rich__(self) -> RenderableType: grid = Table.grid(expand=True) grid.add_column(justify="left", ratio=1) grid.add_column(justify="center", ratio=1) @@ -104,7 +104,7 @@ class ProcessLogPanel: self.compact = compact self.bg_terminating = bg_terminating - def __rich__(self) -> Panel: + def __rich__(self) -> RenderableType: completed_line = self._completed_output_line() if completed_line: style = "green" if self._completed_ok() else "yellow" diff --git a/archivebox/misc/serve_static.py b/archivebox/misc/serve_static.py index 76bc74e8..19e2dadd 100644 --- a/archivebox/misc/serve_static.py +++ b/archivebox/misc/serve_static.py @@ -111,7 +111,7 @@ def _render_markdown_fallback(text: str) -> str: return _markdown.markdown( text, extensions=["extra", "toc", "sane_lists"], - output_format="html5", + output_format="html", ) except Exception: pass diff --git a/archivebox/misc/system.py b/archivebox/misc/system.py index 6804c210..36eac00d 100644 --- a/archivebox/misc/system.py +++ b/archivebox/misc/system.py @@ -9,13 +9,14 @@ import sys from json import dump from pathlib import Path from typing import Optional, Union, Tuple -from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired +from subprocess import PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired from atomicwrites import atomic_write as lib_atomic_write from archivebox.config.common import STORAGE_CONFIG from archivebox.misc.util import enforce_types, ExtendedEncoder +IS_WINDOWS = os.name == 'nt' def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs): """Patched of subprocess.run to kill forked child subprocesses and fix blocking io making timeout=innefective @@ -47,13 +48,15 @@ def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, stdout, stderr = process.communicate(input, timeout=timeout) except TimeoutExpired as exc: process.kill() - if _mswindows: + if IS_WINDOWS: # Windows accumulates the output in a single blocking # read() call run on child threads, with the timeout # being done in a join() on those threads. communicate() # _after_ kill() is required to collect that and add it # to the exception. - exc.stdout, exc.stderr = process.communicate() + timed_out_stdout, timed_out_stderr = process.communicate() + exc.stdout = timed_out_stdout.encode() if isinstance(timed_out_stdout, str) else timed_out_stdout + exc.stderr = timed_out_stderr.encode() if isinstance(timed_out_stderr, str) else timed_out_stderr else: # POSIX _communicate already populated the output so # far into the TimeoutExpired exception. @@ -71,11 +74,12 @@ def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, finally: # force kill any straggler subprocesses that were forked from the main proc try: - os.killpg(pgid, signal.SIGINT) + if pgid is not None: + os.killpg(pgid, signal.SIGINT) except Exception: pass - return CompletedProcess(process.args, retcode, stdout, stderr) + return CompletedProcess(process.args, retcode or 0, stdout, stderr) @enforce_types diff --git a/archivebox/misc/toml_util.py b/archivebox/misc/toml_util.py index 9dd51d1b..0da1b298 100644 --- a/archivebox/misc/toml_util.py +++ b/archivebox/misc/toml_util.py @@ -42,7 +42,7 @@ def convert(ini_str: str) -> str: """Convert a string of INI config into its TOML equivalent (warning: strips comments)""" config = configparser.ConfigParser() - config.optionxform = str # capitalize key names + setattr(config, 'optionxform', str) # capitalize key names config.read_string(ini_str) # Initialize an empty dictionary to store the TOML representation @@ -77,12 +77,12 @@ class JSONSchemaWithLambdas(GenerateJsonSchema): Usage: >>> json.dumps(value, encoder=JSONSchemaWithLambdas()) """ - def encode_default(self, default: Any) -> Any: + def encode_default(self, dft: Any) -> Any: config = self._config - if isinstance(default, Callable): - return '{{lambda ' + inspect.getsource(default).split('=lambda ')[-1].strip()[:-1] + '}}' + if isinstance(dft, Callable): + return '{{lambda ' + inspect.getsource(dft).split('=lambda ')[-1].strip()[:-1] + '}}' return to_jsonable_python( - default, + dft, timedelta_mode=config.ser_json_timedelta, bytes_mode=config.ser_json_bytes, serialize_unknown=True diff --git a/archivebox/misc/util.py b/archivebox/misc/util.py index c69c8c86..61b898b7 100644 --- a/archivebox/misc/util.py +++ b/archivebox/misc/util.py @@ -56,9 +56,19 @@ urldecode = lambda s: s and unquote(s) htmlencode = lambda s: s and escape(s, quote=True) htmldecode = lambda s: s and unescape(s) -short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0] -ts_to_date_str = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M') -ts_to_iso = lambda ts: ts and parse_date(ts).isoformat() +def short_ts(ts: Any) -> str | None: + parsed = parse_date(ts) + return None if parsed is None else str(parsed.timestamp()).split('.')[0] + + +def ts_to_date_str(ts: Any) -> str | None: + parsed = parse_date(ts) + return None if parsed is None else parsed.strftime('%Y-%m-%d %H:%M') + + +def ts_to_iso(ts: Any) -> str | None: + parsed = parse_date(ts) + return None if parsed is None else parsed.isoformat() COLOR_REGEX = re.compile(r'\[(?P\d+)(;(?P\d+)(;(?P\d+))?)?m') @@ -175,7 +185,7 @@ def docstring(text: Optional[str]): @enforce_types -def str_between(string: str, start: str, end: str=None) -> str: +def str_between(string: str, start: str, end: str | None = None) -> str: """(12345, , ) -> 12345""" content = string.split(start, 1)[-1] @@ -186,7 +196,7 @@ def str_between(string: str, start: str, end: str=None) -> str: @enforce_types -def parse_date(date: Any) -> datetime: +def parse_date(date: Any) -> datetime | None: """Parse unix timestamps, iso format, and human-readable strings""" if date is None: @@ -196,20 +206,24 @@ def parse_date(date: Any) -> datetime: if date.tzinfo is None: return date.replace(tzinfo=timezone.utc) - assert date.tzinfo.utcoffset(datetime.now()).seconds == 0, 'Refusing to load a non-UTC date!' + offset = date.utcoffset() + assert offset == datetime.now(timezone.utc).utcoffset(), 'Refusing to load a non-UTC date!' return date if isinstance(date, (float, int)): date = str(date) if isinstance(date, str): - return dateparser(date, settings={'TIMEZONE': 'UTC'}).astimezone(timezone.utc) + parsed_date = dateparser(date, settings={'TIMEZONE': 'UTC'}) + if parsed_date is None: + raise ValueError(f'Tried to parse invalid date string! {date}') + return parsed_date.astimezone(timezone.utc) raise ValueError('Tried to parse invalid date! {}'.format(date)) @enforce_types -def download_url(url: str, timeout: int=None) -> str: +def download_url(url: str, timeout: int | None = None) -> str: """Download the contents of a remote url and return the text""" from archivebox.config.common import ARCHIVING_CONFIG @@ -221,7 +235,8 @@ def download_url(url: str, timeout: int=None) -> str: cookie_jar = http.cookiejar.MozillaCookieJar(ARCHIVING_CONFIG.COOKIES_FILE) cookie_jar.load(ignore_discard=True, ignore_expires=True) for cookie in cookie_jar: - session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path) + if cookie.value is not None: + session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path) response = session.get( url, @@ -331,47 +346,47 @@ class ExtendedEncoder(pyjson.JSONEncoder): fields and objects """ - def default(self, obj): - cls_name = obj.__class__.__name__ + def default(self, o): + cls_name = o.__class__.__name__ - if hasattr(obj, '_asdict'): - return obj._asdict() + if hasattr(o, '_asdict'): + return o._asdict() - elif isinstance(obj, bytes): - return obj.decode() + elif isinstance(o, bytes): + return o.decode() - elif isinstance(obj, datetime): - return obj.isoformat() + elif isinstance(o, datetime): + return o.isoformat() - elif isinstance(obj, Exception): - return '{}: {}'.format(obj.__class__.__name__, obj) + elif isinstance(o, Exception): + return '{}: {}'.format(o.__class__.__name__, o) - elif isinstance(obj, Path): - return str(obj) + elif isinstance(o, Path): + return str(o) elif cls_name in ('dict_items', 'dict_keys', 'dict_values'): - return list(obj) + return list(o) - elif isinstance(obj, Callable): - return str(obj) + elif isinstance(o, Callable): + return str(o) # Try dict/list conversion as fallback try: - return dict(obj) + return dict(o) except Exception: pass try: - return list(obj) + return list(o) except Exception: pass try: - return str(obj) + return str(o) except Exception: pass - return pyjson.JSONEncoder.default(self, obj) + return pyjson.JSONEncoder.default(self, o) @enforce_types diff --git a/archivebox/mypy.ini b/archivebox/mypy.ini deleted file mode 100644 index b1b4489a..00000000 --- a/archivebox/mypy.ini +++ /dev/null @@ -1,3 +0,0 @@ -[mypy] -plugins = - mypy_django_plugin.main diff --git a/archivebox/personas/migrations/0002_alter_persona_id.py b/archivebox/personas/migrations/0002_alter_persona_id.py index e8e5af2a..5b5aef6c 100644 --- a/archivebox/personas/migrations/0002_alter_persona_id.py +++ b/archivebox/personas/migrations/0002_alter_persona_id.py @@ -1,8 +1,9 @@ # Generated by Django 6.0 on 2026-01-05 01:09 -import uuid from django.db import migrations, models +from archivebox.uuid_compat import uuid7 + class Migration(migrations.Migration): @@ -14,6 +15,6 @@ class Migration(migrations.Migration): migrations.AlterField( model_name='persona', name='id', - field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True), + field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True), ), ] diff --git a/archivebox/tests/conftest.py b/archivebox/tests/conftest.py index 28f58062..b8d37bd4 100644 --- a/archivebox/tests/conftest.py +++ b/archivebox/tests/conftest.py @@ -400,13 +400,13 @@ def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]) # Test Data Factories # ============================================================================= -def create_test_url(domain: str = 'example.com', path: str = None) -> str: +def create_test_url(domain: str = 'example.com', path: str | None = None) -> str: """Generate unique test URL.""" path = path or uuid7().hex[:8] return f'https://{domain}/{path}' -def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]: +def create_test_crawl_json(urls: List[str] | None = None, **kwargs) -> Dict[str, Any]: """Create Crawl JSONL record for testing.""" urls = urls or [create_test_url()] return { @@ -419,7 +419,7 @@ def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]: } -def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]: +def create_test_snapshot_json(url: str | None = None, **kwargs) -> Dict[str, Any]: """Create Snapshot JSONL record for testing.""" return { 'type': 'Snapshot', diff --git a/archivebox/tests/migrations_helpers.py b/archivebox/tests/migrations_helpers.py index 5c620186..0c533f67 100644 --- a/archivebox/tests/migrations_helpers.py +++ b/archivebox/tests/migrations_helpers.py @@ -967,7 +967,7 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]: # Helper Functions # ============================================================================= -def run_archivebox(data_dir: Path, args: list, timeout: int = 60, env: dict = None) -> subprocess.CompletedProcess: +def run_archivebox(data_dir: Path, args: list, timeout: int = 60, env: dict | None = None) -> subprocess.CompletedProcess: """Run archivebox command in subprocess with given data directory.""" base_env = os.environ.copy() base_env['DATA_DIR'] = str(data_dir) diff --git a/archivebox/tests/test_add.py b/archivebox/tests/test_add.py deleted file mode 100644 index 39d423e3..00000000 --- a/archivebox/tests/test_add.py +++ /dev/null @@ -1,166 +0,0 @@ -import os -import sqlite3 -import subprocess - -def test_depth_flag_is_accepted(process, disable_extractors_dict): - arg_process = subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - assert 'unrecognized arguments: --depth' not in arg_process.stderr.decode("utf-8") - - -def test_depth_flag_fails_if_it_is_not_0_or_1(process, disable_extractors_dict): - arg_process = subprocess.run( - ["archivebox", "add", "--index-only", "--depth=5", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - # Error message may say "invalid choice" or "is not one of" - stderr = arg_process.stderr.decode("utf-8") - assert 'invalid' in stderr.lower() or 'not one of' in stderr.lower() - arg_process = subprocess.run( - ["archivebox", "add", "--index-only", "--depth=-1", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - stderr = arg_process.stderr.decode("utf-8") - assert 'invalid' in stderr.lower() or 'not one of' in stderr.lower() - - -def test_depth_flag_0_creates_source_file(tmp_path, process, disable_extractors_dict): - os.chdir(tmp_path) - subprocess.run( - ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - - # Check that source file was created with the URL - sources_dir = tmp_path / "sources" - assert sources_dir.exists() - source_files = list(sources_dir.glob("*cli_add.txt")) - assert len(source_files) >= 1 - source_content = source_files[0].read_text() - assert "example.com" in source_content - - -def test_overwrite_flag_is_accepted(process, disable_extractors_dict): - subprocess.run( - ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - arg_process = subprocess.run( - ["archivebox", "add", "--index-only", "--overwrite", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - assert 'unrecognized arguments: --overwrite' not in arg_process.stderr.decode("utf-8") - -def test_add_creates_crawl_in_database(tmp_path, process, disable_extractors_dict): - os.chdir(tmp_path) - subprocess.run( - ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - - # Check that a Crawl was created in database - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count = c.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()[0] - conn.close() - - assert count >= 1 - - -def test_add_with_tags(tmp_path, process, disable_extractors_dict): - """Test adding URL with tags.""" - os.chdir(tmp_path) - subprocess.run( - ["archivebox", "add", "--index-only", "--depth=0", "--tag=test,example", "https://example.com"], - capture_output=True, - env=disable_extractors_dict, - ) - - # Check that tags were created in database - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - tags = c.execute("SELECT name FROM core_tag").fetchall() - conn.close() - - tag_names = [t[0] for t in tags] - assert 'test' in tag_names or 'example' in tag_names - - -def test_add_multiple_urls_single_call(tmp_path, process, disable_extractors_dict): - """Test adding multiple URLs in a single call creates multiple snapshots.""" - os.chdir(tmp_path) - subprocess.run( - ["archivebox", "add", "--index-only", "--depth=0", - "https://example.com", "https://example.org"], - capture_output=True, - env=disable_extractors_dict, - ) - - # Check both URLs are in the source file - sources_dir = tmp_path / "sources" - source_files = list(sources_dir.glob("*cli_add.txt")) - assert len(source_files) >= 1 - source_content = source_files[0].read_text() - assert "example.com" in source_content - assert "example.org" in source_content - - -def test_add_from_file(tmp_path, process, disable_extractors_dict): - """Test adding URLs from a file.""" - os.chdir(tmp_path) - - # Create a file with URLs - urls_file = tmp_path / "urls.txt" - urls_file.write_text("https://example.com\nhttps://example.org\n") - - subprocess.run( - ["archivebox", "add", "--index-only", "--depth=0", str(urls_file)], - capture_output=True, - env=disable_extractors_dict, - ) - - # Check that a Crawl was created - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count = c.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()[0] - conn.close() - - assert count >= 1 - - -class TestAddCLI: - """Test the CLI interface for add command.""" - - def test_add_help(self, tmp_path, process): - """Test that --help works for add command.""" - os.chdir(tmp_path) - - result = subprocess.run( - ["archivebox", "add", "--help"], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - assert '--depth' in result.stdout or 'depth' in result.stdout - assert '--tag' in result.stdout or 'tag' in result.stdout - - def test_add_no_args_shows_help(self, tmp_path, process): - """Test that add with no args shows help or usage.""" - os.chdir(tmp_path) - - result = subprocess.run( - ["archivebox", "add"], - capture_output=True, - text=True, - ) - - # Should either show help or error about missing URL - combined = result.stdout + result.stderr - assert 'usage' in combined.lower() or 'url' in combined.lower() or 'add' in combined.lower() diff --git a/archivebox/tests/test_admin_views.py b/archivebox/tests/test_admin_views.py index c1bfb3bd..486b714a 100644 --- a/archivebox/tests/test_admin_views.py +++ b/archivebox/tests/test_admin_views.py @@ -9,9 +9,11 @@ Tests cover: """ import pytest +from typing import cast from django.test import override_settings from django.urls import reverse from django.contrib.auth import get_user_model +from django.contrib.auth.models import UserManager pytestmark = pytest.mark.django_db @@ -24,7 +26,7 @@ PUBLIC_HOST = 'public.archivebox.localhost:8000' @pytest.fixture def admin_user(db): """Create admin user for tests.""" - return User.objects.create_superuser( + return cast(UserManager, User.objects).create_superuser( username='testadmin', email='admin@test.com', password='testpassword' diff --git a/archivebox/tests/test_cli_add.py b/archivebox/tests/test_cli_add.py index 11abca82..fbd6894f 100644 --- a/archivebox/tests/test_cli_add.py +++ b/archivebox/tests/test_cli_add.py @@ -7,6 +7,21 @@ Verify add creates snapshots in DB, crawls, source files, and archive directorie import os import sqlite3 import subprocess +from pathlib import Path + + +def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None: + candidates = {snapshot_id} + if len(snapshot_id) == 32: + candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}") + elif len(snapshot_id) == 36 and '-' in snapshot_id: + candidates.add(snapshot_id.replace('-', '')) + + for needle in candidates: + for path in data_dir.rglob(needle): + if path.is_dir(): + return path + return None def test_add_single_url_creates_snapshot_in_db(tmp_path, process, disable_extractors_dict): @@ -144,6 +159,21 @@ def test_add_with_depth_1_flag(tmp_path, process, disable_extractors_dict): assert 'unrecognized arguments: --depth' not in result.stderr.decode('utf-8') +def test_add_rejects_invalid_depth_values(tmp_path, process, disable_extractors_dict): + """Test that add rejects depth values outside the supported range.""" + os.chdir(tmp_path) + + for depth in ('5', '-1'): + result = subprocess.run( + ['archivebox', 'add', '--index-only', f'--depth={depth}', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + ) + stderr = result.stderr.decode('utf-8').lower() + assert result.returncode != 0 + assert 'invalid' in stderr or 'not one of' in stderr + + def test_add_with_tags(tmp_path, process, disable_extractors_dict): """Test adding URL with tags stores tags_str in crawl. @@ -245,11 +275,8 @@ def test_add_with_overwrite_flag(tmp_path, process, disable_extractors_dict): assert 'unrecognized arguments: --overwrite' not in result.stderr.decode('utf-8') -def test_add_creates_archive_subdirectory(tmp_path, process, disable_extractors_dict): - """Test that add creates archive subdirectory for the snapshot. - - Archive subdirectories are named by timestamp, not by snapshot ID. - """ +def test_add_creates_snapshot_output_directory(tmp_path, process, disable_extractors_dict): + """Test that add creates the current snapshot output directory on disk.""" os.chdir(tmp_path) subprocess.run( ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], @@ -257,16 +284,44 @@ def test_add_creates_archive_subdirectory(tmp_path, process, disable_extractors_ env=disable_extractors_dict, ) - # Get the snapshot timestamp from the database conn = sqlite3.connect("index.sqlite3") c = conn.cursor() - timestamp = c.execute("SELECT timestamp FROM core_snapshot").fetchone()[0] + snapshot_id = str(c.execute("SELECT id FROM core_snapshot").fetchone()[0]) conn.close() - # Check that archive subdirectory was created using timestamp - archive_dir = tmp_path / "archive" / str(timestamp) - assert archive_dir.exists() - assert archive_dir.is_dir() + snapshot_dir = _find_snapshot_dir(tmp_path, snapshot_id) + assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}" + assert snapshot_dir.is_dir() + + +def test_add_help_shows_depth_and_tag_options(tmp_path, process): + """Test that add --help documents the main filter and crawl options.""" + os.chdir(tmp_path) + + result = subprocess.run( + ['archivebox', 'add', '--help'], + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + assert '--depth' in result.stdout + assert '--tag' in result.stdout + + +def test_add_without_args_shows_usage(tmp_path, process): + """Test that add without URLs fails with a usage hint instead of crashing.""" + os.chdir(tmp_path) + + result = subprocess.run( + ['archivebox', 'add'], + capture_output=True, + text=True, + ) + + combined = result.stdout + result.stderr + assert result.returncode != 0 + assert 'usage' in combined.lower() or 'url' in combined.lower() def test_add_index_only_skips_extraction(tmp_path, process, disable_extractors_dict): diff --git a/archivebox/tests/test_cli_init.py b/archivebox/tests/test_cli_init.py index e6ce1ef6..fa6fe157 100644 --- a/archivebox/tests/test_cli_init.py +++ b/archivebox/tests/test_cli_init.py @@ -241,3 +241,24 @@ def test_init_output_shows_collection_info(tmp_path): output = result.stdout # Should show some helpful info about the collection assert 'ArchiveBox' in output or 'collection' in output.lower() or 'Initializing' in output + + +def test_init_ignores_unrecognized_archive_directories(tmp_path, process, disable_extractors_dict): + """Test that init upgrades existing dirs without choking on extra folders.""" + os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + (tmp_path / "archive" / "some_random_folder").mkdir(parents=True, exist_ok=True) + + result = subprocess.run( + ['archivebox', 'init'], + capture_output=True, + text=True, + env=disable_extractors_dict, + ) + + assert result.returncode == 0, result.stdout + result.stderr diff --git a/archivebox/tests/test_cli_install.py b/archivebox/tests/test_cli_install.py index c7738468..1d0f499e 100644 --- a/archivebox/tests/test_cli_install.py +++ b/archivebox/tests/test_cli_install.py @@ -93,6 +93,59 @@ def test_install_shows_binary_status(tmp_path, process): assert len(output) > 50 +def test_install_dry_run_prints_dry_run_message(tmp_path, process): + """Test that install --dry-run clearly reports that no changes will be made.""" + os.chdir(tmp_path) + result = subprocess.run( + ['archivebox', 'install', '--dry-run'], + capture_output=True, + text=True, + timeout=60, + ) + + assert result.returncode == 0 + assert 'dry run' in result.stdout.lower() + + +def test_install_help_lists_dry_run_flag(tmp_path): + """Test that install --help documents the dry-run option.""" + os.chdir(tmp_path) + result = subprocess.run( + ['archivebox', 'install', '--help'], + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + assert '--dry-run' in result.stdout or '-d' in result.stdout + + +def test_install_invalid_option_fails(tmp_path): + """Test that invalid install options fail cleanly.""" + os.chdir(tmp_path) + result = subprocess.run( + ['archivebox', 'install', '--invalid-option'], + capture_output=True, + text=True, + ) + + assert result.returncode != 0 + + +def test_install_from_empty_dir_initializes_collection(tmp_path): + """Test that install bootstraps an empty dir before performing work.""" + os.chdir(tmp_path) + result = subprocess.run( + ['archivebox', 'install', '--dry-run'], + capture_output=True, + text=True, + ) + + output = result.stdout + result.stderr + assert result.returncode == 0 + assert 'Initializing' in output or 'Dry run' in output or 'init' in output.lower() + + def test_install_updates_binary_table(tmp_path, process): """Test that install completes and only mutates dependency state.""" os.chdir(tmp_path) diff --git a/archivebox/tests/test_cli_list.py b/archivebox/tests/test_cli_list.py new file mode 100644 index 00000000..cff62bc9 --- /dev/null +++ b/archivebox/tests/test_cli_list.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +Tests for archivebox list command. +Verify list emits snapshot JSONL and applies the documented filters. +""" + +import json +import os +import sqlite3 +import subprocess + + +def _parse_jsonl(stdout: str) -> list[dict]: + return [ + json.loads(line) + for line in stdout.splitlines() + if line.strip().startswith('{') + ] + + +def test_list_outputs_existing_snapshots_as_jsonl(tmp_path, process, disable_extractors_dict): + """Test that list prints one JSON object per stored snapshot.""" + os.chdir(tmp_path) + for url in ['https://example.com', 'https://iana.org']: + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', url], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'list'], + capture_output=True, + text=True, + timeout=30, + ) + + rows = _parse_jsonl(result.stdout) + urls = {row['url'] for row in rows} + + assert result.returncode == 0, result.stderr + assert 'https://example.com' in urls + assert 'https://iana.org' in urls + + +def test_list_filters_by_url_icontains(tmp_path, process, disable_extractors_dict): + """Test that list --url__icontains returns only matching snapshots.""" + os.chdir(tmp_path) + for url in ['https://example.com', 'https://iana.org']: + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', url], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'list', '--url__icontains', 'example.com'], + capture_output=True, + text=True, + timeout=30, + ) + + rows = _parse_jsonl(result.stdout) + assert result.returncode == 0, result.stderr + assert len(rows) == 1 + assert rows[0]['url'] == 'https://example.com' + + +def test_list_filters_by_crawl_id_and_limit(tmp_path, process, disable_extractors_dict): + """Test that crawl-id and limit filters constrain the result set.""" + os.chdir(tmp_path) + for url in ['https://example.com', 'https://iana.org']: + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', url], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + conn = sqlite3.connect("index.sqlite3") + c = conn.cursor() + crawl_id = str(c.execute( + "SELECT crawl_id FROM core_snapshot WHERE url = ?", + ('https://example.com',), + ).fetchone()[0]) + conn.close() + + result = subprocess.run( + ['archivebox', 'list', '--crawl-id', crawl_id, '--limit', '1'], + capture_output=True, + text=True, + timeout=30, + ) + + rows = _parse_jsonl(result.stdout) + assert result.returncode == 0, result.stderr + assert len(rows) == 1 + assert rows[0]['crawl_id'].replace('-', '') == crawl_id.replace('-', '') + assert rows[0]['url'] == 'https://example.com' + + +def test_list_filters_by_status(tmp_path, process, disable_extractors_dict): + """Test that list can filter using the current snapshot status.""" + os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + conn = sqlite3.connect("index.sqlite3") + c = conn.cursor() + status = c.execute("SELECT status FROM core_snapshot LIMIT 1").fetchone()[0] + conn.close() + + result = subprocess.run( + ['archivebox', 'list', '--status', status], + capture_output=True, + text=True, + timeout=30, + ) + + rows = _parse_jsonl(result.stdout) + assert result.returncode == 0, result.stderr + assert len(rows) == 1 + assert rows[0]['status'] == status + + +def test_list_help_lists_filter_options(tmp_path, process): + """Test that list --help documents the supported filter flags.""" + os.chdir(tmp_path) + + result = subprocess.run( + ['archivebox', 'list', '--help'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0 + assert '--url__icontains' in result.stdout + assert '--crawl-id' in result.stdout + assert '--limit' in result.stdout diff --git a/archivebox/tests/test_cli_remove.py b/archivebox/tests/test_cli_remove.py index 5558e576..54639ea3 100644 --- a/archivebox/tests/test_cli_remove.py +++ b/archivebox/tests/test_cli_remove.py @@ -7,6 +7,21 @@ Verify remove deletes snapshots from DB and filesystem. import os import sqlite3 import subprocess +from pathlib import Path + + +def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None: + candidates = {snapshot_id} + if len(snapshot_id) == 32: + candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}") + elif len(snapshot_id) == 36 and '-' in snapshot_id: + candidates.add(snapshot_id.replace('-', '')) + + for needle in candidates: + for path in data_dir.rglob(needle): + if path.is_dir(): + return path + return None def test_remove_deletes_snapshot_from_db(tmp_path, process, disable_extractors_dict): @@ -44,10 +59,7 @@ def test_remove_deletes_snapshot_from_db(tmp_path, process, disable_extractors_d def test_remove_deletes_archive_directory(tmp_path, process, disable_extractors_dict): - """Test that remove deletes the archive directory when using --delete flag. - - Archive directories are named by timestamp, not by snapshot ID. - """ + """Test that remove --delete removes the current snapshot output directory.""" os.chdir(tmp_path) # Add a snapshot @@ -57,24 +69,21 @@ def test_remove_deletes_archive_directory(tmp_path, process, disable_extractors_ env=disable_extractors_dict, ) - # Get snapshot timestamp conn = sqlite3.connect("index.sqlite3") c = conn.cursor() - timestamp = c.execute("SELECT timestamp FROM core_snapshot").fetchone()[0] + snapshot_id = str(c.execute("SELECT id FROM core_snapshot").fetchone()[0]) conn.close() - archive_dir = tmp_path / "archive" / str(timestamp) - assert archive_dir.exists() + snapshot_dir = _find_snapshot_dir(tmp_path, snapshot_id) + assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}" - # Remove snapshot with --delete to remove both DB record and directory subprocess.run( ['archivebox', 'remove', 'https://example.com', '--yes', '--delete'], capture_output=True, env=disable_extractors_dict, ) - # Archive directory should be deleted - assert not archive_dir.exists() + assert not snapshot_dir.exists() def test_remove_yes_flag_skips_confirmation(tmp_path, process, disable_extractors_dict): @@ -158,6 +167,35 @@ def test_remove_with_filter(tmp_path, process, disable_extractors_dict): assert result.returncode in [0, 1, 2] +def test_remove_with_regex_filter_deletes_all_matches(tmp_path, process, disable_extractors_dict): + """Test regex filters remove every matching snapshot.""" + os.chdir(tmp_path) + + for url in ['https://example.com', 'https://iana.org']: + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', url], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'remove', '--filter-type=regex', '.*', '--yes'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + conn = sqlite3.connect("index.sqlite3") + c = conn.cursor() + count_after = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0] + conn.close() + + output = result.stdout.decode("utf-8") + result.stderr.decode("utf-8") + assert count_after == 0 + assert 'Removed' in output or 'Found' in output + + def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extractors_dict): """Test that removing non-existent URL fails gracefully.""" os.chdir(tmp_path) @@ -169,7 +207,8 @@ def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extr ) # Should fail or show error - assert result.returncode != 0 or 'not found' in result.stdout.lower() or 'no matches' in result.stdout.lower() + stdout_text = result.stdout.decode('utf-8', errors='replace').lower() + assert result.returncode != 0 or 'not found' in stdout_text or 'no matches' in stdout_text def test_remove_reports_remaining_link_count_correctly(tmp_path, process, disable_extractors_dict): diff --git a/archivebox/tests/test_cli_search.py b/archivebox/tests/test_cli_search.py index 7ae757fc..7d244461 100644 --- a/archivebox/tests/test_cli_search.py +++ b/archivebox/tests/test_cli_search.py @@ -4,6 +4,7 @@ Tests for archivebox search command. Verify search queries snapshots from DB. """ +import json import os import subprocess @@ -65,3 +66,145 @@ def test_search_on_empty_archive(tmp_path, process): # Should complete without error assert result.returncode in [0, 1] + + +def test_search_json_outputs_matching_snapshots(tmp_path, process, disable_extractors_dict): + """Test that search --json returns parseable matching snapshot rows.""" + os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'search', '--json'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + payload = json.loads(result.stdout) + assert any('example.com' in row.get('url', '') for row in payload) + + +def test_search_json_with_headers_wraps_links_payload(tmp_path, process, disable_extractors_dict): + """Test that search --json --with-headers returns a headers envelope.""" + os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'search', '--json', '--with-headers'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + payload = json.loads(result.stdout) + links = payload.get('links', payload) + assert any('example.com' in row.get('url', '') for row in links) + + +def test_search_html_outputs_markup(tmp_path, process, disable_extractors_dict): + """Test that search --html renders an HTML response.""" + os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'search', '--html'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + assert '<' in result.stdout + + +def test_search_csv_outputs_requested_column(tmp_path, process, disable_extractors_dict): + """Test that search --csv emits the requested fields.""" + os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'search', '--csv', 'url', '--with-headers'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + assert 'url' in result.stdout + assert 'example.com' in result.stdout + + +def test_search_with_headers_requires_structured_output_format(tmp_path, process): + """Test that --with-headers is rejected without --json, --html, or --csv.""" + os.chdir(tmp_path) + + result = subprocess.run( + ['archivebox', 'search', '--with-headers'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode != 0 + assert 'requires' in result.stderr.lower() or 'json' in result.stderr.lower() + + +def test_search_sort_option_runs_successfully(tmp_path, process, disable_extractors_dict): + """Test that search --sort accepts sortable fields.""" + os.chdir(tmp_path) + for url in ['https://iana.org', 'https://example.com']: + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', url], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'search', '--csv', 'url', '--sort=url'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + assert 'example.com' in result.stdout or 'iana.org' in result.stdout + + +def test_search_help_lists_supported_filters(tmp_path, process): + """Test that search --help documents the available filters and output modes.""" + os.chdir(tmp_path) + + result = subprocess.run( + ['archivebox', 'search', '--help'], + capture_output=True, + text=True, + timeout=30, + ) + + assert result.returncode == 0 + assert '--filter-type' in result.stdout or '-f' in result.stdout + assert '--status' in result.stdout + assert '--sort' in result.stdout diff --git a/archivebox/tests/test_cli_status.py b/archivebox/tests/test_cli_status.py index b5eb8dc6..e1d419bf 100644 --- a/archivebox/tests/test_cli_status.py +++ b/archivebox/tests/test_cli_status.py @@ -202,3 +202,24 @@ def test_status_shows_index_file_info(tmp_path, process): # Should mention index assert 'index' in result.stdout.lower() or 'Index' in result.stdout + + +def test_status_help_lists_available_options(tmp_path, process): + """Test that status --help works and documents the command.""" + os.chdir(tmp_path) + result = subprocess.run( + ['archivebox', 'status', '--help'], + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + assert 'status' in result.stdout.lower() or 'statistic' in result.stdout.lower() + + +def test_status_shows_data_directory_path(tmp_path, process): + """Test that status reports which collection directory it is inspecting.""" + os.chdir(tmp_path) + result = subprocess.run(['archivebox', 'status'], capture_output=True, text=True) + + assert 'archive' in result.stdout.lower() or str(tmp_path) in result.stdout diff --git a/archivebox/tests/test_cli_version.py b/archivebox/tests/test_cli_version.py index eee2362e..e7d8d210 100644 --- a/archivebox/tests/test_cli_version.py +++ b/archivebox/tests/test_cli_version.py @@ -77,6 +77,17 @@ def test_version_quiet_outputs_version_number(tmp_path): assert len(parts) >= 2 +def test_version_flag_outputs_version_number(tmp_path): + """Test that top-level --version reports the package version.""" + os.chdir(tmp_path) + result = subprocess.run(['archivebox', '--version'], capture_output=True, text=True) + + assert result.returncode == 0 + version = result.stdout.strip() + assert version + assert len(version.split('.')) >= 2 + + def test_version_shows_system_info_in_initialized_dir(tmp_path, process): """Test that version shows system metadata in initialized directory.""" os.chdir(tmp_path) @@ -148,3 +159,20 @@ def test_version_auto_selects_short_tmp_dir_for_deep_collection_path(tmp_path): assert reported_tmp_dir.exists() assert not reported_tmp_dir.is_relative_to(default_tmp_dir) assert len(f"file://{reported_tmp_dir / 'supervisord.sock'}") <= 96 + + +def test_version_help_lists_quiet_flag(tmp_path): + """Test that version --help documents the quiet output mode.""" + os.chdir(tmp_path) + result = subprocess.run(['archivebox', 'version', '--help'], capture_output=True, text=True) + + assert result.returncode == 0 + assert '--quiet' in result.stdout or '-q' in result.stdout + + +def test_version_invalid_option_fails(tmp_path): + """Test that invalid version options fail cleanly.""" + os.chdir(tmp_path) + result = subprocess.run(['archivebox', 'version', '--invalid-option'], capture_output=True, text=True) + + assert result.returncode != 0 diff --git a/archivebox/tests/test_init.py b/archivebox/tests/test_init.py deleted file mode 100644 index 3a3697bd..00000000 --- a/archivebox/tests/test_init.py +++ /dev/null @@ -1,94 +0,0 @@ -# archivebox init -# archivebox add - -import os -import subprocess -import sqlite3 - -from archivebox.config.common import STORAGE_CONFIG - -from .fixtures import disable_extractors_dict, process - -FIXTURES = (disable_extractors_dict, process) - -DIR_PERMISSIONS = STORAGE_CONFIG.OUTPUT_PERMISSIONS.replace('6', '7').replace('4', '5') - -def test_init(tmp_path, process): - assert "Initializing a new ArchiveBox" in process.stdout.decode("utf-8") - -def test_update(tmp_path, process): - os.chdir(tmp_path) - update_process = subprocess.run(['archivebox', 'init'], capture_output=True) - assert "updating existing ArchiveBox" in update_process.stdout.decode("utf-8") - -def test_add_link(tmp_path, process, disable_extractors_dict): - os.chdir(tmp_path) - add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, env=disable_extractors_dict) - assert add_process.returncode == 0, add_process.stderr.decode("utf-8") - - # In the new architecture, URLs are saved to source files - # Check that a source file was created with the URL - sources_dir = tmp_path / "sources" - assert sources_dir.exists(), "Sources directory should be created" - source_files = list(sources_dir.glob("*cli_add.txt")) - assert len(source_files) >= 1, "Source file should be created" - source_content = source_files[0].read_text() - assert "https://example.com" in source_content - - -def test_add_multiple_urls(tmp_path, process, disable_extractors_dict): - """Test adding multiple URLs via command line arguments""" - os.chdir(tmp_path) - add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com', 'https://iana.org'], - capture_output=True, env=disable_extractors_dict) - assert add_process.returncode == 0, add_process.stderr.decode("utf-8") - - # Check that a source file was created with both URLs - sources_dir = tmp_path / "sources" - assert sources_dir.exists(), "Sources directory should be created" - source_files = list(sources_dir.glob("*cli_add.txt")) - assert len(source_files) >= 1, "Source file should be created" - source_content = source_files[-1].read_text() - assert "https://example.com" in source_content - assert "https://iana.org" in source_content - -def test_correct_permissions_output_folder(tmp_path, process): - index_files = ['index.sqlite3', 'archive'] - for file in index_files: - file_path = tmp_path / file - assert oct(file_path.stat().st_mode)[-3:] in (STORAGE_CONFIG.OUTPUT_PERMISSIONS, DIR_PERMISSIONS) - -def test_correct_permissions_add_command_results(tmp_path, process, disable_extractors_dict): - os.chdir(tmp_path) - add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, - env=disable_extractors_dict) - assert add_process.returncode == 0, add_process.stderr.decode("utf-8") - - # Check database permissions - assert oct((tmp_path / "index.sqlite3").stat().st_mode)[-3:] in (STORAGE_CONFIG.OUTPUT_PERMISSIONS, DIR_PERMISSIONS) - -def test_collision_urls_different_timestamps(tmp_path, process, disable_extractors_dict): - os.chdir(tmp_path) - subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, - env=disable_extractors_dict) - subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, - env=disable_extractors_dict) - - # Check both URLs are in database - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0] - conn.close() - - assert count == 2 - -def test_unrecognized_folders(tmp_path, process, disable_extractors_dict): - os.chdir(tmp_path) - subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, - env=disable_extractors_dict) - (tmp_path / "archive" / "some_random_folder").mkdir(parents=True, exist_ok=True) - - init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict) - # Just check that init completes successfully - assert init_process.returncode == 0 diff --git a/archivebox/tests/test_install.py b/archivebox/tests/test_install.py deleted file mode 100644 index af967500..00000000 --- a/archivebox/tests/test_install.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env python3 -"""Integration tests for archivebox install command.""" - -import os -import subprocess -import sqlite3 - -import pytest - - - -class TestInstallDryRun: - """Test the dry-run mode of install command.""" - - def test_dry_run_prints_message(self, tmp_path, process): - """Test that dry-run mode prints appropriate message.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'install', '--dry-run'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - assert 'Dry run' in result.stdout - - def test_dry_run_does_not_create_crawl(self, tmp_path, process): - """Test that dry-run mode doesn't create a crawl.""" - os.chdir(tmp_path) - - # Get initial crawl count - conn = sqlite3.connect('index.sqlite3') - c = conn.cursor() - c.execute("SELECT COUNT(*) FROM crawls_crawl") - initial_count = c.fetchone()[0] - conn.close() - - # Run install with dry-run - result = subprocess.run( - ['archivebox', 'install', '--dry-run'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - - # Check crawl count unchanged - conn = sqlite3.connect('index.sqlite3') - c = conn.cursor() - c.execute("SELECT COUNT(*) FROM crawls_crawl") - final_count = c.fetchone()[0] - conn.close() - - assert final_count == initial_count - - -class TestInstallOutput: - """Test the output/messages from install command.""" - - def test_install_prints_detecting_message(self, tmp_path, process, disable_extractors_dict): - """Test that install prints detecting dependencies message.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'install', '--dry-run'], - capture_output=True, - text=True, - env=disable_extractors_dict, - ) - - assert result.returncode == 0 - # Should mention detecting or dependencies - output = result.stdout.lower() - assert 'detect' in output or 'dependenc' in output or 'dry run' in output - - -class TestInstallCLI: - """Test the CLI interface for install command.""" - - def test_cli_help(self, tmp_path): - """Test that --help works for install command.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'install', '--help'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - assert '--dry-run' in result.stdout or '-d' in result.stdout - - def test_cli_invalid_option(self, tmp_path): - """Test that invalid options are handled.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'install', '--invalid-option'], - capture_output=True, - text=True, - ) - - # Should fail with non-zero exit code - assert result.returncode != 0 - - -class TestInstallInitialization: - """Test that install initializes the data directory if needed.""" - - def test_install_from_empty_dir(self, tmp_path): - """Test that install from empty dir initializes first.""" - os.chdir(tmp_path) - - # Don't use process fixture - start from empty dir - result = subprocess.run( - ['archivebox', 'install', '--dry-run'], - capture_output=True, - text=True, - ) - - # Should either initialize or show dry run message - output = result.stdout - assert 'Initializing' in output or 'Dry run' in output or 'init' in output.lower() - - -if __name__ == '__main__': - pytest.main([__file__, '-v']) diff --git a/archivebox/tests/test_list.py b/archivebox/tests/test_list.py deleted file mode 100644 index 2aaad4fa..00000000 --- a/archivebox/tests/test_list.py +++ /dev/null @@ -1,98 +0,0 @@ -import json -import subprocess - -from .fixtures import disable_extractors_dict, process - -FIXTURES = (disable_extractors_dict, process) - -def test_search_json(process, disable_extractors_dict): - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - search_process = subprocess.run(["archivebox", "search", "--json"], capture_output=True) - output_str = search_process.stdout.decode("utf-8").strip() - # Handle potential control characters in output - try: - output_json = json.loads(output_str) - except json.JSONDecodeError: - # Try with strict=False if there are control characters - import re - # Remove ANSI escape sequences and control characters - clean_str = re.sub(r'\x1b\[[0-9;]*m', '', output_str) - clean_str = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', clean_str) - output_json = json.loads(clean_str) - # Verify we get at least one snapshot back - assert len(output_json) >= 1 - # Should include the requested URL - assert any("example.com" in entry.get("url", "") for entry in output_json) - - -def test_search_json_headers(process, disable_extractors_dict): - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - search_process = subprocess.run(["archivebox", "search", "--json", "--with-headers"], capture_output=True) - output_str = search_process.stdout.decode("utf-8").strip() - # Handle potential control characters in output - try: - output_json = json.loads(output_str) - except json.JSONDecodeError: - # Try with strict=False if there are control characters - import re - # Remove ANSI escape sequences and control characters - clean_str = re.sub(r'\x1b\[[0-9;]*m', '', output_str) - clean_str = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', clean_str) - output_json = json.loads(clean_str) - # The response should have a links key with headers mode - links = output_json.get("links", output_json) - assert len(links) >= 1 - -def test_search_html(process, disable_extractors_dict): - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - search_process = subprocess.run(["archivebox", "search", "--html"], capture_output=True) - output_html = search_process.stdout.decode("utf-8") - # Should contain some HTML and reference to the source file - assert "sources" in output_html or "cli_add" in output_html or "<" in output_html - -def test_search_html_headers(process, disable_extractors_dict): - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - search_process = subprocess.run(["archivebox", "search", "--html", "--with-headers"], capture_output=True) - output_html = search_process.stdout.decode("utf-8") - # Should contain HTML - assert "<" in output_html - -def test_search_csv(process, disable_extractors_dict): - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - search_process = subprocess.run(["archivebox", "search", "--csv", "url"], capture_output=True) - output_csv = search_process.stdout.decode("utf-8") - # Should contain the requested URL - assert "example.com" in output_csv - -def test_search_csv_headers(process, disable_extractors_dict): - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--with-headers"], capture_output=True) - output_csv = search_process.stdout.decode("utf-8") - # Should have url header and requested URL - assert "url" in output_csv - assert "example.com" in output_csv - -def test_search_with_headers_requires_format(process): - search_process = subprocess.run(["archivebox", "search", "--with-headers"], capture_output=True) - stderr = search_process.stderr.decode("utf-8") - assert "--with-headers" in stderr and ("requires" in stderr or "can only be used" in stderr) - -def test_sort_by_url(process, disable_extractors_dict): - # Add two URLs - they will create separate source files - subprocess.run(["archivebox", "add", "--index-only", "https://iana.org", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"], - capture_output=True, env=disable_extractors_dict) - - # Search with sort should return results (even if they're file:// URLs) - search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--sort=url"], capture_output=True) - output = search_process.stdout.decode("utf-8") - lines = [line for line in output.strip().split("\n") if line] - # Should have at least 2 snapshots (the source file snapshots) - assert len(lines) >= 2 diff --git a/archivebox/tests/test_migrations_04_to_09.py b/archivebox/tests/test_migrations_04_to_09.py index a197a09e..f98345c7 100644 --- a/archivebox/tests/test_migrations_04_to_09.py +++ b/archivebox/tests/test_migrations_04_to_09.py @@ -12,6 +12,7 @@ import sqlite3 import tempfile import unittest from pathlib import Path +from typing import cast from .migrations_helpers import ( SCHEMA_0_4, @@ -74,7 +75,7 @@ class TestMigrationFrom04x(unittest.TestCase): # Collect unique tags from original data original_tags = set() - for tags_str in self.original_data['tags_str']: + for tags_str in cast(list[str], self.original_data['tags_str']): if tags_str: for tag in tags_str.split(','): original_tags.add(tag.strip()) diff --git a/archivebox/tests/test_remove.py b/archivebox/tests/test_remove.py deleted file mode 100644 index 078f4e06..00000000 --- a/archivebox/tests/test_remove.py +++ /dev/null @@ -1,89 +0,0 @@ -import os -import sqlite3 -import subprocess - -from .fixtures import disable_extractors_dict, process - -FIXTURES = (disable_extractors_dict, process) - -def test_remove_single_snapshot(tmp_path, process, disable_extractors_dict): - """Test removing a snapshot by URL pattern""" - os.chdir(tmp_path) - # Add a URL - creates source file snapshot - subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict) - - # Verify snapshot exists - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count_before = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0] - conn.close() - assert count_before >= 1 - - # Remove all snapshots (including source file snapshots) - remove_process = subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes'], capture_output=True) - # Check that it ran successfully (either output indicates success or return code 0) - output = remove_process.stdout.decode("utf-8") + remove_process.stderr.decode("utf-8") - assert remove_process.returncode == 0 or "removed" in output.lower() or "Found" in output - - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0] - conn.close() - - assert count == 0 - - -def test_remove_with_delete_flag(tmp_path, process, disable_extractors_dict): - """Test removing snapshot with --delete also removes archive folder""" - os.chdir(tmp_path) - subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict) - - # Get archives before delete - archive_dir = tmp_path / "archive" - archives_before = list(archive_dir.iterdir()) if archive_dir.exists() else [] - - # Only run the rest of the test if archives were created - if archives_before: - subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True) - archives_after = list(archive_dir.iterdir()) if archive_dir.exists() else [] - assert len(archives_after) < len(archives_before) - else: - # With --index-only, archive folders may not be created immediately - # Just verify that remove command doesn't error - remove_result = subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True) - assert remove_result.returncode in (0, 1) # 0 = success, 1 = no matches - - -def test_remove_regex(tmp_path, process, disable_extractors_dict): - """Test removing snapshots by regex pattern""" - os.chdir(tmp_path) - subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict) - subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, env=disable_extractors_dict) - - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count_before = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0] - conn.close() - assert count_before >= 2 - - subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True) - - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - count_after = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0] - conn.close() - assert count_after == 0 - - -def test_add_creates_crawls(tmp_path, process, disable_extractors_dict): - """Test that adding URLs creates crawls in database""" - os.chdir(tmp_path) - subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict) - subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, env=disable_extractors_dict) - - conn = sqlite3.connect("index.sqlite3") - c = conn.cursor() - crawl_count = c.execute("SELECT COUNT() FROM crawls_crawl").fetchone()[0] - conn.close() - - assert crawl_count == 2 diff --git a/archivebox/tests/test_search.py b/archivebox/tests/test_search.py deleted file mode 100644 index 9b141be8..00000000 --- a/archivebox/tests/test_search.py +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env python3 -"""Integration tests for archivebox search command.""" - -import os -import subprocess - -import pytest - - - -def test_search_returns_snapshots(tmp_path, process, disable_extractors_dict): - """Test that search returns snapshots.""" - os.chdir(tmp_path) - - # Add some snapshots - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'search'], - capture_output=True, - text=True, - ) - - # Should return some output (path or URL info) - assert result.stdout.strip() != '' or result.returncode == 0 - - -def test_search_filter_by_substring(tmp_path, process, disable_extractors_dict): - """Test that substring filter works.""" - os.chdir(tmp_path) - - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - # Search with filter - may not find if URL isn't stored as expected - result = subprocess.run( - ['archivebox', 'search', '--filter-type=substring', 'example'], - capture_output=True, - text=True, - ) - - # Should run without error - assert result.returncode == 0 or 'No Snapshots' in result.stderr - - -def test_search_sort_option(tmp_path, process, disable_extractors_dict): - """Test that --sort option works.""" - os.chdir(tmp_path) - - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'search', '--sort=url'], - capture_output=True, - text=True, - ) - - # Should run without error - assert result.returncode == 0 - - -def test_search_with_headers_requires_format(tmp_path, process): - """Test that --with-headers requires --json, --html, or --csv.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'search', '--with-headers'], - capture_output=True, - text=True, - ) - - # Should fail with error message - assert result.returncode != 0 - assert 'requires' in result.stderr.lower() or 'json' in result.stderr.lower() - - -def test_search_status_option(tmp_path, process, disable_extractors_dict): - """Test that --status option filters by status.""" - os.chdir(tmp_path) - - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'search', '--status=indexed'], - capture_output=True, - text=True, - ) - - # Should run without error - assert result.returncode == 0 - - -def test_search_no_snapshots_message(tmp_path, process): - """Test that searching empty archive shows appropriate output.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'search'], - capture_output=True, - text=True, - ) - - # Should complete (empty results are OK) - assert result.returncode == 0 - - -class TestSearchCLI: - """Test the CLI interface for search command.""" - - def test_cli_help(self, tmp_path, process): - """Test that --help works for search command.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'search', '--help'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - assert '--filter-type' in result.stdout or '-f' in result.stdout - assert '--status' in result.stdout - assert '--sort' in result.stdout - - -if __name__ == '__main__': - pytest.main([__file__, '-v']) diff --git a/archivebox/tests/test_status.py b/archivebox/tests/test_status.py deleted file mode 100644 index 9035374d..00000000 --- a/archivebox/tests/test_status.py +++ /dev/null @@ -1,195 +0,0 @@ -#!/usr/bin/env python3 -"""Integration tests for archivebox status command.""" - -import os -import subprocess - -import pytest - - - -def test_status_shows_index_info(tmp_path, process): - """Test that status shows index information.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show index scanning info - assert 'index' in result.stdout.lower() or 'Index' in result.stdout - - -def test_status_shows_snapshot_count(tmp_path, process, disable_extractors_dict): - """Test that status shows snapshot count.""" - os.chdir(tmp_path) - - # Add some snapshots - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://iana.org'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show link/snapshot count - assert '2' in result.stdout or 'links' in result.stdout.lower() - - -def test_status_shows_archive_size(tmp_path, process, disable_extractors_dict): - """Test that status shows archive size information.""" - os.chdir(tmp_path) - - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show size info (bytes, KB, MB, etc) - assert 'Size' in result.stdout or 'size' in result.stdout or 'B' in result.stdout - - -def test_status_shows_indexed_count(tmp_path, process, disable_extractors_dict): - """Test that status shows indexed folder count.""" - os.chdir(tmp_path) - - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show indexed count - assert 'indexed' in result.stdout.lower() - - -def test_status_shows_archived_vs_unarchived(tmp_path, process, disable_extractors_dict): - """Test that status shows archived vs unarchived counts.""" - os.chdir(tmp_path) - - # Add index-only snapshot (unarchived) - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show archived/unarchived categories - assert 'archived' in result.stdout.lower() or 'unarchived' in result.stdout.lower() - - -def test_status_shows_data_directory_info(tmp_path, process): - """Test that status shows data directory path.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show data directory or archive path - assert 'archive' in result.stdout.lower() or str(tmp_path) in result.stdout - - -def test_status_shows_user_info(tmp_path, process): - """Test that status shows user information.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show user info section - assert 'user' in result.stdout.lower() or 'login' in result.stdout.lower() - - -def test_status_empty_archive(tmp_path, process): - """Test status on empty archive shows zero counts.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should still run successfully - assert result.returncode == 0 or 'index' in result.stdout.lower() - # Should show 0 links - assert '0' in result.stdout or 'links' in result.stdout.lower() - - -def test_status_shows_valid_vs_invalid(tmp_path, process, disable_extractors_dict): - """Test that status shows valid vs invalid folder counts.""" - os.chdir(tmp_path) - - subprocess.run( - ['archivebox', 'add', '--index-only', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - ) - - result = subprocess.run( - ['archivebox', 'status'], - capture_output=True, - text=True, - ) - - # Should show valid/invalid categories - assert 'valid' in result.stdout.lower() or 'present' in result.stdout.lower() - - -class TestStatusCLI: - """Test the CLI interface for status command.""" - - def test_cli_help(self, tmp_path, process): - """Test that --help works for status command.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'status', '--help'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Help should show some info about the command - assert 'status' in result.stdout.lower() or 'statistic' in result.stdout.lower() - - -if __name__ == '__main__': - pytest.main([__file__, '-v']) diff --git a/archivebox/tests/test_version.py b/archivebox/tests/test_version.py deleted file mode 100644 index 7ad7705d..00000000 --- a/archivebox/tests/test_version.py +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/env python3 -"""Integration tests for archivebox version command.""" - -import os -import subprocess - -import pytest - - - -class TestVersionQuiet: - """Test the quiet/minimal version output.""" - - def test_version_prints_version_number(self, tmp_path): - """Test that version prints the version number.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'version', '--quiet'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Should contain a version string like "0.8.0" or similar - version = result.stdout.strip() - assert version - # Version should be a valid semver-ish format - parts = version.split('.') - assert len(parts) >= 2 # At least major.minor - - def test_version_flag_prints_version_number(self, tmp_path): - """Test that --version flag prints the version number.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', '--version'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - version = result.stdout.strip() - assert version - parts = version.split('.') - assert len(parts) >= 2 - - -class TestVersionFull: - """Test the full version output.""" - - def test_version_shows_system_info(self, tmp_path, process): - """Test that version shows system information.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'version'], - capture_output=True, - text=True, - ) - - output = result.stdout - - # Should show basic system info (exit code may be 1 if binaries missing) - assert 'ArchiveBox' in output - - def test_version_shows_binary_section(self, tmp_path, process): - """Test that version shows binary dependencies section.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'version'], - capture_output=True, - text=True, - ) - - output = result.stdout - - # Should show binary dependencies section - assert 'Binary' in output or 'Dependenc' in output - - def test_version_shows_data_locations(self, tmp_path, process): - """Test that version shows data locations.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'version'], - capture_output=True, - text=True, - ) - - output = result.stdout - - # Should show data/code locations - assert 'Data' in output or 'location' in output.lower() or 'DIR' in output or 'Code' in output - - -class TestVersionWithBinaries: - """Test version output after running install.""" - - def test_version_shows_binary_status(self, tmp_path, process, disable_extractors_dict): - """Test that version shows binary status (installed or not).""" - os.chdir(tmp_path) - - # First run install (with dry-run to speed up) - subprocess.run( - ['archivebox', 'install', '--dry-run'], - capture_output=True, - text=True, - env=disable_extractors_dict, - ) - - # Now check version - result = subprocess.run( - ['archivebox', 'version'], - capture_output=True, - text=True, - env=disable_extractors_dict, - ) - - output = result.stdout - - # Should show binary status (either installed or not installed) - assert 'installed' in output.lower() or 'Binary' in output - - -class TestVersionCLI: - """Test the CLI interface for version command.""" - - def test_cli_help(self, tmp_path): - """Test that --help works for version command.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'version', '--help'], - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - assert '--quiet' in result.stdout or '-q' in result.stdout - - def test_cli_invalid_option(self, tmp_path): - """Test that invalid options are handled.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'version', '--invalid-option'], - capture_output=True, - text=True, - ) - - # Should fail with non-zero exit code - assert result.returncode != 0 - - -if __name__ == '__main__': - pytest.main([__file__, '-v']) diff --git a/archivebox/uuid_compat.py b/archivebox/uuid_compat.py index d9b7c456..5a422a47 100755 --- a/archivebox/uuid_compat.py +++ b/archivebox/uuid_compat.py @@ -1,40 +1,17 @@ -"""UUID7 compatibility layer for Python 3.13+ - -Python 3.14+ has native uuid7 support. For Python 3.13, we use uuid_extensions. - -IMPORTANT: We also monkey-patch uuid.uuid7 for backward compatibility with -migrations that were auto-generated on Python 3.14+ systems. -""" +"""UUID7 compatibility layer.""" import sys import uuid -import functools +from importlib import import_module if sys.version_info >= (3, 14): - from uuid import uuid7 as _uuid7 + _UUID7_GENERATOR = getattr(uuid, 'uuid7') else: - try: - from uuid_extensions import uuid7 as _uuid7 - except ImportError: - raise ImportError( - "uuid_extensions package is required for Python <3.14. " - "Install it with: pip install uuid_extensions" - ) - - # Monkey-patch uuid module for migrations generated on Python 3.14+ - # that reference uuid.uuid7 directly - if not hasattr(uuid, 'uuid7'): - uuid.uuid7 = _uuid7 + _UUID7_GENERATOR = getattr(import_module('uuid_extensions'), 'uuid7') -@functools.wraps(_uuid7) -def uuid7(): - """Generate a UUID7 (time-ordered UUID). - - This wrapper ensures Django migrations always reference - 'archivebox.uuid_compat.uuid7' regardless of Python version. - """ - return _uuid7() +def uuid7() -> uuid.UUID: + return _UUID7_GENERATOR() __all__ = ['uuid7'] diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index 7dbd66f0..f0ea7b96 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -31,7 +31,7 @@ __package__ = 'archivebox.workers' import os import time from typing import Type -from datetime import timedelta +from datetime import datetime, timedelta from multiprocessing import Process as MPProcess from pathlib import Path @@ -189,7 +189,7 @@ class Orchestrator: event='Shutting down', indent_level=0, pid=self.pid, - error=error if error and not isinstance(error, KeyboardInterrupt) else None, + error=error if isinstance(error, Exception) and not isinstance(error, KeyboardInterrupt) else None, ) def get_total_worker_count(self) -> int: @@ -567,7 +567,8 @@ class Orchestrator: status=ArchiveResult.StatusChoices.STARTED, ).select_related('process') for ar in started_ars: - if ar.process_id and ar.process and ar.process.status == Process.StatusChoices.RUNNING: + process_id = getattr(ar, 'process_id', None) + if process_id and ar.process and ar.process.status == Process.StatusChoices.RUNNING: try: ar.process.kill_tree(graceful_timeout=0.0) except Exception: @@ -904,28 +905,29 @@ class Orchestrator: size = '' stderr_tail = '' if ar: - if ar.process_id and ar.process: + process_id = getattr(ar, 'process_id', None) + if process_id and ar.process: stderr_tail = _tail_stderr_line(ar.process) if ar.status == ArchiveResult.StatusChoices.STARTED: status = 'started' is_running = True is_pending = False - start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None) + start_ts = ar.start_ts or (ar.process.started_at if process_id and ar.process else None) if start_ts: elapsed = _format_seconds((now - start_ts).total_seconds()) hook_timeout = None - if ar.process_id and ar.process and ar.process.timeout: + if process_id and ar.process and ar.process.timeout: hook_timeout = ar.process.timeout hook_timeout = hook_timeout or hook_timeouts.get(hook_name) if hook_timeout: timeout = _format_seconds(hook_timeout) else: status = ar.status - if ar.process_id and ar.process and ar.process.exit_code == 137: + if process_id and ar.process and ar.process.exit_code == 137: status = 'failed' is_pending = False - start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None) - end_ts = ar.end_ts or (ar.process.ended_at if ar.process_id and ar.process else None) + start_ts = ar.start_ts or (ar.process.started_at if process_id and ar.process else None) + end_ts = ar.end_ts or (ar.process.ended_at if process_id and ar.process else None) if start_ts and end_ts: elapsed = _format_seconds((end_ts - start_ts).total_seconds()) size = _format_size(getattr(ar, 'output_size', None)) @@ -1093,7 +1095,7 @@ class Orchestrator: from archivebox.core.models import Snapshot # Get all started snapshots (optionally filtered by crawl_id) - snapshot_filter = {'status': 'started'} + snapshot_filter: dict[str, str | datetime] = {'status': 'started'} if self.crawl_id: snapshot_filter['crawl_id'] = self.crawl_id else: diff --git a/archivebox/workers/supervisord_util.py b/archivebox/workers/supervisord_util.py index b85865cc..1adcdaca 100644 --- a/archivebox/workers/supervisord_util.py +++ b/archivebox/workers/supervisord_util.py @@ -335,6 +335,7 @@ def start_worker(supervisor, daemon, lazy=False): for added in added: supervisor.addProcessGroup(added) + procs = [] for _ in range(25): procs = supervisor.getAllProcessInfo() for proc in procs: diff --git a/archivebox/workers/tests/test_scheduled_crawls.py b/archivebox/workers/tests/test_scheduled_crawls.py index 0a7645be..9162279e 100644 --- a/archivebox/workers/tests/test_scheduled_crawls.py +++ b/archivebox/workers/tests/test_scheduled_crawls.py @@ -1,7 +1,9 @@ from datetime import timedelta +from typing import cast from unittest.mock import patch from django.contrib.auth import get_user_model +from django.contrib.auth.models import UserManager from django.test import TestCase from django.utils import timezone @@ -12,7 +14,8 @@ from archivebox.workers.worker import CrawlWorker class TestScheduledCrawlMaterialization(TestCase): def setUp(self): - self.user = get_user_model().objects.create_user( + user_manager = cast(UserManager, get_user_model().objects) + self.user = user_manager.create_user( username='schedule-user', password='password', ) @@ -52,6 +55,8 @@ class TestScheduledCrawlMaterialization(TestCase): self.assertEqual(scheduled_crawls.count(), 2) queued_crawl = scheduled_crawls.last() + self.assertIsNotNone(queued_crawl) + assert queued_crawl is not None self.assertEqual(queued_crawl.status, Crawl.StatusChoices.QUEUED) self.assertEqual(queued_crawl.urls, 'https://example.com/feed.xml') self.assertEqual(queued_crawl.max_depth, 1) @@ -63,7 +68,7 @@ class TestScheduledCrawlMaterialization(TestCase): Orchestrator(exit_on_idle=True)._materialize_due_schedules() self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1) - Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template_id))._materialize_due_schedules() + Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template.id))._materialize_due_schedules() self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1) @patch.object(CrawlWorker, 'start') diff --git a/archivebox/workers/tests/test_snapshot_worker.py b/archivebox/workers/tests/test_snapshot_worker.py index 4233e69c..bb2be6d4 100644 --- a/archivebox/workers/tests/test_snapshot_worker.py +++ b/archivebox/workers/tests/test_snapshot_worker.py @@ -1,5 +1,6 @@ from pathlib import Path from types import SimpleNamespace +from typing import Any, cast from unittest.mock import patch from django.test import SimpleTestCase @@ -11,14 +12,14 @@ class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase): def _make_worker(self): worker = SnapshotWorker.__new__(SnapshotWorker) worker.pid = 12345 - worker.snapshot = SimpleNamespace( + cast(Any, worker).snapshot = SimpleNamespace( status='started', refresh_from_db=lambda: None, ) worker._snapshot_exceeded_hard_timeout = lambda: False worker._seal_snapshot_due_to_timeout = lambda: None worker._run_hook = lambda *args, **kwargs: SimpleNamespace() - worker._wait_for_hook = lambda *args, **kwargs: None + worker._wait_for_hook = lambda process, ar: None return worker @patch('archivebox.workers.worker.log_worker_event') @@ -49,10 +50,10 @@ class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase): run_calls.append((args, kwargs)) return SimpleNamespace() - def wait_for_hook(process, archive_result): - wait_calls.append((process, archive_result)) - archive_result.status = 'succeeded' - archive_result.output_files = {'singlefile.html': {}} + def wait_for_hook(process, ar): + wait_calls.append((process, ar)) + ar.status = 'succeeded' + ar.output_files = {'singlefile.html': {}} archive_result = SimpleNamespace( status='failed', diff --git a/pyproject.toml b/pyproject.toml index f0f2f779..d179814d 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,7 +68,6 @@ dependencies = [ "python-benedict[io,parse]>=0.33.2", # for: dict replacement all over the codebase to allow .attr-style access "base32-crockford>=0.3.0", # for: encoding UUIDs in base32 ### Static Typing - "mypy-extensions>=1.0.0", # for: django-stubs type hints (TODO: remove in favor of pylance/pyright?) "django-stubs>=5.0.4", # for: vscode type hints on models and common django APIs ### API clients "requests>=2.32.3", # for: fetching title, static files, headers (TODO: replace with httpx?) @@ -86,7 +85,7 @@ dependencies = [ "abx-plugins>=1.9.11", # shared ArchiveBox plugin package with install_args-only overrides "gallery-dl>=1.31.1", ### UUID7 backport for Python <3.14 - "uuid7>=0.1.0; python_version < '3.14'", # for: uuid7 support on Python 3.13 (provides uuid_extensions module) + "uuid7>=0.1.0; python_version < '3.14'", # provides the uuid_extensions module on Python 3.13 "pytest-django>=4.11.1", ] @@ -244,21 +243,6 @@ directory = "htmlcov" output = "coverage.json" show_contexts = true -[tool.mypy] -mypy_path = "typings" -namespace_packages = true -explicit_package_bases = true -# follow_imports = "silent" -# ignore_missing_imports = true -# disallow_incomplete_defs = true -# disallow_untyped_defs = true -# disallow_untyped_decorators = true -# exclude = "tests/.*" -plugins = ["mypy_django_plugin.main"] - -[tool.django-stubs] -django_settings_module = "archivebox.core.settings" - [tool.pyright] include = [ "archivebox",