diff --git a/README.md b/README.md
index 8f5db3cd..ea9e84e1 100644
--- a/README.md
+++ b/README.md
@@ -795,7 +795,7 @@ ArchiveBox bundles industry-standard tools like [Google Chrome](https://github.c
Web Server: Django + daphne (ASGI)
Database: Django ORM saving to SQLite3 ./data/index.sqlite3
Job Queue: Custom orchestrator using supervisord for worker management
-Build/test/lint: uv / mypy+pyright+pytest / ruff
+Build/test/lint: uv / pyright+ty+pytest / ruff
Subdependencies: abx-pkg installs apt/brew/pip/npm pkgs at runtime (e.g. yt-dlp, singlefile, readability, git)
@@ -1464,7 +1464,7 @@ archivebox install
./bin/lint.sh
./bin/test.sh
```
-(uses `flake8`, `mypy`, and `pytest -s`)
+(uses `ruff`, `pyright`, `ty`, and `pytest -s`)
diff --git a/archivebox/__init__.py b/archivebox/__init__.py
index af73f6e2..3bfd3dde 100755
--- a/archivebox/__init__.py
+++ b/archivebox/__init__.py
@@ -16,9 +16,6 @@ import sys
from pathlib import Path
from typing import Protocol, cast
-# Import uuid_compat early to monkey-patch uuid.uuid7 before Django loads migrations
-# This fixes migrations generated on Python 3.14+ that reference uuid.uuid7 directly
-from archivebox import uuid_compat # noqa: F401
from abx_plugins import get_plugins_dir
diff --git a/archivebox/cli/archivebox_search.py b/archivebox/cli/archivebox_search.py
index fa7b9405..6a421138 100644
--- a/archivebox/cli/archivebox_search.py
+++ b/archivebox/cli/archivebox_search.py
@@ -8,7 +8,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Callable
import rich_click as click
-from rich import print
from django.db.models import Q, QuerySet
@@ -212,7 +211,11 @@ def search(filter_patterns: list[str] | None=None,
folders: dict[str, Snapshot | None] = {snapshot.output_dir: snapshot for snapshot in snapshots}
output = printable_folders(folders, with_headers)
- print(output)
+ # Structured exports must be written directly to stdout.
+ # rich.print() reflows long lines to console width, which corrupts JSON/CSV/HTML output.
+ sys.stdout.write(output)
+ if not output.endswith('\n'):
+ sys.stdout.write('\n')
return output
diff --git a/archivebox/cli/tests_piping.py b/archivebox/cli/tests_piping.py
index 623c2567..c36ae8e4 100644
--- a/archivebox/cli/tests_piping.py
+++ b/archivebox/cli/tests_piping.py
@@ -29,6 +29,7 @@ import tempfile
import unittest
from io import StringIO
from pathlib import Path
+from typing import TypeVar
# Test configuration - disable slow extractors
TEST_CONFIG = {
@@ -58,6 +59,14 @@ TEST_CONFIG = {
os.environ.update(TEST_CONFIG)
+T = TypeVar('T')
+
+
+def require(value: T | None) -> T:
+ if value is None:
+ raise AssertionError('Expected value to be present')
+ return value
+
# =============================================================================
# JSONL Utility Tests
@@ -70,8 +79,7 @@ class TestJSONLParsing(unittest.TestCase):
"""Plain URLs should be parsed as Snapshot records."""
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
- result = parse_line('https://example.com')
- self.assertIsNotNone(result)
+ result = require(parse_line('https://example.com'))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['url'], 'https://example.com')
@@ -80,8 +88,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
line = '{"type": "Snapshot", "url": "https://example.com", "tags": "test,demo"}'
- result = parse_line(line)
- self.assertIsNotNone(result)
+ result = require(parse_line(line))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['url'], 'https://example.com')
self.assertEqual(result['tags'], 'test,demo')
@@ -91,8 +98,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line, TYPE_CRAWL
line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}'
- result = parse_line(line)
- self.assertIsNotNone(result)
+ result = require(parse_line(line))
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'abc123')
self.assertEqual(result['urls'], 'https://example.com')
@@ -103,8 +109,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line
line = '{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}'
- result = parse_line(line)
- self.assertIsNotNone(result)
+ result = require(parse_line(line))
self.assertEqual(result['id'], 'abc123')
self.assertEqual(result['url'], 'https://example.com')
@@ -113,8 +118,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
uuid = '01234567-89ab-cdef-0123-456789abcdef'
- result = parse_line(uuid)
- self.assertIsNotNone(result)
+ result = require(parse_line(uuid))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['id'], uuid)
@@ -144,8 +148,7 @@ class TestJSONLParsing(unittest.TestCase):
"""file:// URLs should be parsed."""
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
- result = parse_line('file:///path/to/file.txt')
- self.assertIsNotNone(result)
+ result = require(parse_line('file:///path/to/file.txt'))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['url'], 'file:///path/to/file.txt')
@@ -501,9 +504,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create crawl with multiple URLs (as newline-separated string)
urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com'
- crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})
-
- self.assertIsNotNone(crawl)
+ crawl = require(Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id}))
self.assertIsNotNone(crawl.id)
self.assertEqual(crawl.urls, urls)
self.assertEqual(crawl.status, 'queued')
@@ -538,7 +539,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 1: Create crawl (simulating 'archivebox crawl')
urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com'
- crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})
+ crawl = require(Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id}))
crawl_output = crawl.to_json()
# Step 2: Parse crawl output as snapshot input
@@ -590,7 +591,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create snapshot
overrides = {'created_by_id': created_by_id}
- snapshot = Snapshot.from_json(records[0], overrides=overrides)
+ snapshot = require(Snapshot.from_json(records[0], overrides=overrides))
self.assertIsNotNone(snapshot.id)
self.assertEqual(snapshot.url, url)
@@ -618,7 +619,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 1: Create snapshot (simulating 'archivebox snapshot')
url = 'https://test-extract-1.example.com'
overrides = {'created_by_id': created_by_id}
- snapshot = Snapshot.from_json({'url': url}, overrides=overrides)
+ snapshot = require(Snapshot.from_json({'url': url}, overrides=overrides))
snapshot_output = snapshot.to_json()
# Step 2: Parse snapshot output as extract input
@@ -657,7 +658,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# === archivebox crawl https://example.com ===
url = 'https://test-pipeline-full.example.com'
- crawl = Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id})
+ crawl = require(Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id}))
crawl_jsonl = json.dumps(crawl.to_json())
# === | archivebox snapshot ===
@@ -728,12 +729,12 @@ class TestDepthWorkflows(unittest.TestCase):
# Create crawl with depth 0
url = 'https://depth0-test.example.com'
- crawl = Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
+ crawl = require(Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id}))
self.assertEqual(crawl.max_depth, 0)
# Create snapshot
- snapshot = Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id})
+ snapshot = require(Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id}))
self.assertEqual(snapshot.url, url)
def test_depth_metadata_in_crawl(self):
@@ -744,10 +745,10 @@ class TestDepthWorkflows(unittest.TestCase):
created_by_id = get_or_create_system_user_pk()
# Create crawl with depth
- crawl = Crawl.from_json(
+ crawl = require(Crawl.from_json(
{'url': 'https://depth-meta-test.example.com', 'max_depth': 2},
overrides={'created_by_id': created_by_id}
- )
+ ))
self.assertEqual(crawl.max_depth, 2)
diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py
index 2a4ceb49..ffcaf775 100644
--- a/archivebox/config/constants.py
+++ b/archivebox/config/constants.py
@@ -17,7 +17,6 @@ import sys
from typing import Dict
from pathlib import Path
-from collections.abc import Mapping
from benedict import benedict
@@ -46,7 +45,7 @@ from .version import detect_installed_version
###################### Config ##########################
-class ConstantsDict(Mapping):
+class ConstantsDict:
PACKAGE_DIR: Path = PACKAGE_DIR
DATA_DIR: Path = DATA_DIR
ARCHIVE_DIR: Path = ARCHIVE_DIR
@@ -225,16 +224,9 @@ class ConstantsDict(Mapping):
def __benedict__(cls):
# when casting to benedict, only include uppercase keys that don't start with an underscore
return benedict({key: value for key, value in cls.__dict__.items() if key.isupper() and not key.startswith('_')})
-
- @classmethod
- def __len__(cls):
- return len(cls.__benedict__())
- @classmethod
- def __iter__(cls):
- return iter(cls.__benedict__())
-CONSTANTS = ConstantsDict()
+CONSTANTS = ConstantsDict
CONSTANTS_CONFIG = CONSTANTS.__benedict__()
# add all key: values to globals() for easier importing, e.g.:
diff --git a/archivebox/core/admin_tags.py b/archivebox/core/admin_tags.py
index 09c616db..87396ad8 100644
--- a/archivebox/core/admin_tags.py
+++ b/archivebox/core/admin_tags.py
@@ -1,16 +1,17 @@
__package__ = 'archivebox.core'
from django.contrib import admin
-from django.utils.html import format_html, mark_safe
+from django.utils.html import format_html
+from django.utils.safestring import mark_safe
from archivebox.misc.paginators import AccelleratedPaginator
from archivebox.base_models.admin import BaseModelAdmin
-from archivebox.core.models import Tag
+from archivebox.core.models import SnapshotTag, Tag
class TagInline(admin.TabularInline):
- model = Tag.snapshot_set.through # type: ignore
+ model = SnapshotTag
# fk_name = 'snapshot'
fields = ('id', 'tag')
extra = 1
@@ -173,4 +174,3 @@ class TagAdmin(BaseModelAdmin):
def register_admin(admin_site):
admin_site.register(Tag, TagAdmin)
-
diff --git a/archivebox/core/admin_users.py b/archivebox/core/admin_users.py
index 92c9c1cb..371317f3 100644
--- a/archivebox/core/admin_users.py
+++ b/archivebox/core/admin_users.py
@@ -2,8 +2,9 @@ __package__ = 'archivebox.core'
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
-from django.utils.html import format_html, mark_safe
from django.contrib.auth import get_user_model
+from django.utils.html import format_html
+from django.utils.safestring import mark_safe
class CustomUserAdmin(UserAdmin):
@@ -16,7 +17,7 @@ class CustomUserAdmin(UserAdmin):
add_fieldsets = UserAdmin.add_fieldsets
# Extend fieldsets for change form only (not user creation)
- fieldsets = [*UserAdmin.fieldsets, ('Data', {'fields': readonly_fields})]
+ fieldsets = [*(UserAdmin.fieldsets or ()), ('Data', {'fields': readonly_fields})]
@admin.display(description='Snapshots')
def snapshot_set(self, obj):
diff --git a/archivebox/core/forms.py b/archivebox/core/forms.py
index cc4f62b3..8589563a 100644
--- a/archivebox/core/forms.py
+++ b/archivebox/core/forms.py
@@ -22,12 +22,19 @@ def get_plugin_choices():
return [(name, name) for name in get_plugins()]
+def get_choice_field(form: forms.Form, name: str) -> forms.ChoiceField:
+ field = form.fields[name]
+ if not isinstance(field, forms.ChoiceField):
+ raise TypeError(f'{name} must be a ChoiceField')
+ return field
+
+
class AddLinkForm(forms.Form):
# Basic fields
url = forms.RegexField(
label="URLs (one per line)",
regex=URL_REGEX,
- min_length='6',
+ min_length=6,
strip=True,
widget=forms.Textarea,
required=True
@@ -162,22 +169,22 @@ class AddLinkForm(forms.Form):
extensions = {'twocaptcha', 'istilldontcareaboutcookies', 'ublock'}
# Populate plugin field choices
- self.fields['chrome_plugins'].choices = [
+ get_choice_field(self, 'chrome_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in chrome_dependent
]
- self.fields['archiving_plugins'].choices = [
+ get_choice_field(self, 'archiving_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in archiving
]
- self.fields['parsing_plugins'].choices = [
+ get_choice_field(self, 'parsing_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in parsing
]
- self.fields['search_plugins'].choices = [
+ get_choice_field(self, 'search_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in search
]
- self.fields['binary_plugins'].choices = [
+ get_choice_field(self, 'binary_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in binary
]
- self.fields['extension_plugins'].choices = [
+ get_choice_field(self, 'extension_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in extensions
]
@@ -185,13 +192,15 @@ class AddLinkForm(forms.Form):
self.fields['update'].initial = not ARCHIVING_CONFIG.ONLY_NEW
def clean(self):
- cleaned_data = super().clean()
+ cleaned_data = super().clean() or {}
# Combine all plugin groups into single list
all_selected_plugins = []
for field in ['chrome_plugins', 'archiving_plugins', 'parsing_plugins',
'search_plugins', 'binary_plugins', 'extension_plugins']:
- all_selected_plugins.extend(cleaned_data.get(field, []))
+ selected = cleaned_data.get(field)
+ if isinstance(selected, list):
+ all_selected_plugins.extend(selected)
# Store combined list for easy access
cleaned_data['plugins'] = all_selected_plugins
@@ -211,17 +220,13 @@ class AddLinkForm(forms.Form):
return schedule
-class TagWidgetMixin:
+class TagWidget(forms.TextInput):
def format_value(self, value):
if value is not None and not isinstance(value, str):
value = edit_string_for_tags(value)
return super().format_value(value)
-class TagWidget(TagWidgetMixin, forms.TextInput):
- pass
-
-
class TagField(forms.CharField):
widget = TagWidget
@@ -234,21 +239,21 @@ class TagField(forms.CharField):
"Please provide a comma-separated list of tags."
)
- def has_changed(self, initial_value, data_value):
+ def has_changed(self, initial, data):
# Always return False if the field is disabled since self.bound_data
# always uses the initial value in this case.
if self.disabled:
return False
try:
- data_value = self.clean(data_value)
+ cleaned_data = self.clean(data)
except forms.ValidationError:
- pass
+ cleaned_data = data
- if initial_value is None:
- initial_value = []
+ initial_value = [] if initial is None else initial
- initial_value = [tag.name for tag in initial_value]
- initial_value.sort()
+ if not isinstance(initial_value, list):
+ initial_value = list(initial_value)
- return initial_value != data_value
+ normalized_initial = sorted(tag.name for tag in initial_value)
+ return normalized_initial != cleaned_data
diff --git a/archivebox/core/management/commands/archivebox.py b/archivebox/core/management/commands/archivebox.py
index a68b5d94..582ef344 100644
--- a/archivebox/core/management/commands/archivebox.py
+++ b/archivebox/core/management/commands/archivebox.py
@@ -2,8 +2,7 @@ __package__ = 'archivebox'
from django.core.management.base import BaseCommand
-
-from .cli import run_subcommand
+from archivebox.cli import main as run_cli
class Command(BaseCommand):
@@ -15,4 +14,5 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
- run_subcommand(kwargs['subcommand'], args=kwargs['command_args'])
+ command_args = [kwargs['subcommand'], *kwargs['command_args']]
+ run_cli(args=command_args)
diff --git a/archivebox/core/middleware.py b/archivebox/core/middleware.py
index 7594eb8d..62accedc 100644
--- a/archivebox/core/middleware.py
+++ b/archivebox/core/middleware.py
@@ -165,6 +165,8 @@ class ReverseProxyAuthMiddleware(RemoteUserMiddleware):
return
ip = request.META.get('REMOTE_ADDR')
+ if not isinstance(ip, str):
+ return
for cidr in SERVER_CONFIG.REVERSE_PROXY_WHITELIST.split(','):
try:
diff --git a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
index d53670c8..93cca140 100644
--- a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
+++ b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
@@ -3,10 +3,11 @@
import archivebox.base_models.models
import django.db.models.deletion
import django.utils.timezone
-import uuid
from django.conf import settings
from django.db import migrations, models, connection
+from archivebox.uuid_compat import uuid7
+
def copy_old_fields_to_new(apps, schema_editor):
"""Copy data from old field names to new field names after AddField operations."""
@@ -236,7 +237,7 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='archiveresult',
name='uuid',
- field=models.UUIDField(blank=True, db_index=True, default=uuid.uuid7, null=True),
+ field=models.UUIDField(blank=True, db_index=True, default=uuid7, null=True),
),
migrations.AlterField(
model_name='snapshot',
@@ -246,7 +247,7 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='snapshot',
name='id',
- field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
+ field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
migrations.AlterField(
model_name='snapshot',
diff --git a/archivebox/core/migrations/0030_alter_archiveresult_id.py b/archivebox/core/migrations/0030_alter_archiveresult_id.py
index 0c5e54b0..80ce097c 100644
--- a/archivebox/core/migrations/0030_alter_archiveresult_id.py
+++ b/archivebox/core/migrations/0030_alter_archiveresult_id.py
@@ -1,8 +1,9 @@
# Generated by Django 6.0 on 2026-01-02 10:02
-import uuid
from django.db import migrations, models
+from archivebox.uuid_compat import uuid7
+
class Migration(migrations.Migration):
@@ -14,6 +15,6 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='archiveresult',
name='id',
- field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
+ field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
]
diff --git a/archivebox/machine/migrations/0006_process.py b/archivebox/machine/migrations/0006_process.py
index 6a2139f0..b989d482 100644
--- a/archivebox/machine/migrations/0006_process.py
+++ b/archivebox/machine/migrations/0006_process.py
@@ -2,9 +2,10 @@
import django.db.models.deletion
import django.utils.timezone
-import uuid
from django.db import migrations, models
+from archivebox.uuid_compat import uuid7
+
class Migration(migrations.Migration):
@@ -16,7 +17,7 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name='Process',
fields=[
- ('id', models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True)),
+ ('id', models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True)),
('created_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)),
('modified_at', models.DateTimeField(auto_now=True)),
('pwd', models.CharField(blank=True, default='', help_text='Working directory for process execution', max_length=512)),
diff --git a/archivebox/machine/tests/test_machine_models.py b/archivebox/machine/tests/test_machine_models.py
index 983770d4..e0638285 100644
--- a/archivebox/machine/tests/test_machine_models.py
+++ b/archivebox/machine/tests/test_machine_models.py
@@ -84,6 +84,7 @@ class TestMachineModel(TestCase):
result = Machine.from_json(record)
self.assertIsNotNone(result)
+ assert result is not None
self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget')
def test_machine_from_jsonl_invalid(self):
@@ -179,6 +180,7 @@ class TestBinaryModel(TestCase):
result = Binary.objects.get_valid_binary('wget')
self.assertIsNotNone(result)
+ assert result is not None
self.assertEqual(result.abspath, '/usr/bin/wget')
def test_binary_update_and_requeue(self):
@@ -209,6 +211,8 @@ class TestBinaryModel(TestCase):
'overrides': overrides,
})
+ self.assertIsNotNone(binary)
+ assert binary is not None
self.assertEqual(binary.overrides, overrides)
def test_binary_from_json_does_not_coerce_legacy_override_shapes(self):
@@ -224,6 +228,8 @@ class TestBinaryModel(TestCase):
'overrides': overrides,
})
+ self.assertIsNotNone(binary)
+ assert binary is not None
self.assertEqual(binary.overrides, overrides)
def test_binary_from_json_prefers_published_readability_package(self):
@@ -238,6 +244,8 @@ class TestBinaryModel(TestCase):
},
})
+ self.assertIsNotNone(binary)
+ assert binary is not None
self.assertEqual(
binary.overrides,
{
@@ -265,7 +273,7 @@ class TestBinaryStateMachine(TestCase):
def test_binary_state_machine_initial_state(self):
"""BinaryMachine should start in queued state."""
sm = BinaryMachine(self.binary)
- self.assertEqual(sm.current_state.value, Binary.StatusChoices.QUEUED)
+ self.assertEqual(sm.current_state_value, Binary.StatusChoices.QUEUED)
def test_binary_state_machine_can_start(self):
"""BinaryMachine.can_start() should check name and binproviders."""
@@ -604,7 +612,7 @@ class TestProcessStateMachine(TestCase):
def test_process_state_machine_initial_state(self):
"""ProcessMachine should start in queued state."""
sm = ProcessMachine(self.process)
- self.assertEqual(sm.current_state.value, Process.StatusChoices.QUEUED)
+ self.assertEqual(sm.current_state_value, Process.StatusChoices.QUEUED)
def test_process_state_machine_can_start(self):
"""ProcessMachine.can_start() should check cmd and machine."""
diff --git a/archivebox/mcp/server.py b/archivebox/mcp/server.py
index 025c3eee..19e31b7e 100644
--- a/archivebox/mcp/server.py
+++ b/archivebox/mcp/server.py
@@ -8,7 +8,7 @@ Click command metadata. Handles JSON-RPC 2.0 requests over stdio transport.
import sys
import json
import traceback
-from typing import Optional
+from typing import Any, Optional
import click
from click.testing import CliRunner
@@ -19,25 +19,25 @@ from archivebox.config.version import VERSION
class MCPJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder that handles Click sentinel values and other special types"""
- def default(self, obj):
+ def default(self, o):
# Handle Click's sentinel values
- if hasattr(click, 'core') and hasattr(click.core, '_SentinelClass'):
- if isinstance(obj, click.core._SentinelClass):
+ sentinel_type = getattr(click.core, '_SentinelClass', None)
+ if isinstance(sentinel_type, type) and isinstance(o, sentinel_type):
return None
# Handle tuples (convert to lists)
- if isinstance(obj, tuple):
- return list(obj)
+ if isinstance(o, tuple):
+ return list(o)
# Handle any other non-serializable objects
try:
- return super().default(obj)
+ return super().default(o)
except TypeError:
- return str(obj)
+ return str(o)
# Type mapping from Click types to JSON Schema types
-def click_type_to_json_schema_type(click_type) -> dict:
+def click_type_to_json_schema_type(click_type: click.ParamType) -> dict[str, Any]:
"""Convert a Click parameter type to JSON Schema type definition"""
if isinstance(click_type, click.types.StringParamType):
@@ -49,7 +49,7 @@ def click_type_to_json_schema_type(click_type) -> dict:
elif isinstance(click_type, click.types.BoolParamType):
return {"type": "boolean"}
elif isinstance(click_type, click.types.Choice):
- return {"type": "string", "enum": click_type.choices}
+ return {"type": "string", "enum": list(click_type.choices)}
elif isinstance(click_type, click.types.Path):
return {"type": "string", "description": "File or directory path"}
elif isinstance(click_type, click.types.File):
@@ -62,7 +62,7 @@ def click_type_to_json_schema_type(click_type) -> dict:
return {"type": "string"}
-def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict:
+def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict[str, Any]:
"""
Convert a Click command to an MCP tool definition with JSON Schema.
@@ -70,20 +70,21 @@ def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> di
the input schema without manual definition.
"""
- properties = {}
- required = []
+ properties: dict[str, dict[str, Any]] = {}
+ required: list[str] = []
# Extract parameters from Click command
for param in click_command.params:
# Skip internal parameters
- if param.name in ('help', 'version'):
+ if param.name is None or param.name in ('help', 'version'):
continue
param_schema = click_type_to_json_schema_type(param.type)
# Add description from Click help text
- if param.help:
- param_schema["description"] = param.help
+ help_text = getattr(param, 'help', None)
+ if help_text:
+ param_schema["description"] = help_text
# Handle default values
if param.default is not None and param.default != ():
@@ -248,7 +249,7 @@ class MCPServer:
if cmd_name not in self._tool_cache:
if cmd_name not in self.cli_group.all_subcommands:
return None
- self._tool_cache[cmd_name] = self.cli_group.get_command(None, cmd_name)
+ self._tool_cache[cmd_name] = self.cli_group.get_command(click.Context(self.cli_group), cmd_name)
return self._tool_cache[cmd_name]
def handle_initialize(self, params: dict) -> dict:
diff --git a/archivebox/misc/db.py b/archivebox/misc/db.py
index 7f2c7247..c438df53 100644
--- a/archivebox/misc/db.py
+++ b/archivebox/misc/db.py
@@ -6,7 +6,7 @@ __package__ = 'archivebox.misc'
from io import StringIO
from pathlib import Path
-from typing import List, Tuple
+from typing import Any, List, Tuple
from archivebox.config import DATA_DIR
from archivebox.misc.util import enforce_types
@@ -48,8 +48,8 @@ def apply_migrations(out_dir: Path = DATA_DIR) -> List[str]:
@enforce_types
-def get_admins(out_dir: Path = DATA_DIR) -> List:
+def get_admins(out_dir: Path = DATA_DIR) -> List[Any]:
"""Get list of superuser accounts"""
from django.contrib.auth.models import User
- return User.objects.filter(is_superuser=True).exclude(username='system')
+ return list(User.objects.filter(is_superuser=True).exclude(username='system'))
diff --git a/archivebox/misc/logging_util.py b/archivebox/misc/logging_util.py
index c00071f6..885aec4d 100644
--- a/archivebox/misc/logging_util.py
+++ b/archivebox/misc/logging_util.py
@@ -14,7 +14,7 @@ from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass
-from typing import Any, Optional, List, Dict, Union, Iterable, IO, TYPE_CHECKING
+from typing import Any, Optional, List, Dict, Union, Iterable, IO, TYPE_CHECKING, cast
if TYPE_CHECKING:
from archivebox.core.models import Snapshot
@@ -397,7 +397,8 @@ def log_list_finished(snapshots):
from archivebox.core.models import Snapshot
print()
print('---------------------------------------------------------------------------------------------------')
- print(Snapshot.objects.filter(pk__in=[s.pk for s in snapshots]).to_csv(cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | '))
+ csv_queryset = cast(Any, Snapshot.objects.filter(pk__in=[s.pk for s in snapshots]))
+ print(csv_queryset.to_csv(cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | '))
print('---------------------------------------------------------------------------------------------------')
print()
diff --git a/archivebox/misc/monkey_patches.py b/archivebox/misc/monkey_patches.py
index 9ee755c4..2f4bb146 100644
--- a/archivebox/misc/monkey_patches.py
+++ b/archivebox/misc/monkey_patches.py
@@ -13,7 +13,7 @@ django_stubs_ext.monkeypatch()
# monkey patch django timezone to add back utc (it was removed in Django 5.0)
-timezone.utc = datetime.timezone.utc
+setattr(timezone, 'utc', datetime.timezone.utc)
# monkey patch django-signals-webhooks to change how it shows up in Admin UI
# from signal_webhooks.apps import DjangoSignalWebhooksConfig
diff --git a/archivebox/misc/paginators.py b/archivebox/misc/paginators.py
index 2e623a65..fa8c6cdb 100644
--- a/archivebox/misc/paginators.py
+++ b/archivebox/misc/paginators.py
@@ -13,12 +13,17 @@ class AccelleratedPaginator(Paginator):
@cached_property
def count(self):
- if self.object_list._has_filters(): # type: ignore
+ has_filters = getattr(self.object_list, '_has_filters', None)
+ if callable(has_filters) and has_filters():
# fallback to normal count method on filtered queryset
return super().count
- else:
- # otherwise count total rows in a separate fast query
- return self.object_list.model.objects.count()
+
+ model = getattr(self.object_list, 'model', None)
+ if model is None:
+ return super().count
+
+ # otherwise count total rows in a separate fast query
+ return model.objects.count()
# Alternative approach for PostgreSQL: fallback count takes > 200ms
# from django.db import connection, transaction, OperationalError
diff --git a/archivebox/misc/progress_layout.py b/archivebox/misc/progress_layout.py
index 1263856b..537db5b9 100644
--- a/archivebox/misc/progress_layout.py
+++ b/archivebox/misc/progress_layout.py
@@ -17,7 +17,7 @@ from collections import deque
from pathlib import Path
from rich import box
-from rich.console import Group
+from rich.console import Group, RenderableType
from rich.layout import Layout
from rich.columns import Columns
from rich.panel import Panel
@@ -48,7 +48,7 @@ class CrawlQueuePanel:
self.max_crawl_workers = 8
self.crawl_id: Optional[str] = None
- def __rich__(self) -> Panel:
+ def __rich__(self) -> RenderableType:
grid = Table.grid(expand=True)
grid.add_column(justify="left", ratio=1)
grid.add_column(justify="center", ratio=1)
@@ -104,7 +104,7 @@ class ProcessLogPanel:
self.compact = compact
self.bg_terminating = bg_terminating
- def __rich__(self) -> Panel:
+ def __rich__(self) -> RenderableType:
completed_line = self._completed_output_line()
if completed_line:
style = "green" if self._completed_ok() else "yellow"
diff --git a/archivebox/misc/serve_static.py b/archivebox/misc/serve_static.py
index 76bc74e8..19e2dadd 100644
--- a/archivebox/misc/serve_static.py
+++ b/archivebox/misc/serve_static.py
@@ -111,7 +111,7 @@ def _render_markdown_fallback(text: str) -> str:
return _markdown.markdown(
text,
extensions=["extra", "toc", "sane_lists"],
- output_format="html5",
+ output_format="html",
)
except Exception:
pass
diff --git a/archivebox/misc/system.py b/archivebox/misc/system.py
index 6804c210..36eac00d 100644
--- a/archivebox/misc/system.py
+++ b/archivebox/misc/system.py
@@ -9,13 +9,14 @@ import sys
from json import dump
from pathlib import Path
from typing import Optional, Union, Tuple
-from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
+from subprocess import PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
from atomicwrites import atomic_write as lib_atomic_write
from archivebox.config.common import STORAGE_CONFIG
from archivebox.misc.util import enforce_types, ExtendedEncoder
+IS_WINDOWS = os.name == 'nt'
def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs):
"""Patched of subprocess.run to kill forked child subprocesses and fix blocking io making timeout=innefective
@@ -47,13 +48,15 @@ def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False,
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired as exc:
process.kill()
- if _mswindows:
+ if IS_WINDOWS:
# Windows accumulates the output in a single blocking
# read() call run on child threads, with the timeout
# being done in a join() on those threads. communicate()
# _after_ kill() is required to collect that and add it
# to the exception.
- exc.stdout, exc.stderr = process.communicate()
+ timed_out_stdout, timed_out_stderr = process.communicate()
+ exc.stdout = timed_out_stdout.encode() if isinstance(timed_out_stdout, str) else timed_out_stdout
+ exc.stderr = timed_out_stderr.encode() if isinstance(timed_out_stderr, str) else timed_out_stderr
else:
# POSIX _communicate already populated the output so
# far into the TimeoutExpired exception.
@@ -71,11 +74,12 @@ def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False,
finally:
# force kill any straggler subprocesses that were forked from the main proc
try:
- os.killpg(pgid, signal.SIGINT)
+ if pgid is not None:
+ os.killpg(pgid, signal.SIGINT)
except Exception:
pass
- return CompletedProcess(process.args, retcode, stdout, stderr)
+ return CompletedProcess(process.args, retcode or 0, stdout, stderr)
@enforce_types
diff --git a/archivebox/misc/toml_util.py b/archivebox/misc/toml_util.py
index 9dd51d1b..0da1b298 100644
--- a/archivebox/misc/toml_util.py
+++ b/archivebox/misc/toml_util.py
@@ -42,7 +42,7 @@ def convert(ini_str: str) -> str:
"""Convert a string of INI config into its TOML equivalent (warning: strips comments)"""
config = configparser.ConfigParser()
- config.optionxform = str # capitalize key names
+ setattr(config, 'optionxform', str) # capitalize key names
config.read_string(ini_str)
# Initialize an empty dictionary to store the TOML representation
@@ -77,12 +77,12 @@ class JSONSchemaWithLambdas(GenerateJsonSchema):
Usage:
>>> json.dumps(value, encoder=JSONSchemaWithLambdas())
"""
- def encode_default(self, default: Any) -> Any:
+ def encode_default(self, dft: Any) -> Any:
config = self._config
- if isinstance(default, Callable):
- return '{{lambda ' + inspect.getsource(default).split('=lambda ')[-1].strip()[:-1] + '}}'
+ if isinstance(dft, Callable):
+ return '{{lambda ' + inspect.getsource(dft).split('=lambda ')[-1].strip()[:-1] + '}}'
return to_jsonable_python(
- default,
+ dft,
timedelta_mode=config.ser_json_timedelta,
bytes_mode=config.ser_json_bytes,
serialize_unknown=True
diff --git a/archivebox/misc/util.py b/archivebox/misc/util.py
index c69c8c86..61b898b7 100644
--- a/archivebox/misc/util.py
+++ b/archivebox/misc/util.py
@@ -56,9 +56,19 @@ urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)
-short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
-ts_to_date_str = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
-ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
+def short_ts(ts: Any) -> str | None:
+ parsed = parse_date(ts)
+ return None if parsed is None else str(parsed.timestamp()).split('.')[0]
+
+
+def ts_to_date_str(ts: Any) -> str | None:
+ parsed = parse_date(ts)
+ return None if parsed is None else parsed.strftime('%Y-%m-%d %H:%M')
+
+
+def ts_to_iso(ts: Any) -> str | None:
+ parsed = parse_date(ts)
+ return None if parsed is None else parsed.isoformat()
COLOR_REGEX = re.compile(r'\[(?P\d+)(;(?P\d+)(;(?P\d+))?)?m')
@@ -175,7 +185,7 @@ def docstring(text: Optional[str]):
@enforce_types
-def str_between(string: str, start: str, end: str=None) -> str:
+def str_between(string: str, start: str, end: str | None = None) -> str:
"""(12345, , ) -> 12345"""
content = string.split(start, 1)[-1]
@@ -186,7 +196,7 @@ def str_between(string: str, start: str, end: str=None) -> str:
@enforce_types
-def parse_date(date: Any) -> datetime:
+def parse_date(date: Any) -> datetime | None:
"""Parse unix timestamps, iso format, and human-readable strings"""
if date is None:
@@ -196,20 +206,24 @@ def parse_date(date: Any) -> datetime:
if date.tzinfo is None:
return date.replace(tzinfo=timezone.utc)
- assert date.tzinfo.utcoffset(datetime.now()).seconds == 0, 'Refusing to load a non-UTC date!'
+ offset = date.utcoffset()
+ assert offset == datetime.now(timezone.utc).utcoffset(), 'Refusing to load a non-UTC date!'
return date
if isinstance(date, (float, int)):
date = str(date)
if isinstance(date, str):
- return dateparser(date, settings={'TIMEZONE': 'UTC'}).astimezone(timezone.utc)
+ parsed_date = dateparser(date, settings={'TIMEZONE': 'UTC'})
+ if parsed_date is None:
+ raise ValueError(f'Tried to parse invalid date string! {date}')
+ return parsed_date.astimezone(timezone.utc)
raise ValueError('Tried to parse invalid date! {}'.format(date))
@enforce_types
-def download_url(url: str, timeout: int=None) -> str:
+def download_url(url: str, timeout: int | None = None) -> str:
"""Download the contents of a remote url and return the text"""
from archivebox.config.common import ARCHIVING_CONFIG
@@ -221,7 +235,8 @@ def download_url(url: str, timeout: int=None) -> str:
cookie_jar = http.cookiejar.MozillaCookieJar(ARCHIVING_CONFIG.COOKIES_FILE)
cookie_jar.load(ignore_discard=True, ignore_expires=True)
for cookie in cookie_jar:
- session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path)
+ if cookie.value is not None:
+ session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path)
response = session.get(
url,
@@ -331,47 +346,47 @@ class ExtendedEncoder(pyjson.JSONEncoder):
fields and objects
"""
- def default(self, obj):
- cls_name = obj.__class__.__name__
+ def default(self, o):
+ cls_name = o.__class__.__name__
- if hasattr(obj, '_asdict'):
- return obj._asdict()
+ if hasattr(o, '_asdict'):
+ return o._asdict()
- elif isinstance(obj, bytes):
- return obj.decode()
+ elif isinstance(o, bytes):
+ return o.decode()
- elif isinstance(obj, datetime):
- return obj.isoformat()
+ elif isinstance(o, datetime):
+ return o.isoformat()
- elif isinstance(obj, Exception):
- return '{}: {}'.format(obj.__class__.__name__, obj)
+ elif isinstance(o, Exception):
+ return '{}: {}'.format(o.__class__.__name__, o)
- elif isinstance(obj, Path):
- return str(obj)
+ elif isinstance(o, Path):
+ return str(o)
elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
- return list(obj)
+ return list(o)
- elif isinstance(obj, Callable):
- return str(obj)
+ elif isinstance(o, Callable):
+ return str(o)
# Try dict/list conversion as fallback
try:
- return dict(obj)
+ return dict(o)
except Exception:
pass
try:
- return list(obj)
+ return list(o)
except Exception:
pass
try:
- return str(obj)
+ return str(o)
except Exception:
pass
- return pyjson.JSONEncoder.default(self, obj)
+ return pyjson.JSONEncoder.default(self, o)
@enforce_types
diff --git a/archivebox/mypy.ini b/archivebox/mypy.ini
deleted file mode 100644
index b1b4489a..00000000
--- a/archivebox/mypy.ini
+++ /dev/null
@@ -1,3 +0,0 @@
-[mypy]
-plugins =
- mypy_django_plugin.main
diff --git a/archivebox/personas/migrations/0002_alter_persona_id.py b/archivebox/personas/migrations/0002_alter_persona_id.py
index e8e5af2a..5b5aef6c 100644
--- a/archivebox/personas/migrations/0002_alter_persona_id.py
+++ b/archivebox/personas/migrations/0002_alter_persona_id.py
@@ -1,8 +1,9 @@
# Generated by Django 6.0 on 2026-01-05 01:09
-import uuid
from django.db import migrations, models
+from archivebox.uuid_compat import uuid7
+
class Migration(migrations.Migration):
@@ -14,6 +15,6 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='persona',
name='id',
- field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
+ field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
]
diff --git a/archivebox/tests/conftest.py b/archivebox/tests/conftest.py
index 28f58062..b8d37bd4 100644
--- a/archivebox/tests/conftest.py
+++ b/archivebox/tests/conftest.py
@@ -400,13 +400,13 @@ def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str])
# Test Data Factories
# =============================================================================
-def create_test_url(domain: str = 'example.com', path: str = None) -> str:
+def create_test_url(domain: str = 'example.com', path: str | None = None) -> str:
"""Generate unique test URL."""
path = path or uuid7().hex[:8]
return f'https://{domain}/{path}'
-def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
+def create_test_crawl_json(urls: List[str] | None = None, **kwargs) -> Dict[str, Any]:
"""Create Crawl JSONL record for testing."""
urls = urls or [create_test_url()]
return {
@@ -419,7 +419,7 @@ def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
}
-def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
+def create_test_snapshot_json(url: str | None = None, **kwargs) -> Dict[str, Any]:
"""Create Snapshot JSONL record for testing."""
return {
'type': 'Snapshot',
diff --git a/archivebox/tests/migrations_helpers.py b/archivebox/tests/migrations_helpers.py
index 5c620186..0c533f67 100644
--- a/archivebox/tests/migrations_helpers.py
+++ b/archivebox/tests/migrations_helpers.py
@@ -967,7 +967,7 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
# Helper Functions
# =============================================================================
-def run_archivebox(data_dir: Path, args: list, timeout: int = 60, env: dict = None) -> subprocess.CompletedProcess:
+def run_archivebox(data_dir: Path, args: list, timeout: int = 60, env: dict | None = None) -> subprocess.CompletedProcess:
"""Run archivebox command in subprocess with given data directory."""
base_env = os.environ.copy()
base_env['DATA_DIR'] = str(data_dir)
diff --git a/archivebox/tests/test_add.py b/archivebox/tests/test_add.py
deleted file mode 100644
index 39d423e3..00000000
--- a/archivebox/tests/test_add.py
+++ /dev/null
@@ -1,166 +0,0 @@
-import os
-import sqlite3
-import subprocess
-
-def test_depth_flag_is_accepted(process, disable_extractors_dict):
- arg_process = subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- assert 'unrecognized arguments: --depth' not in arg_process.stderr.decode("utf-8")
-
-
-def test_depth_flag_fails_if_it_is_not_0_or_1(process, disable_extractors_dict):
- arg_process = subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=5", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
- # Error message may say "invalid choice" or "is not one of"
- stderr = arg_process.stderr.decode("utf-8")
- assert 'invalid' in stderr.lower() or 'not one of' in stderr.lower()
- arg_process = subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=-1", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
- stderr = arg_process.stderr.decode("utf-8")
- assert 'invalid' in stderr.lower() or 'not one of' in stderr.lower()
-
-
-def test_depth_flag_0_creates_source_file(tmp_path, process, disable_extractors_dict):
- os.chdir(tmp_path)
- subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- # Check that source file was created with the URL
- sources_dir = tmp_path / "sources"
- assert sources_dir.exists()
- source_files = list(sources_dir.glob("*cli_add.txt"))
- assert len(source_files) >= 1
- source_content = source_files[0].read_text()
- assert "example.com" in source_content
-
-
-def test_overwrite_flag_is_accepted(process, disable_extractors_dict):
- subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
- arg_process = subprocess.run(
- ["archivebox", "add", "--index-only", "--overwrite", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
- assert 'unrecognized arguments: --overwrite' not in arg_process.stderr.decode("utf-8")
-
-def test_add_creates_crawl_in_database(tmp_path, process, disable_extractors_dict):
- os.chdir(tmp_path)
- subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- # Check that a Crawl was created in database
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count = c.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()[0]
- conn.close()
-
- assert count >= 1
-
-
-def test_add_with_tags(tmp_path, process, disable_extractors_dict):
- """Test adding URL with tags."""
- os.chdir(tmp_path)
- subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=0", "--tag=test,example", "https://example.com"],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- # Check that tags were created in database
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- tags = c.execute("SELECT name FROM core_tag").fetchall()
- conn.close()
-
- tag_names = [t[0] for t in tags]
- assert 'test' in tag_names or 'example' in tag_names
-
-
-def test_add_multiple_urls_single_call(tmp_path, process, disable_extractors_dict):
- """Test adding multiple URLs in a single call creates multiple snapshots."""
- os.chdir(tmp_path)
- subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=0",
- "https://example.com", "https://example.org"],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- # Check both URLs are in the source file
- sources_dir = tmp_path / "sources"
- source_files = list(sources_dir.glob("*cli_add.txt"))
- assert len(source_files) >= 1
- source_content = source_files[0].read_text()
- assert "example.com" in source_content
- assert "example.org" in source_content
-
-
-def test_add_from_file(tmp_path, process, disable_extractors_dict):
- """Test adding URLs from a file."""
- os.chdir(tmp_path)
-
- # Create a file with URLs
- urls_file = tmp_path / "urls.txt"
- urls_file.write_text("https://example.com\nhttps://example.org\n")
-
- subprocess.run(
- ["archivebox", "add", "--index-only", "--depth=0", str(urls_file)],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- # Check that a Crawl was created
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count = c.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()[0]
- conn.close()
-
- assert count >= 1
-
-
-class TestAddCLI:
- """Test the CLI interface for add command."""
-
- def test_add_help(self, tmp_path, process):
- """Test that --help works for add command."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ["archivebox", "add", "--help"],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- assert '--depth' in result.stdout or 'depth' in result.stdout
- assert '--tag' in result.stdout or 'tag' in result.stdout
-
- def test_add_no_args_shows_help(self, tmp_path, process):
- """Test that add with no args shows help or usage."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ["archivebox", "add"],
- capture_output=True,
- text=True,
- )
-
- # Should either show help or error about missing URL
- combined = result.stdout + result.stderr
- assert 'usage' in combined.lower() or 'url' in combined.lower() or 'add' in combined.lower()
diff --git a/archivebox/tests/test_admin_views.py b/archivebox/tests/test_admin_views.py
index c1bfb3bd..486b714a 100644
--- a/archivebox/tests/test_admin_views.py
+++ b/archivebox/tests/test_admin_views.py
@@ -9,9 +9,11 @@ Tests cover:
"""
import pytest
+from typing import cast
from django.test import override_settings
from django.urls import reverse
from django.contrib.auth import get_user_model
+from django.contrib.auth.models import UserManager
pytestmark = pytest.mark.django_db
@@ -24,7 +26,7 @@ PUBLIC_HOST = 'public.archivebox.localhost:8000'
@pytest.fixture
def admin_user(db):
"""Create admin user for tests."""
- return User.objects.create_superuser(
+ return cast(UserManager, User.objects).create_superuser(
username='testadmin',
email='admin@test.com',
password='testpassword'
diff --git a/archivebox/tests/test_cli_add.py b/archivebox/tests/test_cli_add.py
index 11abca82..fbd6894f 100644
--- a/archivebox/tests/test_cli_add.py
+++ b/archivebox/tests/test_cli_add.py
@@ -7,6 +7,21 @@ Verify add creates snapshots in DB, crawls, source files, and archive directorie
import os
import sqlite3
import subprocess
+from pathlib import Path
+
+
+def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
+ candidates = {snapshot_id}
+ if len(snapshot_id) == 32:
+ candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}")
+ elif len(snapshot_id) == 36 and '-' in snapshot_id:
+ candidates.add(snapshot_id.replace('-', ''))
+
+ for needle in candidates:
+ for path in data_dir.rglob(needle):
+ if path.is_dir():
+ return path
+ return None
def test_add_single_url_creates_snapshot_in_db(tmp_path, process, disable_extractors_dict):
@@ -144,6 +159,21 @@ def test_add_with_depth_1_flag(tmp_path, process, disable_extractors_dict):
assert 'unrecognized arguments: --depth' not in result.stderr.decode('utf-8')
+def test_add_rejects_invalid_depth_values(tmp_path, process, disable_extractors_dict):
+ """Test that add rejects depth values outside the supported range."""
+ os.chdir(tmp_path)
+
+ for depth in ('5', '-1'):
+ result = subprocess.run(
+ ['archivebox', 'add', '--index-only', f'--depth={depth}', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ )
+ stderr = result.stderr.decode('utf-8').lower()
+ assert result.returncode != 0
+ assert 'invalid' in stderr or 'not one of' in stderr
+
+
def test_add_with_tags(tmp_path, process, disable_extractors_dict):
"""Test adding URL with tags stores tags_str in crawl.
@@ -245,11 +275,8 @@ def test_add_with_overwrite_flag(tmp_path, process, disable_extractors_dict):
assert 'unrecognized arguments: --overwrite' not in result.stderr.decode('utf-8')
-def test_add_creates_archive_subdirectory(tmp_path, process, disable_extractors_dict):
- """Test that add creates archive subdirectory for the snapshot.
-
- Archive subdirectories are named by timestamp, not by snapshot ID.
- """
+def test_add_creates_snapshot_output_directory(tmp_path, process, disable_extractors_dict):
+ """Test that add creates the current snapshot output directory on disk."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
@@ -257,16 +284,44 @@ def test_add_creates_archive_subdirectory(tmp_path, process, disable_extractors_
env=disable_extractors_dict,
)
- # Get the snapshot timestamp from the database
conn = sqlite3.connect("index.sqlite3")
c = conn.cursor()
- timestamp = c.execute("SELECT timestamp FROM core_snapshot").fetchone()[0]
+ snapshot_id = str(c.execute("SELECT id FROM core_snapshot").fetchone()[0])
conn.close()
- # Check that archive subdirectory was created using timestamp
- archive_dir = tmp_path / "archive" / str(timestamp)
- assert archive_dir.exists()
- assert archive_dir.is_dir()
+ snapshot_dir = _find_snapshot_dir(tmp_path, snapshot_id)
+ assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}"
+ assert snapshot_dir.is_dir()
+
+
+def test_add_help_shows_depth_and_tag_options(tmp_path, process):
+ """Test that add --help documents the main filter and crawl options."""
+ os.chdir(tmp_path)
+
+ result = subprocess.run(
+ ['archivebox', 'add', '--help'],
+ capture_output=True,
+ text=True,
+ )
+
+ assert result.returncode == 0
+ assert '--depth' in result.stdout
+ assert '--tag' in result.stdout
+
+
+def test_add_without_args_shows_usage(tmp_path, process):
+ """Test that add without URLs fails with a usage hint instead of crashing."""
+ os.chdir(tmp_path)
+
+ result = subprocess.run(
+ ['archivebox', 'add'],
+ capture_output=True,
+ text=True,
+ )
+
+ combined = result.stdout + result.stderr
+ assert result.returncode != 0
+ assert 'usage' in combined.lower() or 'url' in combined.lower()
def test_add_index_only_skips_extraction(tmp_path, process, disable_extractors_dict):
diff --git a/archivebox/tests/test_cli_init.py b/archivebox/tests/test_cli_init.py
index e6ce1ef6..fa6fe157 100644
--- a/archivebox/tests/test_cli_init.py
+++ b/archivebox/tests/test_cli_init.py
@@ -241,3 +241,24 @@ def test_init_output_shows_collection_info(tmp_path):
output = result.stdout
# Should show some helpful info about the collection
assert 'ArchiveBox' in output or 'collection' in output.lower() or 'Initializing' in output
+
+
+def test_init_ignores_unrecognized_archive_directories(tmp_path, process, disable_extractors_dict):
+ """Test that init upgrades existing dirs without choking on extra folders."""
+ os.chdir(tmp_path)
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+ (tmp_path / "archive" / "some_random_folder").mkdir(parents=True, exist_ok=True)
+
+ result = subprocess.run(
+ ['archivebox', 'init'],
+ capture_output=True,
+ text=True,
+ env=disable_extractors_dict,
+ )
+
+ assert result.returncode == 0, result.stdout + result.stderr
diff --git a/archivebox/tests/test_cli_install.py b/archivebox/tests/test_cli_install.py
index c7738468..1d0f499e 100644
--- a/archivebox/tests/test_cli_install.py
+++ b/archivebox/tests/test_cli_install.py
@@ -93,6 +93,59 @@ def test_install_shows_binary_status(tmp_path, process):
assert len(output) > 50
+def test_install_dry_run_prints_dry_run_message(tmp_path, process):
+ """Test that install --dry-run clearly reports that no changes will be made."""
+ os.chdir(tmp_path)
+ result = subprocess.run(
+ ['archivebox', 'install', '--dry-run'],
+ capture_output=True,
+ text=True,
+ timeout=60,
+ )
+
+ assert result.returncode == 0
+ assert 'dry run' in result.stdout.lower()
+
+
+def test_install_help_lists_dry_run_flag(tmp_path):
+ """Test that install --help documents the dry-run option."""
+ os.chdir(tmp_path)
+ result = subprocess.run(
+ ['archivebox', 'install', '--help'],
+ capture_output=True,
+ text=True,
+ )
+
+ assert result.returncode == 0
+ assert '--dry-run' in result.stdout or '-d' in result.stdout
+
+
+def test_install_invalid_option_fails(tmp_path):
+ """Test that invalid install options fail cleanly."""
+ os.chdir(tmp_path)
+ result = subprocess.run(
+ ['archivebox', 'install', '--invalid-option'],
+ capture_output=True,
+ text=True,
+ )
+
+ assert result.returncode != 0
+
+
+def test_install_from_empty_dir_initializes_collection(tmp_path):
+ """Test that install bootstraps an empty dir before performing work."""
+ os.chdir(tmp_path)
+ result = subprocess.run(
+ ['archivebox', 'install', '--dry-run'],
+ capture_output=True,
+ text=True,
+ )
+
+ output = result.stdout + result.stderr
+ assert result.returncode == 0
+ assert 'Initializing' in output or 'Dry run' in output or 'init' in output.lower()
+
+
def test_install_updates_binary_table(tmp_path, process):
"""Test that install completes and only mutates dependency state."""
os.chdir(tmp_path)
diff --git a/archivebox/tests/test_cli_list.py b/archivebox/tests/test_cli_list.py
new file mode 100644
index 00000000..cff62bc9
--- /dev/null
+++ b/archivebox/tests/test_cli_list.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python3
+"""
+Tests for archivebox list command.
+Verify list emits snapshot JSONL and applies the documented filters.
+"""
+
+import json
+import os
+import sqlite3
+import subprocess
+
+
+def _parse_jsonl(stdout: str) -> list[dict]:
+ return [
+ json.loads(line)
+ for line in stdout.splitlines()
+ if line.strip().startswith('{')
+ ]
+
+
+def test_list_outputs_existing_snapshots_as_jsonl(tmp_path, process, disable_extractors_dict):
+ """Test that list prints one JSON object per stored snapshot."""
+ os.chdir(tmp_path)
+ for url in ['https://example.com', 'https://iana.org']:
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', url],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'list'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ rows = _parse_jsonl(result.stdout)
+ urls = {row['url'] for row in rows}
+
+ assert result.returncode == 0, result.stderr
+ assert 'https://example.com' in urls
+ assert 'https://iana.org' in urls
+
+
+def test_list_filters_by_url_icontains(tmp_path, process, disable_extractors_dict):
+ """Test that list --url__icontains returns only matching snapshots."""
+ os.chdir(tmp_path)
+ for url in ['https://example.com', 'https://iana.org']:
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', url],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'list', '--url__icontains', 'example.com'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ rows = _parse_jsonl(result.stdout)
+ assert result.returncode == 0, result.stderr
+ assert len(rows) == 1
+ assert rows[0]['url'] == 'https://example.com'
+
+
+def test_list_filters_by_crawl_id_and_limit(tmp_path, process, disable_extractors_dict):
+ """Test that crawl-id and limit filters constrain the result set."""
+ os.chdir(tmp_path)
+ for url in ['https://example.com', 'https://iana.org']:
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', url],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ conn = sqlite3.connect("index.sqlite3")
+ c = conn.cursor()
+ crawl_id = str(c.execute(
+ "SELECT crawl_id FROM core_snapshot WHERE url = ?",
+ ('https://example.com',),
+ ).fetchone()[0])
+ conn.close()
+
+ result = subprocess.run(
+ ['archivebox', 'list', '--crawl-id', crawl_id, '--limit', '1'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ rows = _parse_jsonl(result.stdout)
+ assert result.returncode == 0, result.stderr
+ assert len(rows) == 1
+ assert rows[0]['crawl_id'].replace('-', '') == crawl_id.replace('-', '')
+ assert rows[0]['url'] == 'https://example.com'
+
+
+def test_list_filters_by_status(tmp_path, process, disable_extractors_dict):
+ """Test that list can filter using the current snapshot status."""
+ os.chdir(tmp_path)
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ conn = sqlite3.connect("index.sqlite3")
+ c = conn.cursor()
+ status = c.execute("SELECT status FROM core_snapshot LIMIT 1").fetchone()[0]
+ conn.close()
+
+ result = subprocess.run(
+ ['archivebox', 'list', '--status', status],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ rows = _parse_jsonl(result.stdout)
+ assert result.returncode == 0, result.stderr
+ assert len(rows) == 1
+ assert rows[0]['status'] == status
+
+
+def test_list_help_lists_filter_options(tmp_path, process):
+ """Test that list --help documents the supported filter flags."""
+ os.chdir(tmp_path)
+
+ result = subprocess.run(
+ ['archivebox', 'list', '--help'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0
+ assert '--url__icontains' in result.stdout
+ assert '--crawl-id' in result.stdout
+ assert '--limit' in result.stdout
diff --git a/archivebox/tests/test_cli_remove.py b/archivebox/tests/test_cli_remove.py
index 5558e576..54639ea3 100644
--- a/archivebox/tests/test_cli_remove.py
+++ b/archivebox/tests/test_cli_remove.py
@@ -7,6 +7,21 @@ Verify remove deletes snapshots from DB and filesystem.
import os
import sqlite3
import subprocess
+from pathlib import Path
+
+
+def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
+ candidates = {snapshot_id}
+ if len(snapshot_id) == 32:
+ candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}")
+ elif len(snapshot_id) == 36 and '-' in snapshot_id:
+ candidates.add(snapshot_id.replace('-', ''))
+
+ for needle in candidates:
+ for path in data_dir.rglob(needle):
+ if path.is_dir():
+ return path
+ return None
def test_remove_deletes_snapshot_from_db(tmp_path, process, disable_extractors_dict):
@@ -44,10 +59,7 @@ def test_remove_deletes_snapshot_from_db(tmp_path, process, disable_extractors_d
def test_remove_deletes_archive_directory(tmp_path, process, disable_extractors_dict):
- """Test that remove deletes the archive directory when using --delete flag.
-
- Archive directories are named by timestamp, not by snapshot ID.
- """
+ """Test that remove --delete removes the current snapshot output directory."""
os.chdir(tmp_path)
# Add a snapshot
@@ -57,24 +69,21 @@ def test_remove_deletes_archive_directory(tmp_path, process, disable_extractors_
env=disable_extractors_dict,
)
- # Get snapshot timestamp
conn = sqlite3.connect("index.sqlite3")
c = conn.cursor()
- timestamp = c.execute("SELECT timestamp FROM core_snapshot").fetchone()[0]
+ snapshot_id = str(c.execute("SELECT id FROM core_snapshot").fetchone()[0])
conn.close()
- archive_dir = tmp_path / "archive" / str(timestamp)
- assert archive_dir.exists()
+ snapshot_dir = _find_snapshot_dir(tmp_path, snapshot_id)
+ assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}"
- # Remove snapshot with --delete to remove both DB record and directory
subprocess.run(
['archivebox', 'remove', 'https://example.com', '--yes', '--delete'],
capture_output=True,
env=disable_extractors_dict,
)
- # Archive directory should be deleted
- assert not archive_dir.exists()
+ assert not snapshot_dir.exists()
def test_remove_yes_flag_skips_confirmation(tmp_path, process, disable_extractors_dict):
@@ -158,6 +167,35 @@ def test_remove_with_filter(tmp_path, process, disable_extractors_dict):
assert result.returncode in [0, 1, 2]
+def test_remove_with_regex_filter_deletes_all_matches(tmp_path, process, disable_extractors_dict):
+ """Test regex filters remove every matching snapshot."""
+ os.chdir(tmp_path)
+
+ for url in ['https://example.com', 'https://iana.org']:
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', url],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'remove', '--filter-type=regex', '.*', '--yes'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ conn = sqlite3.connect("index.sqlite3")
+ c = conn.cursor()
+ count_after = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
+ conn.close()
+
+ output = result.stdout.decode("utf-8") + result.stderr.decode("utf-8")
+ assert count_after == 0
+ assert 'Removed' in output or 'Found' in output
+
+
def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extractors_dict):
"""Test that removing non-existent URL fails gracefully."""
os.chdir(tmp_path)
@@ -169,7 +207,8 @@ def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extr
)
# Should fail or show error
- assert result.returncode != 0 or 'not found' in result.stdout.lower() or 'no matches' in result.stdout.lower()
+ stdout_text = result.stdout.decode('utf-8', errors='replace').lower()
+ assert result.returncode != 0 or 'not found' in stdout_text or 'no matches' in stdout_text
def test_remove_reports_remaining_link_count_correctly(tmp_path, process, disable_extractors_dict):
diff --git a/archivebox/tests/test_cli_search.py b/archivebox/tests/test_cli_search.py
index 7ae757fc..7d244461 100644
--- a/archivebox/tests/test_cli_search.py
+++ b/archivebox/tests/test_cli_search.py
@@ -4,6 +4,7 @@ Tests for archivebox search command.
Verify search queries snapshots from DB.
"""
+import json
import os
import subprocess
@@ -65,3 +66,145 @@ def test_search_on_empty_archive(tmp_path, process):
# Should complete without error
assert result.returncode in [0, 1]
+
+
+def test_search_json_outputs_matching_snapshots(tmp_path, process, disable_extractors_dict):
+ """Test that search --json returns parseable matching snapshot rows."""
+ os.chdir(tmp_path)
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--json'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0, result.stderr
+ payload = json.loads(result.stdout)
+ assert any('example.com' in row.get('url', '') for row in payload)
+
+
+def test_search_json_with_headers_wraps_links_payload(tmp_path, process, disable_extractors_dict):
+ """Test that search --json --with-headers returns a headers envelope."""
+ os.chdir(tmp_path)
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--json', '--with-headers'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0, result.stderr
+ payload = json.loads(result.stdout)
+ links = payload.get('links', payload)
+ assert any('example.com' in row.get('url', '') for row in links)
+
+
+def test_search_html_outputs_markup(tmp_path, process, disable_extractors_dict):
+ """Test that search --html renders an HTML response."""
+ os.chdir(tmp_path)
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--html'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0, result.stderr
+ assert '<' in result.stdout
+
+
+def test_search_csv_outputs_requested_column(tmp_path, process, disable_extractors_dict):
+ """Test that search --csv emits the requested fields."""
+ os.chdir(tmp_path)
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--csv', 'url', '--with-headers'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0, result.stderr
+ assert 'url' in result.stdout
+ assert 'example.com' in result.stdout
+
+
+def test_search_with_headers_requires_structured_output_format(tmp_path, process):
+ """Test that --with-headers is rejected without --json, --html, or --csv."""
+ os.chdir(tmp_path)
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--with-headers'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode != 0
+ assert 'requires' in result.stderr.lower() or 'json' in result.stderr.lower()
+
+
+def test_search_sort_option_runs_successfully(tmp_path, process, disable_extractors_dict):
+ """Test that search --sort accepts sortable fields."""
+ os.chdir(tmp_path)
+ for url in ['https://iana.org', 'https://example.com']:
+ subprocess.run(
+ ['archivebox', 'add', '--index-only', '--depth=0', url],
+ capture_output=True,
+ env=disable_extractors_dict,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--csv', 'url', '--sort=url'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0, result.stderr
+ assert 'example.com' in result.stdout or 'iana.org' in result.stdout
+
+
+def test_search_help_lists_supported_filters(tmp_path, process):
+ """Test that search --help documents the available filters and output modes."""
+ os.chdir(tmp_path)
+
+ result = subprocess.run(
+ ['archivebox', 'search', '--help'],
+ capture_output=True,
+ text=True,
+ timeout=30,
+ )
+
+ assert result.returncode == 0
+ assert '--filter-type' in result.stdout or '-f' in result.stdout
+ assert '--status' in result.stdout
+ assert '--sort' in result.stdout
diff --git a/archivebox/tests/test_cli_status.py b/archivebox/tests/test_cli_status.py
index b5eb8dc6..e1d419bf 100644
--- a/archivebox/tests/test_cli_status.py
+++ b/archivebox/tests/test_cli_status.py
@@ -202,3 +202,24 @@ def test_status_shows_index_file_info(tmp_path, process):
# Should mention index
assert 'index' in result.stdout.lower() or 'Index' in result.stdout
+
+
+def test_status_help_lists_available_options(tmp_path, process):
+ """Test that status --help works and documents the command."""
+ os.chdir(tmp_path)
+ result = subprocess.run(
+ ['archivebox', 'status', '--help'],
+ capture_output=True,
+ text=True,
+ )
+
+ assert result.returncode == 0
+ assert 'status' in result.stdout.lower() or 'statistic' in result.stdout.lower()
+
+
+def test_status_shows_data_directory_path(tmp_path, process):
+ """Test that status reports which collection directory it is inspecting."""
+ os.chdir(tmp_path)
+ result = subprocess.run(['archivebox', 'status'], capture_output=True, text=True)
+
+ assert 'archive' in result.stdout.lower() or str(tmp_path) in result.stdout
diff --git a/archivebox/tests/test_cli_version.py b/archivebox/tests/test_cli_version.py
index eee2362e..e7d8d210 100644
--- a/archivebox/tests/test_cli_version.py
+++ b/archivebox/tests/test_cli_version.py
@@ -77,6 +77,17 @@ def test_version_quiet_outputs_version_number(tmp_path):
assert len(parts) >= 2
+def test_version_flag_outputs_version_number(tmp_path):
+ """Test that top-level --version reports the package version."""
+ os.chdir(tmp_path)
+ result = subprocess.run(['archivebox', '--version'], capture_output=True, text=True)
+
+ assert result.returncode == 0
+ version = result.stdout.strip()
+ assert version
+ assert len(version.split('.')) >= 2
+
+
def test_version_shows_system_info_in_initialized_dir(tmp_path, process):
"""Test that version shows system metadata in initialized directory."""
os.chdir(tmp_path)
@@ -148,3 +159,20 @@ def test_version_auto_selects_short_tmp_dir_for_deep_collection_path(tmp_path):
assert reported_tmp_dir.exists()
assert not reported_tmp_dir.is_relative_to(default_tmp_dir)
assert len(f"file://{reported_tmp_dir / 'supervisord.sock'}") <= 96
+
+
+def test_version_help_lists_quiet_flag(tmp_path):
+ """Test that version --help documents the quiet output mode."""
+ os.chdir(tmp_path)
+ result = subprocess.run(['archivebox', 'version', '--help'], capture_output=True, text=True)
+
+ assert result.returncode == 0
+ assert '--quiet' in result.stdout or '-q' in result.stdout
+
+
+def test_version_invalid_option_fails(tmp_path):
+ """Test that invalid version options fail cleanly."""
+ os.chdir(tmp_path)
+ result = subprocess.run(['archivebox', 'version', '--invalid-option'], capture_output=True, text=True)
+
+ assert result.returncode != 0
diff --git a/archivebox/tests/test_init.py b/archivebox/tests/test_init.py
deleted file mode 100644
index 3a3697bd..00000000
--- a/archivebox/tests/test_init.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# archivebox init
-# archivebox add
-
-import os
-import subprocess
-import sqlite3
-
-from archivebox.config.common import STORAGE_CONFIG
-
-from .fixtures import disable_extractors_dict, process
-
-FIXTURES = (disable_extractors_dict, process)
-
-DIR_PERMISSIONS = STORAGE_CONFIG.OUTPUT_PERMISSIONS.replace('6', '7').replace('4', '5')
-
-def test_init(tmp_path, process):
- assert "Initializing a new ArchiveBox" in process.stdout.decode("utf-8")
-
-def test_update(tmp_path, process):
- os.chdir(tmp_path)
- update_process = subprocess.run(['archivebox', 'init'], capture_output=True)
- assert "updating existing ArchiveBox" in update_process.stdout.decode("utf-8")
-
-def test_add_link(tmp_path, process, disable_extractors_dict):
- os.chdir(tmp_path)
- add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True, env=disable_extractors_dict)
- assert add_process.returncode == 0, add_process.stderr.decode("utf-8")
-
- # In the new architecture, URLs are saved to source files
- # Check that a source file was created with the URL
- sources_dir = tmp_path / "sources"
- assert sources_dir.exists(), "Sources directory should be created"
- source_files = list(sources_dir.glob("*cli_add.txt"))
- assert len(source_files) >= 1, "Source file should be created"
- source_content = source_files[0].read_text()
- assert "https://example.com" in source_content
-
-
-def test_add_multiple_urls(tmp_path, process, disable_extractors_dict):
- """Test adding multiple URLs via command line arguments"""
- os.chdir(tmp_path)
- add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com', 'https://iana.org'],
- capture_output=True, env=disable_extractors_dict)
- assert add_process.returncode == 0, add_process.stderr.decode("utf-8")
-
- # Check that a source file was created with both URLs
- sources_dir = tmp_path / "sources"
- assert sources_dir.exists(), "Sources directory should be created"
- source_files = list(sources_dir.glob("*cli_add.txt"))
- assert len(source_files) >= 1, "Source file should be created"
- source_content = source_files[-1].read_text()
- assert "https://example.com" in source_content
- assert "https://iana.org" in source_content
-
-def test_correct_permissions_output_folder(tmp_path, process):
- index_files = ['index.sqlite3', 'archive']
- for file in index_files:
- file_path = tmp_path / file
- assert oct(file_path.stat().st_mode)[-3:] in (STORAGE_CONFIG.OUTPUT_PERMISSIONS, DIR_PERMISSIONS)
-
-def test_correct_permissions_add_command_results(tmp_path, process, disable_extractors_dict):
- os.chdir(tmp_path)
- add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True,
- env=disable_extractors_dict)
- assert add_process.returncode == 0, add_process.stderr.decode("utf-8")
-
- # Check database permissions
- assert oct((tmp_path / "index.sqlite3").stat().st_mode)[-3:] in (STORAGE_CONFIG.OUTPUT_PERMISSIONS, DIR_PERMISSIONS)
-
-def test_collision_urls_different_timestamps(tmp_path, process, disable_extractors_dict):
- os.chdir(tmp_path)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True,
- env=disable_extractors_dict)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True,
- env=disable_extractors_dict)
-
- # Check both URLs are in database
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
- conn.close()
-
- assert count == 2
-
-def test_unrecognized_folders(tmp_path, process, disable_extractors_dict):
- os.chdir(tmp_path)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True,
- env=disable_extractors_dict)
- (tmp_path / "archive" / "some_random_folder").mkdir(parents=True, exist_ok=True)
-
- init_process = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
- # Just check that init completes successfully
- assert init_process.returncode == 0
diff --git a/archivebox/tests/test_install.py b/archivebox/tests/test_install.py
deleted file mode 100644
index af967500..00000000
--- a/archivebox/tests/test_install.py
+++ /dev/null
@@ -1,128 +0,0 @@
-#!/usr/bin/env python3
-"""Integration tests for archivebox install command."""
-
-import os
-import subprocess
-import sqlite3
-
-import pytest
-
-
-
-class TestInstallDryRun:
- """Test the dry-run mode of install command."""
-
- def test_dry_run_prints_message(self, tmp_path, process):
- """Test that dry-run mode prints appropriate message."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'install', '--dry-run'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- assert 'Dry run' in result.stdout
-
- def test_dry_run_does_not_create_crawl(self, tmp_path, process):
- """Test that dry-run mode doesn't create a crawl."""
- os.chdir(tmp_path)
-
- # Get initial crawl count
- conn = sqlite3.connect('index.sqlite3')
- c = conn.cursor()
- c.execute("SELECT COUNT(*) FROM crawls_crawl")
- initial_count = c.fetchone()[0]
- conn.close()
-
- # Run install with dry-run
- result = subprocess.run(
- ['archivebox', 'install', '--dry-run'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
-
- # Check crawl count unchanged
- conn = sqlite3.connect('index.sqlite3')
- c = conn.cursor()
- c.execute("SELECT COUNT(*) FROM crawls_crawl")
- final_count = c.fetchone()[0]
- conn.close()
-
- assert final_count == initial_count
-
-
-class TestInstallOutput:
- """Test the output/messages from install command."""
-
- def test_install_prints_detecting_message(self, tmp_path, process, disable_extractors_dict):
- """Test that install prints detecting dependencies message."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'install', '--dry-run'],
- capture_output=True,
- text=True,
- env=disable_extractors_dict,
- )
-
- assert result.returncode == 0
- # Should mention detecting or dependencies
- output = result.stdout.lower()
- assert 'detect' in output or 'dependenc' in output or 'dry run' in output
-
-
-class TestInstallCLI:
- """Test the CLI interface for install command."""
-
- def test_cli_help(self, tmp_path):
- """Test that --help works for install command."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'install', '--help'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- assert '--dry-run' in result.stdout or '-d' in result.stdout
-
- def test_cli_invalid_option(self, tmp_path):
- """Test that invalid options are handled."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'install', '--invalid-option'],
- capture_output=True,
- text=True,
- )
-
- # Should fail with non-zero exit code
- assert result.returncode != 0
-
-
-class TestInstallInitialization:
- """Test that install initializes the data directory if needed."""
-
- def test_install_from_empty_dir(self, tmp_path):
- """Test that install from empty dir initializes first."""
- os.chdir(tmp_path)
-
- # Don't use process fixture - start from empty dir
- result = subprocess.run(
- ['archivebox', 'install', '--dry-run'],
- capture_output=True,
- text=True,
- )
-
- # Should either initialize or show dry run message
- output = result.stdout
- assert 'Initializing' in output or 'Dry run' in output or 'init' in output.lower()
-
-
-if __name__ == '__main__':
- pytest.main([__file__, '-v'])
diff --git a/archivebox/tests/test_list.py b/archivebox/tests/test_list.py
deleted file mode 100644
index 2aaad4fa..00000000
--- a/archivebox/tests/test_list.py
+++ /dev/null
@@ -1,98 +0,0 @@
-import json
-import subprocess
-
-from .fixtures import disable_extractors_dict, process
-
-FIXTURES = (disable_extractors_dict, process)
-
-def test_search_json(process, disable_extractors_dict):
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- search_process = subprocess.run(["archivebox", "search", "--json"], capture_output=True)
- output_str = search_process.stdout.decode("utf-8").strip()
- # Handle potential control characters in output
- try:
- output_json = json.loads(output_str)
- except json.JSONDecodeError:
- # Try with strict=False if there are control characters
- import re
- # Remove ANSI escape sequences and control characters
- clean_str = re.sub(r'\x1b\[[0-9;]*m', '', output_str)
- clean_str = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', clean_str)
- output_json = json.loads(clean_str)
- # Verify we get at least one snapshot back
- assert len(output_json) >= 1
- # Should include the requested URL
- assert any("example.com" in entry.get("url", "") for entry in output_json)
-
-
-def test_search_json_headers(process, disable_extractors_dict):
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- search_process = subprocess.run(["archivebox", "search", "--json", "--with-headers"], capture_output=True)
- output_str = search_process.stdout.decode("utf-8").strip()
- # Handle potential control characters in output
- try:
- output_json = json.loads(output_str)
- except json.JSONDecodeError:
- # Try with strict=False if there are control characters
- import re
- # Remove ANSI escape sequences and control characters
- clean_str = re.sub(r'\x1b\[[0-9;]*m', '', output_str)
- clean_str = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', clean_str)
- output_json = json.loads(clean_str)
- # The response should have a links key with headers mode
- links = output_json.get("links", output_json)
- assert len(links) >= 1
-
-def test_search_html(process, disable_extractors_dict):
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- search_process = subprocess.run(["archivebox", "search", "--html"], capture_output=True)
- output_html = search_process.stdout.decode("utf-8")
- # Should contain some HTML and reference to the source file
- assert "sources" in output_html or "cli_add" in output_html or "<" in output_html
-
-def test_search_html_headers(process, disable_extractors_dict):
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- search_process = subprocess.run(["archivebox", "search", "--html", "--with-headers"], capture_output=True)
- output_html = search_process.stdout.decode("utf-8")
- # Should contain HTML
- assert "<" in output_html
-
-def test_search_csv(process, disable_extractors_dict):
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- search_process = subprocess.run(["archivebox", "search", "--csv", "url"], capture_output=True)
- output_csv = search_process.stdout.decode("utf-8")
- # Should contain the requested URL
- assert "example.com" in output_csv
-
-def test_search_csv_headers(process, disable_extractors_dict):
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--with-headers"], capture_output=True)
- output_csv = search_process.stdout.decode("utf-8")
- # Should have url header and requested URL
- assert "url" in output_csv
- assert "example.com" in output_csv
-
-def test_search_with_headers_requires_format(process):
- search_process = subprocess.run(["archivebox", "search", "--with-headers"], capture_output=True)
- stderr = search_process.stderr.decode("utf-8")
- assert "--with-headers" in stderr and ("requires" in stderr or "can only be used" in stderr)
-
-def test_sort_by_url(process, disable_extractors_dict):
- # Add two URLs - they will create separate source files
- subprocess.run(["archivebox", "add", "--index-only", "https://iana.org", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
- subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
- capture_output=True, env=disable_extractors_dict)
-
- # Search with sort should return results (even if they're file:// URLs)
- search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--sort=url"], capture_output=True)
- output = search_process.stdout.decode("utf-8")
- lines = [line for line in output.strip().split("\n") if line]
- # Should have at least 2 snapshots (the source file snapshots)
- assert len(lines) >= 2
diff --git a/archivebox/tests/test_migrations_04_to_09.py b/archivebox/tests/test_migrations_04_to_09.py
index a197a09e..f98345c7 100644
--- a/archivebox/tests/test_migrations_04_to_09.py
+++ b/archivebox/tests/test_migrations_04_to_09.py
@@ -12,6 +12,7 @@ import sqlite3
import tempfile
import unittest
from pathlib import Path
+from typing import cast
from .migrations_helpers import (
SCHEMA_0_4,
@@ -74,7 +75,7 @@ class TestMigrationFrom04x(unittest.TestCase):
# Collect unique tags from original data
original_tags = set()
- for tags_str in self.original_data['tags_str']:
+ for tags_str in cast(list[str], self.original_data['tags_str']):
if tags_str:
for tag in tags_str.split(','):
original_tags.add(tag.strip())
diff --git a/archivebox/tests/test_remove.py b/archivebox/tests/test_remove.py
deleted file mode 100644
index 078f4e06..00000000
--- a/archivebox/tests/test_remove.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import os
-import sqlite3
-import subprocess
-
-from .fixtures import disable_extractors_dict, process
-
-FIXTURES = (disable_extractors_dict, process)
-
-def test_remove_single_snapshot(tmp_path, process, disable_extractors_dict):
- """Test removing a snapshot by URL pattern"""
- os.chdir(tmp_path)
- # Add a URL - creates source file snapshot
- subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
-
- # Verify snapshot exists
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count_before = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0]
- conn.close()
- assert count_before >= 1
-
- # Remove all snapshots (including source file snapshots)
- remove_process = subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes'], capture_output=True)
- # Check that it ran successfully (either output indicates success or return code 0)
- output = remove_process.stdout.decode("utf-8") + remove_process.stderr.decode("utf-8")
- assert remove_process.returncode == 0 or "removed" in output.lower() or "Found" in output
-
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0]
- conn.close()
-
- assert count == 0
-
-
-def test_remove_with_delete_flag(tmp_path, process, disable_extractors_dict):
- """Test removing snapshot with --delete also removes archive folder"""
- os.chdir(tmp_path)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
-
- # Get archives before delete
- archive_dir = tmp_path / "archive"
- archives_before = list(archive_dir.iterdir()) if archive_dir.exists() else []
-
- # Only run the rest of the test if archives were created
- if archives_before:
- subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True)
- archives_after = list(archive_dir.iterdir()) if archive_dir.exists() else []
- assert len(archives_after) < len(archives_before)
- else:
- # With --index-only, archive folders may not be created immediately
- # Just verify that remove command doesn't error
- remove_result = subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True)
- assert remove_result.returncode in (0, 1) # 0 = success, 1 = no matches
-
-
-def test_remove_regex(tmp_path, process, disable_extractors_dict):
- """Test removing snapshots by regex pattern"""
- os.chdir(tmp_path)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, env=disable_extractors_dict)
-
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count_before = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0]
- conn.close()
- assert count_before >= 2
-
- subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True)
-
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- count_after = c.execute("SELECT COUNT() FROM core_snapshot").fetchone()[0]
- conn.close()
- assert count_after == 0
-
-
-def test_add_creates_crawls(tmp_path, process, disable_extractors_dict):
- """Test that adding URLs creates crawls in database"""
- os.chdir(tmp_path)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
- subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, env=disable_extractors_dict)
-
- conn = sqlite3.connect("index.sqlite3")
- c = conn.cursor()
- crawl_count = c.execute("SELECT COUNT() FROM crawls_crawl").fetchone()[0]
- conn.close()
-
- assert crawl_count == 2
diff --git a/archivebox/tests/test_search.py b/archivebox/tests/test_search.py
deleted file mode 100644
index 9b141be8..00000000
--- a/archivebox/tests/test_search.py
+++ /dev/null
@@ -1,142 +0,0 @@
-#!/usr/bin/env python3
-"""Integration tests for archivebox search command."""
-
-import os
-import subprocess
-
-import pytest
-
-
-
-def test_search_returns_snapshots(tmp_path, process, disable_extractors_dict):
- """Test that search returns snapshots."""
- os.chdir(tmp_path)
-
- # Add some snapshots
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'search'],
- capture_output=True,
- text=True,
- )
-
- # Should return some output (path or URL info)
- assert result.stdout.strip() != '' or result.returncode == 0
-
-
-def test_search_filter_by_substring(tmp_path, process, disable_extractors_dict):
- """Test that substring filter works."""
- os.chdir(tmp_path)
-
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- # Search with filter - may not find if URL isn't stored as expected
- result = subprocess.run(
- ['archivebox', 'search', '--filter-type=substring', 'example'],
- capture_output=True,
- text=True,
- )
-
- # Should run without error
- assert result.returncode == 0 or 'No Snapshots' in result.stderr
-
-
-def test_search_sort_option(tmp_path, process, disable_extractors_dict):
- """Test that --sort option works."""
- os.chdir(tmp_path)
-
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'search', '--sort=url'],
- capture_output=True,
- text=True,
- )
-
- # Should run without error
- assert result.returncode == 0
-
-
-def test_search_with_headers_requires_format(tmp_path, process):
- """Test that --with-headers requires --json, --html, or --csv."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'search', '--with-headers'],
- capture_output=True,
- text=True,
- )
-
- # Should fail with error message
- assert result.returncode != 0
- assert 'requires' in result.stderr.lower() or 'json' in result.stderr.lower()
-
-
-def test_search_status_option(tmp_path, process, disable_extractors_dict):
- """Test that --status option filters by status."""
- os.chdir(tmp_path)
-
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'search', '--status=indexed'],
- capture_output=True,
- text=True,
- )
-
- # Should run without error
- assert result.returncode == 0
-
-
-def test_search_no_snapshots_message(tmp_path, process):
- """Test that searching empty archive shows appropriate output."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'search'],
- capture_output=True,
- text=True,
- )
-
- # Should complete (empty results are OK)
- assert result.returncode == 0
-
-
-class TestSearchCLI:
- """Test the CLI interface for search command."""
-
- def test_cli_help(self, tmp_path, process):
- """Test that --help works for search command."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'search', '--help'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- assert '--filter-type' in result.stdout or '-f' in result.stdout
- assert '--status' in result.stdout
- assert '--sort' in result.stdout
-
-
-if __name__ == '__main__':
- pytest.main([__file__, '-v'])
diff --git a/archivebox/tests/test_status.py b/archivebox/tests/test_status.py
deleted file mode 100644
index 9035374d..00000000
--- a/archivebox/tests/test_status.py
+++ /dev/null
@@ -1,195 +0,0 @@
-#!/usr/bin/env python3
-"""Integration tests for archivebox status command."""
-
-import os
-import subprocess
-
-import pytest
-
-
-
-def test_status_shows_index_info(tmp_path, process):
- """Test that status shows index information."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show index scanning info
- assert 'index' in result.stdout.lower() or 'Index' in result.stdout
-
-
-def test_status_shows_snapshot_count(tmp_path, process, disable_extractors_dict):
- """Test that status shows snapshot count."""
- os.chdir(tmp_path)
-
- # Add some snapshots
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://iana.org'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show link/snapshot count
- assert '2' in result.stdout or 'links' in result.stdout.lower()
-
-
-def test_status_shows_archive_size(tmp_path, process, disable_extractors_dict):
- """Test that status shows archive size information."""
- os.chdir(tmp_path)
-
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show size info (bytes, KB, MB, etc)
- assert 'Size' in result.stdout or 'size' in result.stdout or 'B' in result.stdout
-
-
-def test_status_shows_indexed_count(tmp_path, process, disable_extractors_dict):
- """Test that status shows indexed folder count."""
- os.chdir(tmp_path)
-
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show indexed count
- assert 'indexed' in result.stdout.lower()
-
-
-def test_status_shows_archived_vs_unarchived(tmp_path, process, disable_extractors_dict):
- """Test that status shows archived vs unarchived counts."""
- os.chdir(tmp_path)
-
- # Add index-only snapshot (unarchived)
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show archived/unarchived categories
- assert 'archived' in result.stdout.lower() or 'unarchived' in result.stdout.lower()
-
-
-def test_status_shows_data_directory_info(tmp_path, process):
- """Test that status shows data directory path."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show data directory or archive path
- assert 'archive' in result.stdout.lower() or str(tmp_path) in result.stdout
-
-
-def test_status_shows_user_info(tmp_path, process):
- """Test that status shows user information."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show user info section
- assert 'user' in result.stdout.lower() or 'login' in result.stdout.lower()
-
-
-def test_status_empty_archive(tmp_path, process):
- """Test status on empty archive shows zero counts."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should still run successfully
- assert result.returncode == 0 or 'index' in result.stdout.lower()
- # Should show 0 links
- assert '0' in result.stdout or 'links' in result.stdout.lower()
-
-
-def test_status_shows_valid_vs_invalid(tmp_path, process, disable_extractors_dict):
- """Test that status shows valid vs invalid folder counts."""
- os.chdir(tmp_path)
-
- subprocess.run(
- ['archivebox', 'add', '--index-only', 'https://example.com'],
- capture_output=True,
- env=disable_extractors_dict,
- )
-
- result = subprocess.run(
- ['archivebox', 'status'],
- capture_output=True,
- text=True,
- )
-
- # Should show valid/invalid categories
- assert 'valid' in result.stdout.lower() or 'present' in result.stdout.lower()
-
-
-class TestStatusCLI:
- """Test the CLI interface for status command."""
-
- def test_cli_help(self, tmp_path, process):
- """Test that --help works for status command."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'status', '--help'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Help should show some info about the command
- assert 'status' in result.stdout.lower() or 'statistic' in result.stdout.lower()
-
-
-if __name__ == '__main__':
- pytest.main([__file__, '-v'])
diff --git a/archivebox/tests/test_version.py b/archivebox/tests/test_version.py
deleted file mode 100644
index 7ad7705d..00000000
--- a/archivebox/tests/test_version.py
+++ /dev/null
@@ -1,158 +0,0 @@
-#!/usr/bin/env python3
-"""Integration tests for archivebox version command."""
-
-import os
-import subprocess
-
-import pytest
-
-
-
-class TestVersionQuiet:
- """Test the quiet/minimal version output."""
-
- def test_version_prints_version_number(self, tmp_path):
- """Test that version prints the version number."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'version', '--quiet'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Should contain a version string like "0.8.0" or similar
- version = result.stdout.strip()
- assert version
- # Version should be a valid semver-ish format
- parts = version.split('.')
- assert len(parts) >= 2 # At least major.minor
-
- def test_version_flag_prints_version_number(self, tmp_path):
- """Test that --version flag prints the version number."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', '--version'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- version = result.stdout.strip()
- assert version
- parts = version.split('.')
- assert len(parts) >= 2
-
-
-class TestVersionFull:
- """Test the full version output."""
-
- def test_version_shows_system_info(self, tmp_path, process):
- """Test that version shows system information."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'version'],
- capture_output=True,
- text=True,
- )
-
- output = result.stdout
-
- # Should show basic system info (exit code may be 1 if binaries missing)
- assert 'ArchiveBox' in output
-
- def test_version_shows_binary_section(self, tmp_path, process):
- """Test that version shows binary dependencies section."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'version'],
- capture_output=True,
- text=True,
- )
-
- output = result.stdout
-
- # Should show binary dependencies section
- assert 'Binary' in output or 'Dependenc' in output
-
- def test_version_shows_data_locations(self, tmp_path, process):
- """Test that version shows data locations."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'version'],
- capture_output=True,
- text=True,
- )
-
- output = result.stdout
-
- # Should show data/code locations
- assert 'Data' in output or 'location' in output.lower() or 'DIR' in output or 'Code' in output
-
-
-class TestVersionWithBinaries:
- """Test version output after running install."""
-
- def test_version_shows_binary_status(self, tmp_path, process, disable_extractors_dict):
- """Test that version shows binary status (installed or not)."""
- os.chdir(tmp_path)
-
- # First run install (with dry-run to speed up)
- subprocess.run(
- ['archivebox', 'install', '--dry-run'],
- capture_output=True,
- text=True,
- env=disable_extractors_dict,
- )
-
- # Now check version
- result = subprocess.run(
- ['archivebox', 'version'],
- capture_output=True,
- text=True,
- env=disable_extractors_dict,
- )
-
- output = result.stdout
-
- # Should show binary status (either installed or not installed)
- assert 'installed' in output.lower() or 'Binary' in output
-
-
-class TestVersionCLI:
- """Test the CLI interface for version command."""
-
- def test_cli_help(self, tmp_path):
- """Test that --help works for version command."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'version', '--help'],
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- assert '--quiet' in result.stdout or '-q' in result.stdout
-
- def test_cli_invalid_option(self, tmp_path):
- """Test that invalid options are handled."""
- os.chdir(tmp_path)
-
- result = subprocess.run(
- ['archivebox', 'version', '--invalid-option'],
- capture_output=True,
- text=True,
- )
-
- # Should fail with non-zero exit code
- assert result.returncode != 0
-
-
-if __name__ == '__main__':
- pytest.main([__file__, '-v'])
diff --git a/archivebox/uuid_compat.py b/archivebox/uuid_compat.py
index d9b7c456..5a422a47 100755
--- a/archivebox/uuid_compat.py
+++ b/archivebox/uuid_compat.py
@@ -1,40 +1,17 @@
-"""UUID7 compatibility layer for Python 3.13+
-
-Python 3.14+ has native uuid7 support. For Python 3.13, we use uuid_extensions.
-
-IMPORTANT: We also monkey-patch uuid.uuid7 for backward compatibility with
-migrations that were auto-generated on Python 3.14+ systems.
-"""
+"""UUID7 compatibility layer."""
import sys
import uuid
-import functools
+from importlib import import_module
if sys.version_info >= (3, 14):
- from uuid import uuid7 as _uuid7
+ _UUID7_GENERATOR = getattr(uuid, 'uuid7')
else:
- try:
- from uuid_extensions import uuid7 as _uuid7
- except ImportError:
- raise ImportError(
- "uuid_extensions package is required for Python <3.14. "
- "Install it with: pip install uuid_extensions"
- )
-
- # Monkey-patch uuid module for migrations generated on Python 3.14+
- # that reference uuid.uuid7 directly
- if not hasattr(uuid, 'uuid7'):
- uuid.uuid7 = _uuid7
+ _UUID7_GENERATOR = getattr(import_module('uuid_extensions'), 'uuid7')
-@functools.wraps(_uuid7)
-def uuid7():
- """Generate a UUID7 (time-ordered UUID).
-
- This wrapper ensures Django migrations always reference
- 'archivebox.uuid_compat.uuid7' regardless of Python version.
- """
- return _uuid7()
+def uuid7() -> uuid.UUID:
+ return _UUID7_GENERATOR()
__all__ = ['uuid7']
diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py
index 7dbd66f0..f0ea7b96 100644
--- a/archivebox/workers/orchestrator.py
+++ b/archivebox/workers/orchestrator.py
@@ -31,7 +31,7 @@ __package__ = 'archivebox.workers'
import os
import time
from typing import Type
-from datetime import timedelta
+from datetime import datetime, timedelta
from multiprocessing import Process as MPProcess
from pathlib import Path
@@ -189,7 +189,7 @@ class Orchestrator:
event='Shutting down',
indent_level=0,
pid=self.pid,
- error=error if error and not isinstance(error, KeyboardInterrupt) else None,
+ error=error if isinstance(error, Exception) and not isinstance(error, KeyboardInterrupt) else None,
)
def get_total_worker_count(self) -> int:
@@ -567,7 +567,8 @@ class Orchestrator:
status=ArchiveResult.StatusChoices.STARTED,
).select_related('process')
for ar in started_ars:
- if ar.process_id and ar.process and ar.process.status == Process.StatusChoices.RUNNING:
+ process_id = getattr(ar, 'process_id', None)
+ if process_id and ar.process and ar.process.status == Process.StatusChoices.RUNNING:
try:
ar.process.kill_tree(graceful_timeout=0.0)
except Exception:
@@ -904,28 +905,29 @@ class Orchestrator:
size = ''
stderr_tail = ''
if ar:
- if ar.process_id and ar.process:
+ process_id = getattr(ar, 'process_id', None)
+ if process_id and ar.process:
stderr_tail = _tail_stderr_line(ar.process)
if ar.status == ArchiveResult.StatusChoices.STARTED:
status = 'started'
is_running = True
is_pending = False
- start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None)
+ start_ts = ar.start_ts or (ar.process.started_at if process_id and ar.process else None)
if start_ts:
elapsed = _format_seconds((now - start_ts).total_seconds())
hook_timeout = None
- if ar.process_id and ar.process and ar.process.timeout:
+ if process_id and ar.process and ar.process.timeout:
hook_timeout = ar.process.timeout
hook_timeout = hook_timeout or hook_timeouts.get(hook_name)
if hook_timeout:
timeout = _format_seconds(hook_timeout)
else:
status = ar.status
- if ar.process_id and ar.process and ar.process.exit_code == 137:
+ if process_id and ar.process and ar.process.exit_code == 137:
status = 'failed'
is_pending = False
- start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None)
- end_ts = ar.end_ts or (ar.process.ended_at if ar.process_id and ar.process else None)
+ start_ts = ar.start_ts or (ar.process.started_at if process_id and ar.process else None)
+ end_ts = ar.end_ts or (ar.process.ended_at if process_id and ar.process else None)
if start_ts and end_ts:
elapsed = _format_seconds((end_ts - start_ts).total_seconds())
size = _format_size(getattr(ar, 'output_size', None))
@@ -1093,7 +1095,7 @@ class Orchestrator:
from archivebox.core.models import Snapshot
# Get all started snapshots (optionally filtered by crawl_id)
- snapshot_filter = {'status': 'started'}
+ snapshot_filter: dict[str, str | datetime] = {'status': 'started'}
if self.crawl_id:
snapshot_filter['crawl_id'] = self.crawl_id
else:
diff --git a/archivebox/workers/supervisord_util.py b/archivebox/workers/supervisord_util.py
index b85865cc..1adcdaca 100644
--- a/archivebox/workers/supervisord_util.py
+++ b/archivebox/workers/supervisord_util.py
@@ -335,6 +335,7 @@ def start_worker(supervisor, daemon, lazy=False):
for added in added:
supervisor.addProcessGroup(added)
+ procs = []
for _ in range(25):
procs = supervisor.getAllProcessInfo()
for proc in procs:
diff --git a/archivebox/workers/tests/test_scheduled_crawls.py b/archivebox/workers/tests/test_scheduled_crawls.py
index 0a7645be..9162279e 100644
--- a/archivebox/workers/tests/test_scheduled_crawls.py
+++ b/archivebox/workers/tests/test_scheduled_crawls.py
@@ -1,7 +1,9 @@
from datetime import timedelta
+from typing import cast
from unittest.mock import patch
from django.contrib.auth import get_user_model
+from django.contrib.auth.models import UserManager
from django.test import TestCase
from django.utils import timezone
@@ -12,7 +14,8 @@ from archivebox.workers.worker import CrawlWorker
class TestScheduledCrawlMaterialization(TestCase):
def setUp(self):
- self.user = get_user_model().objects.create_user(
+ user_manager = cast(UserManager, get_user_model().objects)
+ self.user = user_manager.create_user(
username='schedule-user',
password='password',
)
@@ -52,6 +55,8 @@ class TestScheduledCrawlMaterialization(TestCase):
self.assertEqual(scheduled_crawls.count(), 2)
queued_crawl = scheduled_crawls.last()
+ self.assertIsNotNone(queued_crawl)
+ assert queued_crawl is not None
self.assertEqual(queued_crawl.status, Crawl.StatusChoices.QUEUED)
self.assertEqual(queued_crawl.urls, 'https://example.com/feed.xml')
self.assertEqual(queued_crawl.max_depth, 1)
@@ -63,7 +68,7 @@ class TestScheduledCrawlMaterialization(TestCase):
Orchestrator(exit_on_idle=True)._materialize_due_schedules()
self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1)
- Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template_id))._materialize_due_schedules()
+ Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template.id))._materialize_due_schedules()
self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1)
@patch.object(CrawlWorker, 'start')
diff --git a/archivebox/workers/tests/test_snapshot_worker.py b/archivebox/workers/tests/test_snapshot_worker.py
index 4233e69c..bb2be6d4 100644
--- a/archivebox/workers/tests/test_snapshot_worker.py
+++ b/archivebox/workers/tests/test_snapshot_worker.py
@@ -1,5 +1,6 @@
from pathlib import Path
from types import SimpleNamespace
+from typing import Any, cast
from unittest.mock import patch
from django.test import SimpleTestCase
@@ -11,14 +12,14 @@ class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase):
def _make_worker(self):
worker = SnapshotWorker.__new__(SnapshotWorker)
worker.pid = 12345
- worker.snapshot = SimpleNamespace(
+ cast(Any, worker).snapshot = SimpleNamespace(
status='started',
refresh_from_db=lambda: None,
)
worker._snapshot_exceeded_hard_timeout = lambda: False
worker._seal_snapshot_due_to_timeout = lambda: None
worker._run_hook = lambda *args, **kwargs: SimpleNamespace()
- worker._wait_for_hook = lambda *args, **kwargs: None
+ worker._wait_for_hook = lambda process, ar: None
return worker
@patch('archivebox.workers.worker.log_worker_event')
@@ -49,10 +50,10 @@ class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase):
run_calls.append((args, kwargs))
return SimpleNamespace()
- def wait_for_hook(process, archive_result):
- wait_calls.append((process, archive_result))
- archive_result.status = 'succeeded'
- archive_result.output_files = {'singlefile.html': {}}
+ def wait_for_hook(process, ar):
+ wait_calls.append((process, ar))
+ ar.status = 'succeeded'
+ ar.output_files = {'singlefile.html': {}}
archive_result = SimpleNamespace(
status='failed',
diff --git a/pyproject.toml b/pyproject.toml
index f0f2f779..d179814d 100755
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -68,7 +68,6 @@ dependencies = [
"python-benedict[io,parse]>=0.33.2", # for: dict replacement all over the codebase to allow .attr-style access
"base32-crockford>=0.3.0", # for: encoding UUIDs in base32
### Static Typing
- "mypy-extensions>=1.0.0", # for: django-stubs type hints (TODO: remove in favor of pylance/pyright?)
"django-stubs>=5.0.4", # for: vscode type hints on models and common django APIs
### API clients
"requests>=2.32.3", # for: fetching title, static files, headers (TODO: replace with httpx?)
@@ -86,7 +85,7 @@ dependencies = [
"abx-plugins>=1.9.11", # shared ArchiveBox plugin package with install_args-only overrides
"gallery-dl>=1.31.1",
### UUID7 backport for Python <3.14
- "uuid7>=0.1.0; python_version < '3.14'", # for: uuid7 support on Python 3.13 (provides uuid_extensions module)
+ "uuid7>=0.1.0; python_version < '3.14'", # provides the uuid_extensions module on Python 3.13
"pytest-django>=4.11.1",
]
@@ -244,21 +243,6 @@ directory = "htmlcov"
output = "coverage.json"
show_contexts = true
-[tool.mypy]
-mypy_path = "typings"
-namespace_packages = true
-explicit_package_bases = true
-# follow_imports = "silent"
-# ignore_missing_imports = true
-# disallow_incomplete_defs = true
-# disallow_untyped_defs = true
-# disallow_untyped_decorators = true
-# exclude = "tests/.*"
-plugins = ["mypy_django_plugin.main"]
-
-[tool.django-stubs]
-django_settings_module = "archivebox.core.settings"
-
[tool.pyright]
include = [
"archivebox",