type and test fixes

This commit is contained in:
Nick Sweeting
2026-03-15 20:12:27 -07:00
parent 3889eb4efa
commit bc21d4bfdb
52 changed files with 762 additions and 1317 deletions

View File

@@ -16,9 +16,6 @@ import sys
from pathlib import Path
from typing import Protocol, cast
# Import uuid_compat early to monkey-patch uuid.uuid7 before Django loads migrations
# This fixes migrations generated on Python 3.14+ that reference uuid.uuid7 directly
from archivebox import uuid_compat # noqa: F401
from abx_plugins import get_plugins_dir

View File

@@ -8,7 +8,6 @@ from pathlib import Path
from typing import TYPE_CHECKING, Callable
import rich_click as click
from rich import print
from django.db.models import Q, QuerySet
@@ -212,7 +211,11 @@ def search(filter_patterns: list[str] | None=None,
folders: dict[str, Snapshot | None] = {snapshot.output_dir: snapshot for snapshot in snapshots}
output = printable_folders(folders, with_headers)
print(output)
# Structured exports must be written directly to stdout.
# rich.print() reflows long lines to console width, which corrupts JSON/CSV/HTML output.
sys.stdout.write(output)
if not output.endswith('\n'):
sys.stdout.write('\n')
return output

View File

@@ -29,6 +29,7 @@ import tempfile
import unittest
from io import StringIO
from pathlib import Path
from typing import TypeVar
# Test configuration - disable slow extractors
TEST_CONFIG = {
@@ -58,6 +59,14 @@ TEST_CONFIG = {
os.environ.update(TEST_CONFIG)
T = TypeVar('T')
def require(value: T | None) -> T:
if value is None:
raise AssertionError('Expected value to be present')
return value
# =============================================================================
# JSONL Utility Tests
@@ -70,8 +79,7 @@ class TestJSONLParsing(unittest.TestCase):
"""Plain URLs should be parsed as Snapshot records."""
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
result = parse_line('https://example.com')
self.assertIsNotNone(result)
result = require(parse_line('https://example.com'))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['url'], 'https://example.com')
@@ -80,8 +88,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
line = '{"type": "Snapshot", "url": "https://example.com", "tags": "test,demo"}'
result = parse_line(line)
self.assertIsNotNone(result)
result = require(parse_line(line))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['url'], 'https://example.com')
self.assertEqual(result['tags'], 'test,demo')
@@ -91,8 +98,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line, TYPE_CRAWL
line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}'
result = parse_line(line)
self.assertIsNotNone(result)
result = require(parse_line(line))
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'abc123')
self.assertEqual(result['urls'], 'https://example.com')
@@ -103,8 +109,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line
line = '{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}'
result = parse_line(line)
self.assertIsNotNone(result)
result = require(parse_line(line))
self.assertEqual(result['id'], 'abc123')
self.assertEqual(result['url'], 'https://example.com')
@@ -113,8 +118,7 @@ class TestJSONLParsing(unittest.TestCase):
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
uuid = '01234567-89ab-cdef-0123-456789abcdef'
result = parse_line(uuid)
self.assertIsNotNone(result)
result = require(parse_line(uuid))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['id'], uuid)
@@ -144,8 +148,7 @@ class TestJSONLParsing(unittest.TestCase):
"""file:// URLs should be parsed."""
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
result = parse_line('file:///path/to/file.txt')
self.assertIsNotNone(result)
result = require(parse_line('file:///path/to/file.txt'))
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['url'], 'file:///path/to/file.txt')
@@ -501,9 +504,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create crawl with multiple URLs (as newline-separated string)
urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com'
crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})
self.assertIsNotNone(crawl)
crawl = require(Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id}))
self.assertIsNotNone(crawl.id)
self.assertEqual(crawl.urls, urls)
self.assertEqual(crawl.status, 'queued')
@@ -538,7 +539,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 1: Create crawl (simulating 'archivebox crawl')
urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com'
crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})
crawl = require(Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id}))
crawl_output = crawl.to_json()
# Step 2: Parse crawl output as snapshot input
@@ -590,7 +591,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create snapshot
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_json(records[0], overrides=overrides)
snapshot = require(Snapshot.from_json(records[0], overrides=overrides))
self.assertIsNotNone(snapshot.id)
self.assertEqual(snapshot.url, url)
@@ -618,7 +619,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 1: Create snapshot (simulating 'archivebox snapshot')
url = 'https://test-extract-1.example.com'
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_json({'url': url}, overrides=overrides)
snapshot = require(Snapshot.from_json({'url': url}, overrides=overrides))
snapshot_output = snapshot.to_json()
# Step 2: Parse snapshot output as extract input
@@ -657,7 +658,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# === archivebox crawl https://example.com ===
url = 'https://test-pipeline-full.example.com'
crawl = Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id})
crawl = require(Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id}))
crawl_jsonl = json.dumps(crawl.to_json())
# === | archivebox snapshot ===
@@ -728,12 +729,12 @@ class TestDepthWorkflows(unittest.TestCase):
# Create crawl with depth 0
url = 'https://depth0-test.example.com'
crawl = Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
crawl = require(Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id}))
self.assertEqual(crawl.max_depth, 0)
# Create snapshot
snapshot = Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id})
snapshot = require(Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id}))
self.assertEqual(snapshot.url, url)
def test_depth_metadata_in_crawl(self):
@@ -744,10 +745,10 @@ class TestDepthWorkflows(unittest.TestCase):
created_by_id = get_or_create_system_user_pk()
# Create crawl with depth
crawl = Crawl.from_json(
crawl = require(Crawl.from_json(
{'url': 'https://depth-meta-test.example.com', 'max_depth': 2},
overrides={'created_by_id': created_by_id}
)
))
self.assertEqual(crawl.max_depth, 2)

View File

@@ -17,7 +17,6 @@ import sys
from typing import Dict
from pathlib import Path
from collections.abc import Mapping
from benedict import benedict
@@ -46,7 +45,7 @@ from .version import detect_installed_version
###################### Config ##########################
class ConstantsDict(Mapping):
class ConstantsDict:
PACKAGE_DIR: Path = PACKAGE_DIR
DATA_DIR: Path = DATA_DIR
ARCHIVE_DIR: Path = ARCHIVE_DIR
@@ -225,16 +224,9 @@ class ConstantsDict(Mapping):
def __benedict__(cls):
# when casting to benedict, only include uppercase keys that don't start with an underscore
return benedict({key: value for key, value in cls.__dict__.items() if key.isupper() and not key.startswith('_')})
@classmethod
def __len__(cls):
return len(cls.__benedict__())
@classmethod
def __iter__(cls):
return iter(cls.__benedict__())
CONSTANTS = ConstantsDict()
CONSTANTS = ConstantsDict
CONSTANTS_CONFIG = CONSTANTS.__benedict__()
# add all key: values to globals() for easier importing, e.g.:

View File

@@ -1,16 +1,17 @@
__package__ = 'archivebox.core'
from django.contrib import admin
from django.utils.html import format_html, mark_safe
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from archivebox.misc.paginators import AccelleratedPaginator
from archivebox.base_models.admin import BaseModelAdmin
from archivebox.core.models import Tag
from archivebox.core.models import SnapshotTag, Tag
class TagInline(admin.TabularInline):
model = Tag.snapshot_set.through # type: ignore
model = SnapshotTag
# fk_name = 'snapshot'
fields = ('id', 'tag')
extra = 1
@@ -173,4 +174,3 @@ class TagAdmin(BaseModelAdmin):
def register_admin(admin_site):
admin_site.register(Tag, TagAdmin)

View File

@@ -2,8 +2,9 @@ __package__ = 'archivebox.core'
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from django.utils.html import format_html, mark_safe
from django.contrib.auth import get_user_model
from django.utils.html import format_html
from django.utils.safestring import mark_safe
class CustomUserAdmin(UserAdmin):
@@ -16,7 +17,7 @@ class CustomUserAdmin(UserAdmin):
add_fieldsets = UserAdmin.add_fieldsets
# Extend fieldsets for change form only (not user creation)
fieldsets = [*UserAdmin.fieldsets, ('Data', {'fields': readonly_fields})]
fieldsets = [*(UserAdmin.fieldsets or ()), ('Data', {'fields': readonly_fields})]
@admin.display(description='Snapshots')
def snapshot_set(self, obj):

View File

@@ -22,12 +22,19 @@ def get_plugin_choices():
return [(name, name) for name in get_plugins()]
def get_choice_field(form: forms.Form, name: str) -> forms.ChoiceField:
    """Look up *name* on *form* and return it, narrowed to a ChoiceField.

    Raises TypeError if the field exists but is not a ChoiceField, so type
    checkers (and callers mutating .choices) can rely on the narrowed type.
    """
    candidate = form.fields[name]
    if isinstance(candidate, forms.ChoiceField):
        return candidate
    raise TypeError(f'{name} must be a ChoiceField')
class AddLinkForm(forms.Form):
# Basic fields
url = forms.RegexField(
label="URLs (one per line)",
regex=URL_REGEX,
min_length='6',
min_length=6,
strip=True,
widget=forms.Textarea,
required=True
@@ -162,22 +169,22 @@ class AddLinkForm(forms.Form):
extensions = {'twocaptcha', 'istilldontcareaboutcookies', 'ublock'}
# Populate plugin field choices
self.fields['chrome_plugins'].choices = [
get_choice_field(self, 'chrome_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in chrome_dependent
]
self.fields['archiving_plugins'].choices = [
get_choice_field(self, 'archiving_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in archiving
]
self.fields['parsing_plugins'].choices = [
get_choice_field(self, 'parsing_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in parsing
]
self.fields['search_plugins'].choices = [
get_choice_field(self, 'search_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in search
]
self.fields['binary_plugins'].choices = [
get_choice_field(self, 'binary_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in binary
]
self.fields['extension_plugins'].choices = [
get_choice_field(self, 'extension_plugins').choices = [
(p, p) for p in sorted(all_plugins) if p in extensions
]
@@ -185,13 +192,15 @@ class AddLinkForm(forms.Form):
self.fields['update'].initial = not ARCHIVING_CONFIG.ONLY_NEW
def clean(self):
cleaned_data = super().clean()
cleaned_data = super().clean() or {}
# Combine all plugin groups into single list
all_selected_plugins = []
for field in ['chrome_plugins', 'archiving_plugins', 'parsing_plugins',
'search_plugins', 'binary_plugins', 'extension_plugins']:
all_selected_plugins.extend(cleaned_data.get(field, []))
selected = cleaned_data.get(field)
if isinstance(selected, list):
all_selected_plugins.extend(selected)
# Store combined list for easy access
cleaned_data['plugins'] = all_selected_plugins
@@ -211,17 +220,13 @@ class AddLinkForm(forms.Form):
return schedule
class TagWidgetMixin:
class TagWidget(forms.TextInput):
    """Text input that renders a collection of tags as an editable string."""

    def format_value(self, value):
        # Strings (and None) pass straight through; anything else —
        # presumably an iterable of tag objects (TODO confirm against
        # edit_string_for_tags) — is serialized to comma-separated form first.
        if isinstance(value, str) or value is None:
            return super().format_value(value)
        return super().format_value(edit_string_for_tags(value))
class TagWidget(TagWidgetMixin, forms.TextInput):
pass
class TagField(forms.CharField):
widget = TagWidget
@@ -234,21 +239,21 @@ class TagField(forms.CharField):
"Please provide a comma-separated list of tags."
)
def has_changed(self, initial_value, data_value):
def has_changed(self, initial, data):
# Always return False if the field is disabled since self.bound_data
# always uses the initial value in this case.
if self.disabled:
return False
try:
data_value = self.clean(data_value)
cleaned_data = self.clean(data)
except forms.ValidationError:
pass
cleaned_data = data
if initial_value is None:
initial_value = []
initial_value = [] if initial is None else initial
initial_value = [tag.name for tag in initial_value]
initial_value.sort()
if not isinstance(initial_value, list):
initial_value = list(initial_value)
return initial_value != data_value
normalized_initial = sorted(tag.name for tag in initial_value)
return normalized_initial != cleaned_data

View File

@@ -2,8 +2,7 @@ __package__ = 'archivebox'
from django.core.management.base import BaseCommand
from .cli import run_subcommand
from archivebox.cli import main as run_cli
class Command(BaseCommand):
@@ -15,4 +14,5 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
run_subcommand(kwargs['subcommand'], args=kwargs['command_args'])
command_args = [kwargs['subcommand'], *kwargs['command_args']]
run_cli(args=command_args)

View File

@@ -165,6 +165,8 @@ class ReverseProxyAuthMiddleware(RemoteUserMiddleware):
return
ip = request.META.get('REMOTE_ADDR')
if not isinstance(ip, str):
return
for cidr in SERVER_CONFIG.REVERSE_PROXY_WHITELIST.split(','):
try:

View File

@@ -3,10 +3,11 @@
import archivebox.base_models.models
import django.db.models.deletion
import django.utils.timezone
import uuid
from django.conf import settings
from django.db import migrations, models, connection
from archivebox.uuid_compat import uuid7
def copy_old_fields_to_new(apps, schema_editor):
"""Copy data from old field names to new field names after AddField operations."""
@@ -236,7 +237,7 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='archiveresult',
name='uuid',
field=models.UUIDField(blank=True, db_index=True, default=uuid.uuid7, null=True),
field=models.UUIDField(blank=True, db_index=True, default=uuid7, null=True),
),
migrations.AlterField(
model_name='snapshot',
@@ -246,7 +247,7 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='snapshot',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
migrations.AlterField(
model_name='snapshot',

View File

@@ -1,8 +1,9 @@
# Generated by Django 6.0 on 2026-01-02 10:02
import uuid
from django.db import migrations, models
from archivebox.uuid_compat import uuid7
class Migration(migrations.Migration):
@@ -14,6 +15,6 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='archiveresult',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
]

View File

@@ -2,9 +2,10 @@
import django.db.models.deletion
import django.utils.timezone
import uuid
from django.db import migrations, models
from archivebox.uuid_compat import uuid7
class Migration(migrations.Migration):
@@ -16,7 +17,7 @@ class Migration(migrations.Migration):
migrations.CreateModel(
name='Process',
fields=[
('id', models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True)),
('id', models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True)),
('created_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)),
('modified_at', models.DateTimeField(auto_now=True)),
('pwd', models.CharField(blank=True, default='', help_text='Working directory for process execution', max_length=512)),

View File

@@ -84,6 +84,7 @@ class TestMachineModel(TestCase):
result = Machine.from_json(record)
self.assertIsNotNone(result)
assert result is not None
self.assertEqual(result.config.get('WGET_BINARY'), '/usr/bin/wget')
def test_machine_from_jsonl_invalid(self):
@@ -179,6 +180,7 @@ class TestBinaryModel(TestCase):
result = Binary.objects.get_valid_binary('wget')
self.assertIsNotNone(result)
assert result is not None
self.assertEqual(result.abspath, '/usr/bin/wget')
def test_binary_update_and_requeue(self):
@@ -209,6 +211,8 @@ class TestBinaryModel(TestCase):
'overrides': overrides,
})
self.assertIsNotNone(binary)
assert binary is not None
self.assertEqual(binary.overrides, overrides)
def test_binary_from_json_does_not_coerce_legacy_override_shapes(self):
@@ -224,6 +228,8 @@ class TestBinaryModel(TestCase):
'overrides': overrides,
})
self.assertIsNotNone(binary)
assert binary is not None
self.assertEqual(binary.overrides, overrides)
def test_binary_from_json_prefers_published_readability_package(self):
@@ -238,6 +244,8 @@ class TestBinaryModel(TestCase):
},
})
self.assertIsNotNone(binary)
assert binary is not None
self.assertEqual(
binary.overrides,
{
@@ -265,7 +273,7 @@ class TestBinaryStateMachine(TestCase):
def test_binary_state_machine_initial_state(self):
"""BinaryMachine should start in queued state."""
sm = BinaryMachine(self.binary)
self.assertEqual(sm.current_state.value, Binary.StatusChoices.QUEUED)
self.assertEqual(sm.current_state_value, Binary.StatusChoices.QUEUED)
def test_binary_state_machine_can_start(self):
"""BinaryMachine.can_start() should check name and binproviders."""
@@ -604,7 +612,7 @@ class TestProcessStateMachine(TestCase):
def test_process_state_machine_initial_state(self):
"""ProcessMachine should start in queued state."""
sm = ProcessMachine(self.process)
self.assertEqual(sm.current_state.value, Process.StatusChoices.QUEUED)
self.assertEqual(sm.current_state_value, Process.StatusChoices.QUEUED)
def test_process_state_machine_can_start(self):
"""ProcessMachine.can_start() should check cmd and machine."""

View File

@@ -8,7 +8,7 @@ Click command metadata. Handles JSON-RPC 2.0 requests over stdio transport.
import sys
import json
import traceback
from typing import Optional
from typing import Any, Optional
import click
from click.testing import CliRunner
@@ -19,25 +19,25 @@ from archivebox.config.version import VERSION
class MCPJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder that handles Click sentinel values and other special types"""
def default(self, obj):
def default(self, o):
# Handle Click's sentinel values
if hasattr(click, 'core') and hasattr(click.core, '_SentinelClass'):
if isinstance(obj, click.core._SentinelClass):
sentinel_type = getattr(click.core, '_SentinelClass', None)
if isinstance(sentinel_type, type) and isinstance(o, sentinel_type):
return None
# Handle tuples (convert to lists)
if isinstance(obj, tuple):
return list(obj)
if isinstance(o, tuple):
return list(o)
# Handle any other non-serializable objects
try:
return super().default(obj)
return super().default(o)
except TypeError:
return str(obj)
return str(o)
# Type mapping from Click types to JSON Schema types
def click_type_to_json_schema_type(click_type) -> dict:
def click_type_to_json_schema_type(click_type: click.ParamType) -> dict[str, Any]:
"""Convert a Click parameter type to JSON Schema type definition"""
if isinstance(click_type, click.types.StringParamType):
@@ -49,7 +49,7 @@ def click_type_to_json_schema_type(click_type) -> dict:
elif isinstance(click_type, click.types.BoolParamType):
return {"type": "boolean"}
elif isinstance(click_type, click.types.Choice):
return {"type": "string", "enum": click_type.choices}
return {"type": "string", "enum": list(click_type.choices)}
elif isinstance(click_type, click.types.Path):
return {"type": "string", "description": "File or directory path"}
elif isinstance(click_type, click.types.File):
@@ -62,7 +62,7 @@ def click_type_to_json_schema_type(click_type) -> dict:
return {"type": "string"}
def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict:
def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict[str, Any]:
"""
Convert a Click command to an MCP tool definition with JSON Schema.
@@ -70,20 +70,21 @@ def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> di
the input schema without manual definition.
"""
properties = {}
required = []
properties: dict[str, dict[str, Any]] = {}
required: list[str] = []
# Extract parameters from Click command
for param in click_command.params:
# Skip internal parameters
if param.name in ('help', 'version'):
if param.name is None or param.name in ('help', 'version'):
continue
param_schema = click_type_to_json_schema_type(param.type)
# Add description from Click help text
if param.help:
param_schema["description"] = param.help
help_text = getattr(param, 'help', None)
if help_text:
param_schema["description"] = help_text
# Handle default values
if param.default is not None and param.default != ():
@@ -248,7 +249,7 @@ class MCPServer:
if cmd_name not in self._tool_cache:
if cmd_name not in self.cli_group.all_subcommands:
return None
self._tool_cache[cmd_name] = self.cli_group.get_command(None, cmd_name)
self._tool_cache[cmd_name] = self.cli_group.get_command(click.Context(self.cli_group), cmd_name)
return self._tool_cache[cmd_name]
def handle_initialize(self, params: dict) -> dict:

View File

@@ -6,7 +6,7 @@ __package__ = 'archivebox.misc'
from io import StringIO
from pathlib import Path
from typing import List, Tuple
from typing import Any, List, Tuple
from archivebox.config import DATA_DIR
from archivebox.misc.util import enforce_types
@@ -48,8 +48,8 @@ def apply_migrations(out_dir: Path = DATA_DIR) -> List[str]:
@enforce_types
def get_admins(out_dir: Path = DATA_DIR) -> List:
def get_admins(out_dir: Path = DATA_DIR) -> List[Any]:
"""Get list of superuser accounts"""
from django.contrib.auth.models import User
return User.objects.filter(is_superuser=True).exclude(username='system')
return list(User.objects.filter(is_superuser=True).exclude(username='system'))

View File

@@ -14,7 +14,7 @@ from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Any, Optional, List, Dict, Union, Iterable, IO, TYPE_CHECKING
from typing import Any, Optional, List, Dict, Union, Iterable, IO, TYPE_CHECKING, cast
if TYPE_CHECKING:
from archivebox.core.models import Snapshot
@@ -397,7 +397,8 @@ def log_list_finished(snapshots):
from archivebox.core.models import Snapshot
print()
print('---------------------------------------------------------------------------------------------------')
print(Snapshot.objects.filter(pk__in=[s.pk for s in snapshots]).to_csv(cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | '))
csv_queryset = cast(Any, Snapshot.objects.filter(pk__in=[s.pk for s in snapshots]))
print(csv_queryset.to_csv(cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | '))
print('---------------------------------------------------------------------------------------------------')
print()

View File

@@ -13,7 +13,7 @@ django_stubs_ext.monkeypatch()
# monkey patch django timezone to add back utc (it was removed in Django 5.0)
timezone.utc = datetime.timezone.utc
setattr(timezone, 'utc', datetime.timezone.utc)
# monkey patch django-signals-webhooks to change how it shows up in Admin UI
# from signal_webhooks.apps import DjangoSignalWebhooksConfig

View File

@@ -13,12 +13,17 @@ class AccelleratedPaginator(Paginator):
@cached_property
def count(self):
if self.object_list._has_filters(): # type: ignore
has_filters = getattr(self.object_list, '_has_filters', None)
if callable(has_filters) and has_filters():
# fallback to normal count method on filtered queryset
return super().count
else:
# otherwise count total rows in a separate fast query
return self.object_list.model.objects.count()
model = getattr(self.object_list, 'model', None)
if model is None:
return super().count
# otherwise count total rows in a separate fast query
return model.objects.count()
# Alternative approach for PostgreSQL: fallback count takes > 200ms
# from django.db import connection, transaction, OperationalError

View File

@@ -17,7 +17,7 @@ from collections import deque
from pathlib import Path
from rich import box
from rich.console import Group
from rich.console import Group, RenderableType
from rich.layout import Layout
from rich.columns import Columns
from rich.panel import Panel
@@ -48,7 +48,7 @@ class CrawlQueuePanel:
self.max_crawl_workers = 8
self.crawl_id: Optional[str] = None
def __rich__(self) -> Panel:
def __rich__(self) -> RenderableType:
grid = Table.grid(expand=True)
grid.add_column(justify="left", ratio=1)
grid.add_column(justify="center", ratio=1)
@@ -104,7 +104,7 @@ class ProcessLogPanel:
self.compact = compact
self.bg_terminating = bg_terminating
def __rich__(self) -> Panel:
def __rich__(self) -> RenderableType:
completed_line = self._completed_output_line()
if completed_line:
style = "green" if self._completed_ok() else "yellow"

View File

@@ -111,7 +111,7 @@ def _render_markdown_fallback(text: str) -> str:
return _markdown.markdown(
text,
extensions=["extra", "toc", "sane_lists"],
output_format="html5",
output_format="html",
)
except Exception:
pass

View File

@@ -9,13 +9,14 @@ import sys
from json import dump
from pathlib import Path
from typing import Optional, Union, Tuple
from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
from subprocess import PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
from atomicwrites import atomic_write as lib_atomic_write
from archivebox.config.common import STORAGE_CONFIG
from archivebox.misc.util import enforce_types, ExtendedEncoder
IS_WINDOWS = os.name == 'nt'
def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs):
"""Patched of subprocess.run to kill forked child subprocesses and fix blocking io making timeout=innefective
@@ -47,13 +48,15 @@ def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False,
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired as exc:
process.kill()
if _mswindows:
if IS_WINDOWS:
# Windows accumulates the output in a single blocking
# read() call run on child threads, with the timeout
# being done in a join() on those threads. communicate()
# _after_ kill() is required to collect that and add it
# to the exception.
exc.stdout, exc.stderr = process.communicate()
timed_out_stdout, timed_out_stderr = process.communicate()
exc.stdout = timed_out_stdout.encode() if isinstance(timed_out_stdout, str) else timed_out_stdout
exc.stderr = timed_out_stderr.encode() if isinstance(timed_out_stderr, str) else timed_out_stderr
else:
# POSIX _communicate already populated the output so
# far into the TimeoutExpired exception.
@@ -71,11 +74,12 @@ def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False,
finally:
# force kill any straggler subprocesses that were forked from the main proc
try:
os.killpg(pgid, signal.SIGINT)
if pgid is not None:
os.killpg(pgid, signal.SIGINT)
except Exception:
pass
return CompletedProcess(process.args, retcode, stdout, stderr)
return CompletedProcess(process.args, retcode or 0, stdout, stderr)
@enforce_types

View File

@@ -42,7 +42,7 @@ def convert(ini_str: str) -> str:
"""Convert a string of INI config into its TOML equivalent (warning: strips comments)"""
config = configparser.ConfigParser()
config.optionxform = str # capitalize key names
setattr(config, 'optionxform', str) # capitalize key names
config.read_string(ini_str)
# Initialize an empty dictionary to store the TOML representation
@@ -77,12 +77,12 @@ class JSONSchemaWithLambdas(GenerateJsonSchema):
Usage:
>>> json.dumps(value, encoder=JSONSchemaWithLambdas())
"""
def encode_default(self, default: Any) -> Any:
def encode_default(self, dft: Any) -> Any:
config = self._config
if isinstance(default, Callable):
return '{{lambda ' + inspect.getsource(default).split('=lambda ')[-1].strip()[:-1] + '}}'
if isinstance(dft, Callable):
return '{{lambda ' + inspect.getsource(dft).split('=lambda ')[-1].strip()[:-1] + '}}'
return to_jsonable_python(
default,
dft,
timedelta_mode=config.ser_json_timedelta,
bytes_mode=config.ser_json_bytes,
serialize_unknown=True

View File

@@ -56,9 +56,19 @@ urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)
short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date_str = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
def short_ts(ts: Any) -> str | None:
    """Return *ts* as a whole-second unix-timestamp string, or None if unparseable."""
    parsed = parse_date(ts)
    if parsed is None:
        return None
    # Drop the fractional part of the float timestamp textually.
    return str(parsed.timestamp()).split('.', 1)[0]
def ts_to_date_str(ts: Any) -> str | None:
    """Format *ts* as 'YYYY-MM-DD HH:MM', or None when it cannot be parsed."""
    parsed = parse_date(ts)
    return parsed.strftime('%Y-%m-%d %H:%M') if parsed is not None else None
def ts_to_iso(ts: Any) -> str | None:
    """Render *ts* in ISO-8601 form, or None when parsing fails."""
    parsed = parse_date(ts)
    return None if parsed is None else parsed.isoformat()
COLOR_REGEX = re.compile(r'\[(?P<arg_1>\d+)(;(?P<arg_2>\d+)(;(?P<arg_3>\d+))?)?m')
@@ -175,7 +185,7 @@ def docstring(text: Optional[str]):
@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
def str_between(string: str, start: str, end: str | None = None) -> str:
"""(<abc>12345</def>, <abc>, </def>) -> 12345"""
content = string.split(start, 1)[-1]
@@ -186,7 +196,7 @@ def str_between(string: str, start: str, end: str=None) -> str:
@enforce_types
def parse_date(date: Any) -> datetime:
def parse_date(date: Any) -> datetime | None:
"""Parse unix timestamps, iso format, and human-readable strings"""
if date is None:
@@ -196,20 +206,24 @@ def parse_date(date: Any) -> datetime:
if date.tzinfo is None:
return date.replace(tzinfo=timezone.utc)
assert date.tzinfo.utcoffset(datetime.now()).seconds == 0, 'Refusing to load a non-UTC date!'
offset = date.utcoffset()
assert offset == datetime.now(timezone.utc).utcoffset(), 'Refusing to load a non-UTC date!'
return date
if isinstance(date, (float, int)):
date = str(date)
if isinstance(date, str):
return dateparser(date, settings={'TIMEZONE': 'UTC'}).astimezone(timezone.utc)
parsed_date = dateparser(date, settings={'TIMEZONE': 'UTC'})
if parsed_date is None:
raise ValueError(f'Tried to parse invalid date string! {date}')
return parsed_date.astimezone(timezone.utc)
raise ValueError('Tried to parse invalid date! {}'.format(date))
@enforce_types
def download_url(url: str, timeout: int=None) -> str:
def download_url(url: str, timeout: int | None = None) -> str:
"""Download the contents of a remote url and return the text"""
from archivebox.config.common import ARCHIVING_CONFIG
@@ -221,7 +235,8 @@ def download_url(url: str, timeout: int=None) -> str:
cookie_jar = http.cookiejar.MozillaCookieJar(ARCHIVING_CONFIG.COOKIES_FILE)
cookie_jar.load(ignore_discard=True, ignore_expires=True)
for cookie in cookie_jar:
session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path)
if cookie.value is not None:
session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path)
response = session.get(
url,
@@ -331,47 +346,47 @@ class ExtendedEncoder(pyjson.JSONEncoder):
fields and objects
"""
def default(self, obj):
cls_name = obj.__class__.__name__
def default(self, o):
cls_name = o.__class__.__name__
if hasattr(obj, '_asdict'):
return obj._asdict()
if hasattr(o, '_asdict'):
return o._asdict()
elif isinstance(obj, bytes):
return obj.decode()
elif isinstance(o, bytes):
return o.decode()
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(o, datetime):
return o.isoformat()
elif isinstance(obj, Exception):
return '{}: {}'.format(obj.__class__.__name__, obj)
elif isinstance(o, Exception):
return '{}: {}'.format(o.__class__.__name__, o)
elif isinstance(obj, Path):
return str(obj)
elif isinstance(o, Path):
return str(o)
elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
return list(obj)
return list(o)
elif isinstance(obj, Callable):
return str(obj)
elif isinstance(o, Callable):
return str(o)
# Try dict/list conversion as fallback
try:
return dict(obj)
return dict(o)
except Exception:
pass
try:
return list(obj)
return list(o)
except Exception:
pass
try:
return str(obj)
return str(o)
except Exception:
pass
return pyjson.JSONEncoder.default(self, obj)
return pyjson.JSONEncoder.default(self, o)
@enforce_types

View File

@@ -1,3 +0,0 @@
[mypy]
plugins =
mypy_django_plugin.main

View File

@@ -1,8 +1,9 @@
# Generated by Django 6.0 on 2026-01-05 01:09
import uuid
from django.db import migrations, models
from archivebox.uuid_compat import uuid7
class Migration(migrations.Migration):
@@ -14,6 +15,6 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='persona',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
field=models.UUIDField(default=uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
]

View File

@@ -400,13 +400,13 @@ def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str])
# Test Data Factories
# =============================================================================
def create_test_url(domain: str = 'example.com', path: str = None) -> str:
def create_test_url(domain: str = 'example.com', path: str | None = None) -> str:
"""Generate unique test URL."""
path = path or uuid7().hex[:8]
return f'https://{domain}/{path}'
def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
def create_test_crawl_json(urls: List[str] | None = None, **kwargs) -> Dict[str, Any]:
"""Create Crawl JSONL record for testing."""
urls = urls or [create_test_url()]
return {
@@ -419,7 +419,7 @@ def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
}
def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
def create_test_snapshot_json(url: str | None = None, **kwargs) -> Dict[str, Any]:
"""Create Snapshot JSONL record for testing."""
return {
'type': 'Snapshot',

View File

@@ -967,7 +967,7 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
# Helper Functions
# =============================================================================
def run_archivebox(data_dir: Path, args: list, timeout: int = 60, env: dict = None) -> subprocess.CompletedProcess:
def run_archivebox(data_dir: Path, args: list, timeout: int = 60, env: dict | None = None) -> subprocess.CompletedProcess:
"""Run archivebox command in subprocess with given data directory."""
base_env = os.environ.copy()
base_env['DATA_DIR'] = str(data_dir)

View File

@@ -1,166 +0,0 @@
import os
import sqlite3
import subprocess
def test_depth_flag_is_accepted(process, disable_extractors_dict):
    """The add command must recognize the --depth flag."""
    result = subprocess.run(
        ["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    stderr_text = result.stderr.decode("utf-8")
    assert 'unrecognized arguments: --depth' not in stderr_text
def test_depth_flag_fails_if_it_is_not_0_or_1(process, disable_extractors_dict):
    """Depth values outside the supported 0/1 range must be rejected."""
    for bad_depth in ('5', '-1'):
        result = subprocess.run(
            ["archivebox", "add", "--index-only", f"--depth={bad_depth}", "https://example.com"],
            capture_output=True,
            env=disable_extractors_dict,
        )
        # Error message may say "invalid choice" or "is not one of"
        message = result.stderr.decode("utf-8").lower()
        assert 'invalid' in message or 'not one of' in message
def test_depth_flag_0_creates_source_file(tmp_path, process, disable_extractors_dict):
    """add --depth=0 must persist the submitted URL into sources/*cli_add.txt."""
    os.chdir(tmp_path)
    subprocess.run(
        ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    # The URL should have been written into a cli_add source file.
    sources_dir = tmp_path / "sources"
    assert sources_dir.exists()
    cli_add_files = list(sources_dir.glob("*cli_add.txt"))
    assert len(cli_add_files) >= 1
    assert "example.com" in cli_add_files[0].read_text()
def test_overwrite_flag_is_accepted(process, disable_extractors_dict):
    """add --overwrite must be a recognized argument on a re-add."""
    subprocess.run(
        ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    rerun = subprocess.run(
        ["archivebox", "add", "--index-only", "--overwrite", "https://example.com"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    rerun_stderr = rerun.stderr.decode("utf-8")
    assert 'unrecognized arguments: --overwrite' not in rerun_stderr
def test_add_creates_crawl_in_database(tmp_path, process, disable_extractors_dict):
    """add must record at least one Crawl row in index.sqlite3."""
    os.chdir(tmp_path)
    subprocess.run(
        ["archivebox", "add", "--index-only", "--depth=0", "https://example.com"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    conn = sqlite3.connect("index.sqlite3")
    try:
        (crawl_count,) = conn.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()
    finally:
        conn.close()
    assert crawl_count >= 1
def test_add_with_tags(tmp_path, process, disable_extractors_dict):
    """Tags passed via --tag must end up in the core_tag table."""
    os.chdir(tmp_path)
    subprocess.run(
        ["archivebox", "add", "--index-only", "--depth=0", "--tag=test,example", "https://example.com"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    conn = sqlite3.connect("index.sqlite3")
    try:
        tag_names = [name for (name,) in conn.execute("SELECT name FROM core_tag")]
    finally:
        conn.close()
    assert 'test' in tag_names or 'example' in tag_names
def test_add_multiple_urls_single_call(tmp_path, process, disable_extractors_dict):
    """One add call with two URLs must write both into the same source file."""
    os.chdir(tmp_path)
    subprocess.run(
        ["archivebox", "add", "--index-only", "--depth=0",
         "https://example.com", "https://example.org"],
        capture_output=True,
        env=disable_extractors_dict,
    )
    source_files = list((tmp_path / "sources").glob("*cli_add.txt"))
    assert len(source_files) >= 1
    recorded = source_files[0].read_text()
    assert "example.com" in recorded
    assert "example.org" in recorded
def test_add_from_file(tmp_path, process, disable_extractors_dict):
    """add must accept a local file containing one URL per line."""
    os.chdir(tmp_path)
    urls_file = tmp_path / "urls.txt"
    urls_file.write_text("https://example.com\nhttps://example.org\n")
    subprocess.run(
        ["archivebox", "add", "--index-only", "--depth=0", str(urls_file)],
        capture_output=True,
        env=disable_extractors_dict,
    )
    # Importing the file should create at least one Crawl record.
    conn = sqlite3.connect("index.sqlite3")
    try:
        (crawl_count,) = conn.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()
    finally:
        conn.close()
    assert crawl_count >= 1
class TestAddCLI:
    """Exercise the add command purely through its CLI surface."""

    def test_add_help(self, tmp_path, process):
        """add --help must exit 0 and mention the depth/tag options."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ["archivebox", "add", "--help"],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        assert '--depth' in result.stdout or 'depth' in result.stdout
        assert '--tag' in result.stdout or 'tag' in result.stdout

    def test_add_no_args_shows_help(self, tmp_path, process):
        """add without arguments must at least hint at correct usage."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ["archivebox", "add"],
            capture_output=True,
            text=True,
        )
        combined_output = (result.stdout + result.stderr).lower()
        # Should either show help or error about the missing URL
        assert 'usage' in combined_output or 'url' in combined_output or 'add' in combined_output

View File

@@ -9,9 +9,11 @@ Tests cover:
"""
import pytest
from typing import cast
from django.test import override_settings
from django.urls import reverse
from django.contrib.auth import get_user_model
from django.contrib.auth.models import UserManager
pytestmark = pytest.mark.django_db
@@ -24,7 +26,7 @@ PUBLIC_HOST = 'public.archivebox.localhost:8000'
@pytest.fixture
def admin_user(db):
"""Create admin user for tests."""
return User.objects.create_superuser(
return cast(UserManager, User.objects).create_superuser(
username='testadmin',
email='admin@test.com',
password='testpassword'

View File

@@ -7,6 +7,21 @@ Verify add creates snapshots in DB, crawls, source files, and archive directorie
import os
import sqlite3
import subprocess
from pathlib import Path
def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
candidates = {snapshot_id}
if len(snapshot_id) == 32:
candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}")
elif len(snapshot_id) == 36 and '-' in snapshot_id:
candidates.add(snapshot_id.replace('-', ''))
for needle in candidates:
for path in data_dir.rglob(needle):
if path.is_dir():
return path
return None
def test_add_single_url_creates_snapshot_in_db(tmp_path, process, disable_extractors_dict):
@@ -144,6 +159,21 @@ def test_add_with_depth_1_flag(tmp_path, process, disable_extractors_dict):
assert 'unrecognized arguments: --depth' not in result.stderr.decode('utf-8')
def test_add_rejects_invalid_depth_values(tmp_path, process, disable_extractors_dict):
"""Test that add rejects depth values outside the supported range."""
os.chdir(tmp_path)
for depth in ('5', '-1'):
result = subprocess.run(
['archivebox', 'add', '--index-only', f'--depth={depth}', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
)
stderr = result.stderr.decode('utf-8').lower()
assert result.returncode != 0
assert 'invalid' in stderr or 'not one of' in stderr
def test_add_with_tags(tmp_path, process, disable_extractors_dict):
"""Test adding URL with tags stores tags_str in crawl.
@@ -245,11 +275,8 @@ def test_add_with_overwrite_flag(tmp_path, process, disable_extractors_dict):
assert 'unrecognized arguments: --overwrite' not in result.stderr.decode('utf-8')
def test_add_creates_archive_subdirectory(tmp_path, process, disable_extractors_dict):
"""Test that add creates archive subdirectory for the snapshot.
Archive subdirectories are named by timestamp, not by snapshot ID.
"""
def test_add_creates_snapshot_output_directory(tmp_path, process, disable_extractors_dict):
"""Test that add creates the current snapshot output directory on disk."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
@@ -257,16 +284,44 @@ def test_add_creates_archive_subdirectory(tmp_path, process, disable_extractors_
env=disable_extractors_dict,
)
# Get the snapshot timestamp from the database
conn = sqlite3.connect("index.sqlite3")
c = conn.cursor()
timestamp = c.execute("SELECT timestamp FROM core_snapshot").fetchone()[0]
snapshot_id = str(c.execute("SELECT id FROM core_snapshot").fetchone()[0])
conn.close()
# Check that archive subdirectory was created using timestamp
archive_dir = tmp_path / "archive" / str(timestamp)
assert archive_dir.exists()
assert archive_dir.is_dir()
snapshot_dir = _find_snapshot_dir(tmp_path, snapshot_id)
assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}"
assert snapshot_dir.is_dir()
def test_add_help_shows_depth_and_tag_options(tmp_path, process):
"""Test that add --help documents the main filter and crawl options."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'add', '--help'],
capture_output=True,
text=True,
)
assert result.returncode == 0
assert '--depth' in result.stdout
assert '--tag' in result.stdout
def test_add_without_args_shows_usage(tmp_path, process):
"""Test that add without URLs fails with a usage hint instead of crashing."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'add'],
capture_output=True,
text=True,
)
combined = result.stdout + result.stderr
assert result.returncode != 0
assert 'usage' in combined.lower() or 'url' in combined.lower()
def test_add_index_only_skips_extraction(tmp_path, process, disable_extractors_dict):

View File

@@ -241,3 +241,24 @@ def test_init_output_shows_collection_info(tmp_path):
output = result.stdout
# Should show some helpful info about the collection
assert 'ArchiveBox' in output or 'collection' in output.lower() or 'Initializing' in output
def test_init_ignores_unrecognized_archive_directories(tmp_path, process, disable_extractors_dict):
"""Test that init upgrades existing dirs without choking on extra folders."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
(tmp_path / "archive" / "some_random_folder").mkdir(parents=True, exist_ok=True)
result = subprocess.run(
['archivebox', 'init'],
capture_output=True,
text=True,
env=disable_extractors_dict,
)
assert result.returncode == 0, result.stdout + result.stderr

View File

@@ -93,6 +93,59 @@ def test_install_shows_binary_status(tmp_path, process):
assert len(output) > 50
def test_install_dry_run_prints_dry_run_message(tmp_path, process):
"""Test that install --dry-run clearly reports that no changes will be made."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'install', '--dry-run'],
capture_output=True,
text=True,
timeout=60,
)
assert result.returncode == 0
assert 'dry run' in result.stdout.lower()
def test_install_help_lists_dry_run_flag(tmp_path):
"""Test that install --help documents the dry-run option."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'install', '--help'],
capture_output=True,
text=True,
)
assert result.returncode == 0
assert '--dry-run' in result.stdout or '-d' in result.stdout
def test_install_invalid_option_fails(tmp_path):
"""Test that invalid install options fail cleanly."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'install', '--invalid-option'],
capture_output=True,
text=True,
)
assert result.returncode != 0
def test_install_from_empty_dir_initializes_collection(tmp_path):
"""Test that install bootstraps an empty dir before performing work."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'install', '--dry-run'],
capture_output=True,
text=True,
)
output = result.stdout + result.stderr
assert result.returncode == 0
assert 'Initializing' in output or 'Dry run' in output or 'init' in output.lower()
def test_install_updates_binary_table(tmp_path, process):
"""Test that install completes and only mutates dependency state."""
os.chdir(tmp_path)

View File

@@ -0,0 +1,146 @@
#!/usr/bin/env python3
"""
Tests for archivebox list command.
Verify list emits snapshot JSONL and applies the documented filters.
"""
import json
import os
import sqlite3
import subprocess
def _parse_jsonl(stdout: str) -> list[dict]:
return [
json.loads(line)
for line in stdout.splitlines()
if line.strip().startswith('{')
]
def test_list_outputs_existing_snapshots_as_jsonl(tmp_path, process, disable_extractors_dict):
    """list must emit one JSON object per stored snapshot."""
    os.chdir(tmp_path)
    for seed_url in ('https://example.com', 'https://iana.org'):
        subprocess.run(
            ['archivebox', 'add', '--index-only', '--depth=0', seed_url],
            capture_output=True,
            env=disable_extractors_dict,
            check=True,
        )
    result = subprocess.run(
        ['archivebox', 'list'],
        capture_output=True,
        text=True,
        timeout=30,
    )
    listed_urls = {record['url'] for record in _parse_jsonl(result.stdout)}
    assert result.returncode == 0, result.stderr
    assert 'https://example.com' in listed_urls
    assert 'https://iana.org' in listed_urls
def test_list_filters_by_url_icontains(tmp_path, process, disable_extractors_dict):
    """list --url__icontains must return only the matching snapshot."""
    os.chdir(tmp_path)
    for seed_url in ('https://example.com', 'https://iana.org'):
        subprocess.run(
            ['archivebox', 'add', '--index-only', '--depth=0', seed_url],
            capture_output=True,
            env=disable_extractors_dict,
            check=True,
        )
    result = subprocess.run(
        ['archivebox', 'list', '--url__icontains', 'example.com'],
        capture_output=True,
        text=True,
        timeout=30,
    )
    records = _parse_jsonl(result.stdout)
    assert result.returncode == 0, result.stderr
    assert len(records) == 1
    assert records[0]['url'] == 'https://example.com'
def test_list_filters_by_crawl_id_and_limit(tmp_path, process, disable_extractors_dict):
    """--crawl-id and --limit together must constrain the result set."""
    os.chdir(tmp_path)
    for seed_url in ('https://example.com', 'https://iana.org'):
        subprocess.run(
            ['archivebox', 'add', '--index-only', '--depth=0', seed_url],
            capture_output=True,
            env=disable_extractors_dict,
            check=True,
        )
    conn = sqlite3.connect("index.sqlite3")
    try:
        row = conn.execute(
            "SELECT crawl_id FROM core_snapshot WHERE url = ?",
            ('https://example.com',),
        ).fetchone()
    finally:
        conn.close()
    crawl_id = str(row[0])
    result = subprocess.run(
        ['archivebox', 'list', '--crawl-id', crawl_id, '--limit', '1'],
        capture_output=True,
        text=True,
        timeout=30,
    )
    records = _parse_jsonl(result.stdout)
    assert result.returncode == 0, result.stderr
    assert len(records) == 1
    # IDs may be rendered with or without dashes, so compare normalized forms.
    assert records[0]['crawl_id'].replace('-', '') == crawl_id.replace('-', '')
    assert records[0]['url'] == 'https://example.com'
def test_list_filters_by_status(tmp_path, process, disable_extractors_dict):
    """list --status must filter on the snapshot's current status value."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
        check=True,
    )
    conn = sqlite3.connect("index.sqlite3")
    try:
        (snapshot_status,) = conn.execute("SELECT status FROM core_snapshot LIMIT 1").fetchone()
    finally:
        conn.close()
    result = subprocess.run(
        ['archivebox', 'list', '--status', snapshot_status],
        capture_output=True,
        text=True,
        timeout=30,
    )
    records = _parse_jsonl(result.stdout)
    assert result.returncode == 0, result.stderr
    assert len(records) == 1
    assert records[0]['status'] == snapshot_status
def test_list_help_lists_filter_options(tmp_path, process):
    """list --help must document the supported filter flags."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'list', '--help'],
        capture_output=True,
        text=True,
        timeout=30,
    )
    assert result.returncode == 0
    for documented_flag in ('--url__icontains', '--crawl-id', '--limit'):
        assert documented_flag in result.stdout

View File

@@ -7,6 +7,21 @@ Verify remove deletes snapshots from DB and filesystem.
import os
import sqlite3
import subprocess
from pathlib import Path
def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
candidates = {snapshot_id}
if len(snapshot_id) == 32:
candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}")
elif len(snapshot_id) == 36 and '-' in snapshot_id:
candidates.add(snapshot_id.replace('-', ''))
for needle in candidates:
for path in data_dir.rglob(needle):
if path.is_dir():
return path
return None
def test_remove_deletes_snapshot_from_db(tmp_path, process, disable_extractors_dict):
@@ -44,10 +59,7 @@ def test_remove_deletes_snapshot_from_db(tmp_path, process, disable_extractors_d
def test_remove_deletes_archive_directory(tmp_path, process, disable_extractors_dict):
"""Test that remove deletes the archive directory when using --delete flag.
Archive directories are named by timestamp, not by snapshot ID.
"""
"""Test that remove --delete removes the current snapshot output directory."""
os.chdir(tmp_path)
# Add a snapshot
@@ -57,24 +69,21 @@ def test_remove_deletes_archive_directory(tmp_path, process, disable_extractors_
env=disable_extractors_dict,
)
# Get snapshot timestamp
conn = sqlite3.connect("index.sqlite3")
c = conn.cursor()
timestamp = c.execute("SELECT timestamp FROM core_snapshot").fetchone()[0]
snapshot_id = str(c.execute("SELECT id FROM core_snapshot").fetchone()[0])
conn.close()
archive_dir = tmp_path / "archive" / str(timestamp)
assert archive_dir.exists()
snapshot_dir = _find_snapshot_dir(tmp_path, snapshot_id)
assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}"
# Remove snapshot with --delete to remove both DB record and directory
subprocess.run(
['archivebox', 'remove', 'https://example.com', '--yes', '--delete'],
capture_output=True,
env=disable_extractors_dict,
)
# Archive directory should be deleted
assert not archive_dir.exists()
assert not snapshot_dir.exists()
def test_remove_yes_flag_skips_confirmation(tmp_path, process, disable_extractors_dict):
@@ -158,6 +167,35 @@ def test_remove_with_filter(tmp_path, process, disable_extractors_dict):
assert result.returncode in [0, 1, 2]
def test_remove_with_regex_filter_deletes_all_matches(tmp_path, process, disable_extractors_dict):
"""Test regex filters remove every matching snapshot."""
os.chdir(tmp_path)
for url in ['https://example.com', 'https://iana.org']:
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', url],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
result = subprocess.run(
['archivebox', 'remove', '--filter-type=regex', '.*', '--yes'],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
conn = sqlite3.connect("index.sqlite3")
c = conn.cursor()
count_after = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
conn.close()
output = result.stdout.decode("utf-8") + result.stderr.decode("utf-8")
assert count_after == 0
assert 'Removed' in output or 'Found' in output
def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extractors_dict):
"""Test that removing non-existent URL fails gracefully."""
os.chdir(tmp_path)
@@ -169,7 +207,8 @@ def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extr
)
# Should fail or show error
assert result.returncode != 0 or 'not found' in result.stdout.lower() or 'no matches' in result.stdout.lower()
stdout_text = result.stdout.decode('utf-8', errors='replace').lower()
assert result.returncode != 0 or 'not found' in stdout_text or 'no matches' in stdout_text
def test_remove_reports_remaining_link_count_correctly(tmp_path, process, disable_extractors_dict):

View File

@@ -4,6 +4,7 @@ Tests for archivebox search command.
Verify search queries snapshots from DB.
"""
import json
import os
import subprocess
@@ -65,3 +66,145 @@ def test_search_on_empty_archive(tmp_path, process):
# Should complete without error
assert result.returncode in [0, 1]
def test_search_json_outputs_matching_snapshots(tmp_path, process, disable_extractors_dict):
"""Test that search --json returns parseable matching snapshot rows."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
result = subprocess.run(
['archivebox', 'search', '--json'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, result.stderr
payload = json.loads(result.stdout)
assert any('example.com' in row.get('url', '') for row in payload)
def test_search_json_with_headers_wraps_links_payload(tmp_path, process, disable_extractors_dict):
"""Test that search --json --with-headers returns a headers envelope."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
result = subprocess.run(
['archivebox', 'search', '--json', '--with-headers'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, result.stderr
payload = json.loads(result.stdout)
links = payload.get('links', payload)
assert any('example.com' in row.get('url', '') for row in links)
def test_search_html_outputs_markup(tmp_path, process, disable_extractors_dict):
"""Test that search --html renders an HTML response."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
result = subprocess.run(
['archivebox', 'search', '--html'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, result.stderr
assert '<' in result.stdout
def test_search_csv_outputs_requested_column(tmp_path, process, disable_extractors_dict):
"""Test that search --csv emits the requested fields."""
os.chdir(tmp_path)
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
result = subprocess.run(
['archivebox', 'search', '--csv', 'url', '--with-headers'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, result.stderr
assert 'url' in result.stdout
assert 'example.com' in result.stdout
def test_search_with_headers_requires_structured_output_format(tmp_path, process):
"""Test that --with-headers is rejected without --json, --html, or --csv."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'search', '--with-headers'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode != 0
assert 'requires' in result.stderr.lower() or 'json' in result.stderr.lower()
def test_search_sort_option_runs_successfully(tmp_path, process, disable_extractors_dict):
"""Test that search --sort accepts sortable fields."""
os.chdir(tmp_path)
for url in ['https://iana.org', 'https://example.com']:
subprocess.run(
['archivebox', 'add', '--index-only', '--depth=0', url],
capture_output=True,
env=disable_extractors_dict,
check=True,
)
result = subprocess.run(
['archivebox', 'search', '--csv', 'url', '--sort=url'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0, result.stderr
assert 'example.com' in result.stdout or 'iana.org' in result.stdout
def test_search_help_lists_supported_filters(tmp_path, process):
"""Test that search --help documents the available filters and output modes."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'search', '--help'],
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0
assert '--filter-type' in result.stdout or '-f' in result.stdout
assert '--status' in result.stdout
assert '--sort' in result.stdout

View File

@@ -202,3 +202,24 @@ def test_status_shows_index_file_info(tmp_path, process):
# Should mention index
assert 'index' in result.stdout.lower() or 'Index' in result.stdout
def test_status_help_lists_available_options(tmp_path, process):
"""Test that status --help works and documents the command."""
os.chdir(tmp_path)
result = subprocess.run(
['archivebox', 'status', '--help'],
capture_output=True,
text=True,
)
assert result.returncode == 0
assert 'status' in result.stdout.lower() or 'statistic' in result.stdout.lower()
def test_status_shows_data_directory_path(tmp_path, process):
"""Test that status reports which collection directory it is inspecting."""
os.chdir(tmp_path)
result = subprocess.run(['archivebox', 'status'], capture_output=True, text=True)
assert 'archive' in result.stdout.lower() or str(tmp_path) in result.stdout

View File

@@ -77,6 +77,17 @@ def test_version_quiet_outputs_version_number(tmp_path):
assert len(parts) >= 2
def test_version_flag_outputs_version_number(tmp_path):
"""Test that top-level --version reports the package version."""
os.chdir(tmp_path)
result = subprocess.run(['archivebox', '--version'], capture_output=True, text=True)
assert result.returncode == 0
version = result.stdout.strip()
assert version
assert len(version.split('.')) >= 2
def test_version_shows_system_info_in_initialized_dir(tmp_path, process):
"""Test that version shows system metadata in initialized directory."""
os.chdir(tmp_path)
@@ -148,3 +159,20 @@ def test_version_auto_selects_short_tmp_dir_for_deep_collection_path(tmp_path):
assert reported_tmp_dir.exists()
assert not reported_tmp_dir.is_relative_to(default_tmp_dir)
assert len(f"file://{reported_tmp_dir / 'supervisord.sock'}") <= 96
def test_version_help_lists_quiet_flag(tmp_path):
"""Test that version --help documents the quiet output mode."""
os.chdir(tmp_path)
result = subprocess.run(['archivebox', 'version', '--help'], capture_output=True, text=True)
assert result.returncode == 0
assert '--quiet' in result.stdout or '-q' in result.stdout
def test_version_invalid_option_fails(tmp_path):
"""Test that invalid version options fail cleanly."""
os.chdir(tmp_path)
result = subprocess.run(['archivebox', 'version', '--invalid-option'], capture_output=True, text=True)
assert result.returncode != 0

View File

@@ -1,94 +0,0 @@
# archivebox init
# archivebox add
import os
import subprocess
import sqlite3
from archivebox.config.common import STORAGE_CONFIG
from .fixtures import disable_extractors_dict, process
FIXTURES = (disable_extractors_dict, process)
DIR_PERMISSIONS = STORAGE_CONFIG.OUTPUT_PERMISSIONS.replace('6', '7').replace('4', '5')
def test_init(tmp_path, process):
    # The `process` fixture runs `archivebox init` in a fresh dir; its banner
    # should announce a brand-new collection (not an upgrade of an existing one).
    assert "Initializing a new ArchiveBox" in process.stdout.decode("utf-8")
def test_update(tmp_path, process):
    """Re-running init on an already-initialized dir should upgrade, not re-create."""
    os.chdir(tmp_path)
    update_process = subprocess.run(['archivebox', 'init'], capture_output=True)
    assert "updating existing ArchiveBox" in update_process.stdout.decode("utf-8")
def test_add_link(tmp_path, process, disable_extractors_dict):
    """add should succeed and persist the URL into a sources/*cli_add.txt file."""
    os.chdir(tmp_path)
    add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'],
                                 capture_output=True, env=disable_extractors_dict)
    assert add_process.returncode == 0, add_process.stderr.decode("utf-8")
    # In the new architecture, URLs are saved to source files
    # Check that a source file was created with the URL
    sources_dir = tmp_path / "sources"
    assert sources_dir.exists(), "Sources directory should be created"
    source_files = list(sources_dir.glob("*cli_add.txt"))
    assert len(source_files) >= 1, "Source file should be created"
    source_content = source_files[0].read_text()
    assert "https://example.com" in source_content
def test_add_multiple_urls(tmp_path, process, disable_extractors_dict):
    """Test adding multiple URLs via command line arguments"""
    os.chdir(tmp_path)
    add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com', 'https://iana.org'],
                                 capture_output=True, env=disable_extractors_dict)
    assert add_process.returncode == 0, add_process.stderr.decode("utf-8")
    # Check that a source file was created with both URLs
    sources_dir = tmp_path / "sources"
    assert sources_dir.exists(), "Sources directory should be created"
    source_files = list(sources_dir.glob("*cli_add.txt"))
    assert len(source_files) >= 1, "Source file should be created"
    # NOTE(review): [-1] presumably picks the newest source file, but glob
    # order is not guaranteed to be chronological — confirm if flaky.
    source_content = source_files[-1].read_text()
    assert "https://example.com" in source_content
    assert "https://iana.org" in source_content
def test_correct_permissions_output_folder(tmp_path, process):
    """Files/dirs created by init should honor the configured permission bits."""
    index_files = ['index.sqlite3', 'archive']
    for file in index_files:
        file_path = tmp_path / file
        # Compare only the last 3 octal digits of the mode; directories are
        # allowed the exec-bit variant (DIR_PERMISSIONS = OUTPUT_PERMISSIONS
        # with 6->7 / 4->5, per the module-level constant).
        assert oct(file_path.stat().st_mode)[-3:] in (STORAGE_CONFIG.OUTPUT_PERMISSIONS, DIR_PERMISSIONS)
def test_correct_permissions_add_command_results(tmp_path, process, disable_extractors_dict):
    """After an add, the sqlite index should still carry the configured permissions."""
    os.chdir(tmp_path)
    add_process = subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True,
                                 env=disable_extractors_dict)
    assert add_process.returncode == 0, add_process.stderr.decode("utf-8")
    # Check database permissions
    assert oct((tmp_path / "index.sqlite3").stat().st_mode)[-3:] in (STORAGE_CONFIG.OUTPUT_PERMISSIONS, DIR_PERMISSIONS)
def test_collision_urls_different_timestamps(tmp_path, process, disable_extractors_dict):
    """Adding two distinct URLs should produce two snapshot rows."""
    os.chdir(tmp_path)
    for url in ('https://example.com', 'https://iana.org'):
        subprocess.run(['archivebox', 'add', '--index-only', url],
                       capture_output=True, env=disable_extractors_dict)

    # Check both URLs are in database
    conn = sqlite3.connect("index.sqlite3")
    try:
        count = conn.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]
    finally:
        conn.close()
    assert count == 2
def test_unrecognized_folders(tmp_path, process, disable_extractors_dict):
    """Re-running init over an archive containing stray folders should not fail."""
    os.chdir(tmp_path)
    subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'],
                   capture_output=True, env=disable_extractors_dict)

    # Drop an unknown folder into the archive dir, then re-init on top of it.
    stray_dir = tmp_path / "archive" / "some_random_folder"
    stray_dir.mkdir(parents=True, exist_ok=True)

    init_result = subprocess.run(['archivebox', 'init'], capture_output=True, env=disable_extractors_dict)
    # Just check that init completes successfully
    assert init_result.returncode == 0

View File

@@ -1,128 +0,0 @@
#!/usr/bin/env python3
"""Integration tests for archivebox install command.

Each test shells out to the `archivebox` CLI inside a pytest tmp_path working
directory; the `process` fixture (project-provided) initializes the collection
before the test body runs.
"""
import os
import subprocess
import sqlite3

import pytest


class TestInstallDryRun:
    """Test the dry-run mode of install command."""

    def test_dry_run_prints_message(self, tmp_path, process):
        """Test that dry-run mode prints appropriate message."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'install', '--dry-run'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        assert 'Dry run' in result.stdout

    def test_dry_run_does_not_create_crawl(self, tmp_path, process):
        """Test that dry-run mode doesn't create a crawl."""
        os.chdir(tmp_path)
        # Get initial crawl count
        conn = sqlite3.connect('index.sqlite3')
        c = conn.cursor()
        c.execute("SELECT COUNT(*) FROM crawls_crawl")
        initial_count = c.fetchone()[0]
        conn.close()
        # Run install with dry-run
        result = subprocess.run(
            ['archivebox', 'install', '--dry-run'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        # Check crawl count unchanged
        conn = sqlite3.connect('index.sqlite3')
        c = conn.cursor()
        c.execute("SELECT COUNT(*) FROM crawls_crawl")
        final_count = c.fetchone()[0]
        conn.close()
        assert final_count == initial_count


class TestInstallOutput:
    """Test the output/messages from install command."""

    def test_install_prints_detecting_message(self, tmp_path, process, disable_extractors_dict):
        """Test that install prints detecting dependencies message."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'install', '--dry-run'],
            capture_output=True,
            text=True,
            env=disable_extractors_dict,
        )
        assert result.returncode == 0
        # Should mention detecting or dependencies
        output = result.stdout.lower()
        assert 'detect' in output or 'dependenc' in output or 'dry run' in output


class TestInstallCLI:
    """Test the CLI interface for install command."""

    def test_cli_help(self, tmp_path):
        """Test that --help works for install command."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'install', '--help'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        assert '--dry-run' in result.stdout or '-d' in result.stdout

    def test_cli_invalid_option(self, tmp_path):
        """Test that invalid options are handled."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'install', '--invalid-option'],
            capture_output=True,
            text=True,
        )
        # Should fail with non-zero exit code
        assert result.returncode != 0


class TestInstallInitialization:
    """Test that install initializes the data directory if needed."""

    def test_install_from_empty_dir(self, tmp_path):
        """Test that install from empty dir initializes first."""
        os.chdir(tmp_path)
        # Don't use process fixture - start from empty dir
        result = subprocess.run(
            ['archivebox', 'install', '--dry-run'],
            capture_output=True,
            text=True,
        )
        # Should either initialize or show dry run message
        output = result.stdout
        assert 'Initializing' in output or 'Dry run' in output or 'init' in output.lower()


if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -1,98 +0,0 @@
import json
import re
import subprocess

from .fixtures import disable_extractors_dict, process

FIXTURES = (disable_extractors_dict, process)


def _parse_json_output(raw: str):
    """Parse JSON printed by the CLI, tolerating ANSI escapes and control chars.

    Console output can interleave escape sequences with the JSON payload;
    strip them and retry before giving up.  Extracted here because the same
    recovery logic was previously duplicated in both JSON tests.
    """
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Remove ANSI escape sequences, then map whitespace control chars to
        # spaces and delete all other control characters.
        cleaned = re.sub(r'\x1b\[[0-9;]*m', '', raw)
        cleaned = re.sub(r'[\x00-\x1f\x7f]', lambda m: ' ' if m.group(0) in '\t\n\r' else '', cleaned)
        return json.loads(cleaned)


def test_search_json(process, disable_extractors_dict):
    """`search --json` should emit a JSON array containing the added URL."""
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    search_process = subprocess.run(["archivebox", "search", "--json"], capture_output=True)
    output_json = _parse_json_output(search_process.stdout.decode("utf-8").strip())
    # Verify we get at least one snapshot back
    assert len(output_json) >= 1
    # Should include the requested URL
    assert any("example.com" in entry.get("url", "") for entry in output_json)


def test_search_json_headers(process, disable_extractors_dict):
    """`search --json --with-headers` should wrap results in a keyed object."""
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    search_process = subprocess.run(["archivebox", "search", "--json", "--with-headers"], capture_output=True)
    output_json = _parse_json_output(search_process.stdout.decode("utf-8").strip())
    # The response should have a links key with headers mode
    links = output_json.get("links", output_json)
    assert len(links) >= 1


def test_search_html(process, disable_extractors_dict):
    """`search --html` should produce HTML referencing the archived sources."""
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    search_process = subprocess.run(["archivebox", "search", "--html"], capture_output=True)
    output_html = search_process.stdout.decode("utf-8")
    # Should contain some HTML and reference to the source file
    assert "sources" in output_html or "cli_add" in output_html or "<" in output_html


def test_search_html_headers(process, disable_extractors_dict):
    """`search --html --with-headers` should still produce HTML."""
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    search_process = subprocess.run(["archivebox", "search", "--html", "--with-headers"], capture_output=True)
    output_html = search_process.stdout.decode("utf-8")
    # Should contain HTML
    assert "<" in output_html


def test_search_csv(process, disable_extractors_dict):
    """`search --csv url` should include the added URL."""
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    search_process = subprocess.run(["archivebox", "search", "--csv", "url"], capture_output=True)
    output_csv = search_process.stdout.decode("utf-8")
    # Should contain the requested URL
    assert "example.com" in output_csv


def test_search_csv_headers(process, disable_extractors_dict):
    """`search --csv url --with-headers` should include a header row."""
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--with-headers"], capture_output=True)
    output_csv = search_process.stdout.decode("utf-8")
    # Should have url header and requested URL
    assert "url" in output_csv
    assert "example.com" in output_csv


def test_search_with_headers_requires_format(process):
    """--with-headers without a structured format flag should be rejected."""
    search_process = subprocess.run(["archivebox", "search", "--with-headers"], capture_output=True)
    stderr = search_process.stderr.decode("utf-8")
    assert "--with-headers" in stderr and ("requires" in stderr or "can only be used" in stderr)


def test_sort_by_url(process, disable_extractors_dict):
    """`search --sort=url` should return all snapshots in sorted order."""
    # Add two URLs - they will create separate source files
    subprocess.run(["archivebox", "add", "--index-only", "https://iana.org", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    subprocess.run(["archivebox", "add", "--index-only", "https://example.com", "--depth=0"],
                   capture_output=True, env=disable_extractors_dict)
    # Search with sort should return results (even if they're file:// URLs)
    search_process = subprocess.run(["archivebox", "search", "--csv", "url", "--sort=url"], capture_output=True)
    output = search_process.stdout.decode("utf-8")
    lines = [line for line in output.strip().split("\n") if line]
    # Should have at least 2 snapshots (the source file snapshots)
    assert len(lines) >= 2

View File

@@ -12,6 +12,7 @@ import sqlite3
import tempfile
import unittest
from pathlib import Path
from typing import cast
from .migrations_helpers import (
SCHEMA_0_4,
@@ -74,7 +75,7 @@ class TestMigrationFrom04x(unittest.TestCase):
# Collect unique tags from original data
original_tags = set()
for tags_str in self.original_data['tags_str']:
for tags_str in cast(list[str], self.original_data['tags_str']):
if tags_str:
for tag in tags_str.split(','):
original_tags.add(tag.strip())

View File

@@ -1,89 +0,0 @@
import os
import sqlite3
import subprocess

from .fixtures import disable_extractors_dict, process

FIXTURES = (disable_extractors_dict, process)


def _table_count(table: str) -> int:
    """Return the row count of *table* in ./index.sqlite3.

    NOTE: the original inline queries used `SELECT COUNT()` which is invalid
    SQLite (raises "wrong number of arguments to function COUNT()"); fixed to
    `COUNT(*)`.  *table* is only ever a hard-coded table name here, so the
    f-string interpolation is safe.
    """
    conn = sqlite3.connect("index.sqlite3")
    try:
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    finally:
        conn.close()


def test_remove_single_snapshot(tmp_path, process, disable_extractors_dict):
    """Test removing a snapshot by URL pattern"""
    os.chdir(tmp_path)
    # Add a URL - creates source file snapshot
    subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
    # Verify snapshot exists
    assert _table_count("core_snapshot") >= 1
    # Remove all snapshots (including source file snapshots)
    remove_process = subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes'], capture_output=True)
    # Check that it ran successfully (either output indicates success or return code 0)
    output = remove_process.stdout.decode("utf-8") + remove_process.stderr.decode("utf-8")
    assert remove_process.returncode == 0 or "removed" in output.lower() or "Found" in output
    assert _table_count("core_snapshot") == 0


def test_remove_with_delete_flag(tmp_path, process, disable_extractors_dict):
    """Test removing snapshot with --delete also removes archive folder"""
    os.chdir(tmp_path)
    subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
    # Get archives before delete
    archive_dir = tmp_path / "archive"
    archives_before = list(archive_dir.iterdir()) if archive_dir.exists() else []
    # Only run the rest of the test if archives were created
    if archives_before:
        subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True)
        archives_after = list(archive_dir.iterdir()) if archive_dir.exists() else []
        assert len(archives_after) < len(archives_before)
    else:
        # With --index-only, archive folders may not be created immediately
        # Just verify that remove command doesn't error
        remove_result = subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True)
        assert remove_result.returncode in (0, 1)  # 0 = success, 1 = no matches


def test_remove_regex(tmp_path, process, disable_extractors_dict):
    """Test removing snapshots by regex pattern"""
    os.chdir(tmp_path)
    subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
    subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, env=disable_extractors_dict)
    assert _table_count("core_snapshot") >= 2
    subprocess.run(['archivebox', 'remove', '--filter-type=regex', '.*', '--yes', '--delete'], capture_output=True)
    assert _table_count("core_snapshot") == 0


def test_add_creates_crawls(tmp_path, process, disable_extractors_dict):
    """Test that adding URLs creates crawls in database"""
    os.chdir(tmp_path)
    subprocess.run(['archivebox', 'add', '--index-only', 'https://example.com'], capture_output=True, env=disable_extractors_dict)
    subprocess.run(['archivebox', 'add', '--index-only', 'https://iana.org'], capture_output=True, env=disable_extractors_dict)
    assert _table_count("crawls_crawl") == 2

View File

@@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""Integration tests for archivebox search command.

Tests shell out to the `archivebox` CLI inside a pytest tmp_path working
directory; the `process` fixture (project-provided) initializes the collection.
"""
import os
import subprocess

import pytest


def test_search_returns_snapshots(tmp_path, process, disable_extractors_dict):
    """Test that search returns snapshots."""
    os.chdir(tmp_path)
    # Add some snapshots
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'search'],
        capture_output=True,
        text=True,
    )
    # Should return some output (path or URL info)
    assert result.stdout.strip() != '' or result.returncode == 0


def test_search_filter_by_substring(tmp_path, process, disable_extractors_dict):
    """Test that substring filter works."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    # Search with filter - may not find if URL isn't stored as expected
    result = subprocess.run(
        ['archivebox', 'search', '--filter-type=substring', 'example'],
        capture_output=True,
        text=True,
    )
    # Should run without error
    assert result.returncode == 0 or 'No Snapshots' in result.stderr


def test_search_sort_option(tmp_path, process, disable_extractors_dict):
    """Test that --sort option works."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'search', '--sort=url'],
        capture_output=True,
        text=True,
    )
    # Should run without error
    assert result.returncode == 0


def test_search_with_headers_requires_format(tmp_path, process):
    """Test that --with-headers requires --json, --html, or --csv."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'search', '--with-headers'],
        capture_output=True,
        text=True,
    )
    # Should fail with error message
    assert result.returncode != 0
    assert 'requires' in result.stderr.lower() or 'json' in result.stderr.lower()


def test_search_status_option(tmp_path, process, disable_extractors_dict):
    """Test that --status option filters by status."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'search', '--status=indexed'],
        capture_output=True,
        text=True,
    )
    # Should run without error
    assert result.returncode == 0


def test_search_no_snapshots_message(tmp_path, process):
    """Test that searching empty archive shows appropriate output."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'search'],
        capture_output=True,
        text=True,
    )
    # Should complete (empty results are OK)
    assert result.returncode == 0


class TestSearchCLI:
    """Test the CLI interface for search command."""

    def test_cli_help(self, tmp_path, process):
        """Test that --help works for search command."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'search', '--help'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        assert '--filter-type' in result.stdout or '-f' in result.stdout
        assert '--status' in result.stdout
        assert '--sort' in result.stdout


if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -1,195 +0,0 @@
#!/usr/bin/env python3
"""Integration tests for archivebox status command.

All tests run the `archivebox` CLI in a pytest tmp_path working directory and
make loose substring assertions on the human-readable status report, so they
tolerate formatting changes in the output.
"""
import os
import subprocess

import pytest


def test_status_shows_index_info(tmp_path, process):
    """Test that status shows index information."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show index scanning info
    assert 'index' in result.stdout.lower() or 'Index' in result.stdout


def test_status_shows_snapshot_count(tmp_path, process, disable_extractors_dict):
    """Test that status shows snapshot count."""
    os.chdir(tmp_path)
    # Add some snapshots
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://iana.org'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show link/snapshot count
    assert '2' in result.stdout or 'links' in result.stdout.lower()


def test_status_shows_archive_size(tmp_path, process, disable_extractors_dict):
    """Test that status shows archive size information."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show size info (bytes, KB, MB, etc)
    assert 'Size' in result.stdout or 'size' in result.stdout or 'B' in result.stdout


def test_status_shows_indexed_count(tmp_path, process, disable_extractors_dict):
    """Test that status shows indexed folder count."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show indexed count
    assert 'indexed' in result.stdout.lower()


def test_status_shows_archived_vs_unarchived(tmp_path, process, disable_extractors_dict):
    """Test that status shows archived vs unarchived counts."""
    os.chdir(tmp_path)
    # Add index-only snapshot (unarchived)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show archived/unarchived categories
    assert 'archived' in result.stdout.lower() or 'unarchived' in result.stdout.lower()


def test_status_shows_data_directory_info(tmp_path, process):
    """Test that status shows data directory path."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show data directory or archive path
    assert 'archive' in result.stdout.lower() or str(tmp_path) in result.stdout


def test_status_shows_user_info(tmp_path, process):
    """Test that status shows user information."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show user info section
    assert 'user' in result.stdout.lower() or 'login' in result.stdout.lower()


def test_status_empty_archive(tmp_path, process):
    """Test status on empty archive shows zero counts."""
    os.chdir(tmp_path)
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should still run successfully
    assert result.returncode == 0 or 'index' in result.stdout.lower()
    # Should show 0 links
    assert '0' in result.stdout or 'links' in result.stdout.lower()


def test_status_shows_valid_vs_invalid(tmp_path, process, disable_extractors_dict):
    """Test that status shows valid vs invalid folder counts."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'add', '--index-only', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
    )
    result = subprocess.run(
        ['archivebox', 'status'],
        capture_output=True,
        text=True,
    )
    # Should show valid/invalid categories
    assert 'valid' in result.stdout.lower() or 'present' in result.stdout.lower()


class TestStatusCLI:
    """Test the CLI interface for status command."""

    def test_cli_help(self, tmp_path, process):
        """Test that --help works for status command."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'status', '--help'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        # Help should show some info about the command
        assert 'status' in result.stdout.lower() or 'statistic' in result.stdout.lower()


if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -1,158 +0,0 @@
#!/usr/bin/env python3
"""Integration tests for archivebox version command.

Covers the quiet/minimal output, the full report (system info, binaries, data
locations), and basic CLI argument handling.
"""
import os
import subprocess

import pytest


class TestVersionQuiet:
    """Test the quiet/minimal version output."""

    def test_version_prints_version_number(self, tmp_path):
        """Test that version prints the version number."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'version', '--quiet'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        # Should contain a version string like "0.8.0" or similar
        version = result.stdout.strip()
        assert version
        # Version should be a valid semver-ish format
        parts = version.split('.')
        assert len(parts) >= 2  # At least major.minor

    def test_version_flag_prints_version_number(self, tmp_path):
        """Test that --version flag prints the version number."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', '--version'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        version = result.stdout.strip()
        assert version
        parts = version.split('.')
        assert len(parts) >= 2


class TestVersionFull:
    """Test the full version output."""

    def test_version_shows_system_info(self, tmp_path, process):
        """Test that version shows system information."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'version'],
            capture_output=True,
            text=True,
        )
        output = result.stdout
        # Should show basic system info (exit code may be 1 if binaries missing)
        assert 'ArchiveBox' in output

    def test_version_shows_binary_section(self, tmp_path, process):
        """Test that version shows binary dependencies section."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'version'],
            capture_output=True,
            text=True,
        )
        output = result.stdout
        # Should show binary dependencies section
        assert 'Binary' in output or 'Dependenc' in output

    def test_version_shows_data_locations(self, tmp_path, process):
        """Test that version shows data locations."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'version'],
            capture_output=True,
            text=True,
        )
        output = result.stdout
        # Should show data/code locations
        assert 'Data' in output or 'location' in output.lower() or 'DIR' in output or 'Code' in output


class TestVersionWithBinaries:
    """Test version output after running install."""

    def test_version_shows_binary_status(self, tmp_path, process, disable_extractors_dict):
        """Test that version shows binary status (installed or not)."""
        os.chdir(tmp_path)
        # First run install (with dry-run to speed up)
        subprocess.run(
            ['archivebox', 'install', '--dry-run'],
            capture_output=True,
            text=True,
            env=disable_extractors_dict,
        )
        # Now check version
        result = subprocess.run(
            ['archivebox', 'version'],
            capture_output=True,
            text=True,
            env=disable_extractors_dict,
        )
        output = result.stdout
        # Should show binary status (either installed or not installed)
        assert 'installed' in output.lower() or 'Binary' in output


class TestVersionCLI:
    """Test the CLI interface for version command."""

    def test_cli_help(self, tmp_path):
        """Test that --help works for version command."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'version', '--help'],
            capture_output=True,
            text=True,
        )
        assert result.returncode == 0
        assert '--quiet' in result.stdout or '-q' in result.stdout

    def test_cli_invalid_option(self, tmp_path):
        """Test that invalid options are handled."""
        os.chdir(tmp_path)
        result = subprocess.run(
            ['archivebox', 'version', '--invalid-option'],
            capture_output=True,
            text=True,
        )
        # Should fail with non-zero exit code
        assert result.returncode != 0


if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -1,40 +1,17 @@
"""UUID7 compatibility layer for Python 3.13+
Python 3.14+ has native uuid7 support. For Python 3.13, we use uuid_extensions.
IMPORTANT: We also monkey-patch uuid.uuid7 for backward compatibility with
migrations that were auto-generated on Python 3.14+ systems.
"""
"""UUID7 compatibility layer."""
import sys
import uuid
import functools
from importlib import import_module
if sys.version_info >= (3, 14):
from uuid import uuid7 as _uuid7
_UUID7_GENERATOR = getattr(uuid, 'uuid7')
else:
try:
from uuid_extensions import uuid7 as _uuid7
except ImportError:
raise ImportError(
"uuid_extensions package is required for Python <3.14. "
"Install it with: pip install uuid_extensions"
)
# Monkey-patch uuid module for migrations generated on Python 3.14+
# that reference uuid.uuid7 directly
if not hasattr(uuid, 'uuid7'):
uuid.uuid7 = _uuid7
_UUID7_GENERATOR = getattr(import_module('uuid_extensions'), 'uuid7')
@functools.wraps(_uuid7)
def uuid7():
"""Generate a UUID7 (time-ordered UUID).
This wrapper ensures Django migrations always reference
'archivebox.uuid_compat.uuid7' regardless of Python version.
"""
return _uuid7()
def uuid7() -> uuid.UUID:
return _UUID7_GENERATOR()
__all__ = ['uuid7']

View File

@@ -31,7 +31,7 @@ __package__ = 'archivebox.workers'
import os
import time
from typing import Type
from datetime import timedelta
from datetime import datetime, timedelta
from multiprocessing import Process as MPProcess
from pathlib import Path
@@ -189,7 +189,7 @@ class Orchestrator:
event='Shutting down',
indent_level=0,
pid=self.pid,
error=error if error and not isinstance(error, KeyboardInterrupt) else None,
error=error if isinstance(error, Exception) and not isinstance(error, KeyboardInterrupt) else None,
)
def get_total_worker_count(self) -> int:
@@ -567,7 +567,8 @@ class Orchestrator:
status=ArchiveResult.StatusChoices.STARTED,
).select_related('process')
for ar in started_ars:
if ar.process_id and ar.process and ar.process.status == Process.StatusChoices.RUNNING:
process_id = getattr(ar, 'process_id', None)
if process_id and ar.process and ar.process.status == Process.StatusChoices.RUNNING:
try:
ar.process.kill_tree(graceful_timeout=0.0)
except Exception:
@@ -904,28 +905,29 @@ class Orchestrator:
size = ''
stderr_tail = ''
if ar:
if ar.process_id and ar.process:
process_id = getattr(ar, 'process_id', None)
if process_id and ar.process:
stderr_tail = _tail_stderr_line(ar.process)
if ar.status == ArchiveResult.StatusChoices.STARTED:
status = 'started'
is_running = True
is_pending = False
start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None)
start_ts = ar.start_ts or (ar.process.started_at if process_id and ar.process else None)
if start_ts:
elapsed = _format_seconds((now - start_ts).total_seconds())
hook_timeout = None
if ar.process_id and ar.process and ar.process.timeout:
if process_id and ar.process and ar.process.timeout:
hook_timeout = ar.process.timeout
hook_timeout = hook_timeout or hook_timeouts.get(hook_name)
if hook_timeout:
timeout = _format_seconds(hook_timeout)
else:
status = ar.status
if ar.process_id and ar.process and ar.process.exit_code == 137:
if process_id and ar.process and ar.process.exit_code == 137:
status = 'failed'
is_pending = False
start_ts = ar.start_ts or (ar.process.started_at if ar.process_id and ar.process else None)
end_ts = ar.end_ts or (ar.process.ended_at if ar.process_id and ar.process else None)
start_ts = ar.start_ts or (ar.process.started_at if process_id and ar.process else None)
end_ts = ar.end_ts or (ar.process.ended_at if process_id and ar.process else None)
if start_ts and end_ts:
elapsed = _format_seconds((end_ts - start_ts).total_seconds())
size = _format_size(getattr(ar, 'output_size', None))
@@ -1093,7 +1095,7 @@ class Orchestrator:
from archivebox.core.models import Snapshot
# Get all started snapshots (optionally filtered by crawl_id)
snapshot_filter = {'status': 'started'}
snapshot_filter: dict[str, str | datetime] = {'status': 'started'}
if self.crawl_id:
snapshot_filter['crawl_id'] = self.crawl_id
else:

View File

@@ -335,6 +335,7 @@ def start_worker(supervisor, daemon, lazy=False):
for added in added:
supervisor.addProcessGroup(added)
procs = []
for _ in range(25):
procs = supervisor.getAllProcessInfo()
for proc in procs:

View File

@@ -1,7 +1,9 @@
from datetime import timedelta
from typing import cast
from unittest.mock import patch
from django.contrib.auth import get_user_model
from django.contrib.auth.models import UserManager
from django.test import TestCase
from django.utils import timezone
@@ -12,7 +14,8 @@ from archivebox.workers.worker import CrawlWorker
class TestScheduledCrawlMaterialization(TestCase):
def setUp(self):
self.user = get_user_model().objects.create_user(
user_manager = cast(UserManager, get_user_model().objects)
self.user = user_manager.create_user(
username='schedule-user',
password='password',
)
@@ -52,6 +55,8 @@ class TestScheduledCrawlMaterialization(TestCase):
self.assertEqual(scheduled_crawls.count(), 2)
queued_crawl = scheduled_crawls.last()
self.assertIsNotNone(queued_crawl)
assert queued_crawl is not None
self.assertEqual(queued_crawl.status, Crawl.StatusChoices.QUEUED)
self.assertEqual(queued_crawl.urls, 'https://example.com/feed.xml')
self.assertEqual(queued_crawl.max_depth, 1)
@@ -63,7 +68,7 @@ class TestScheduledCrawlMaterialization(TestCase):
Orchestrator(exit_on_idle=True)._materialize_due_schedules()
self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1)
Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template_id))._materialize_due_schedules()
Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template.id))._materialize_due_schedules()
self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1)
@patch.object(CrawlWorker, 'start')

View File

@@ -1,5 +1,6 @@
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast
from unittest.mock import patch
from django.test import SimpleTestCase
@@ -11,14 +12,14 @@ class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase):
def _make_worker(self):
worker = SnapshotWorker.__new__(SnapshotWorker)
worker.pid = 12345
worker.snapshot = SimpleNamespace(
cast(Any, worker).snapshot = SimpleNamespace(
status='started',
refresh_from_db=lambda: None,
)
worker._snapshot_exceeded_hard_timeout = lambda: False
worker._seal_snapshot_due_to_timeout = lambda: None
worker._run_hook = lambda *args, **kwargs: SimpleNamespace()
worker._wait_for_hook = lambda *args, **kwargs: None
worker._wait_for_hook = lambda process, ar: None
return worker
@patch('archivebox.workers.worker.log_worker_event')
@@ -49,10 +50,10 @@ class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase):
run_calls.append((args, kwargs))
return SimpleNamespace()
def wait_for_hook(process, archive_result):
wait_calls.append((process, archive_result))
archive_result.status = 'succeeded'
archive_result.output_files = {'singlefile.html': {}}
def wait_for_hook(process, ar):
wait_calls.append((process, ar))
ar.status = 'succeeded'
ar.output_files = {'singlefile.html': {}}
archive_result = SimpleNamespace(
status='failed',