ArchiveBox CLI Pipeline Architecture
Overview
This plan implements a JSONL-based CLI pipeline for ArchiveBox, enabling Unix-style piping between commands:
archivebox crawl create URL | archivebox snapshot create | archivebox archiveresult create | archivebox run
Design Principles
- Maximize model method reuse: Use `.to_json()`, `.from_json()`, `.to_jsonl()`, `.from_jsonl()` everywhere
- Pass-through behavior: All commands output input records + newly created records (accumulating pipeline)
- Create-or-update: Commands create records if they don't exist, update if ID matches existing
- Auto-cascade: `archivebox run` automatically creates Snapshots from Crawls and ArchiveResults from Snapshots
- Generic filtering: Implement filters as functions that take a queryset → return a queryset
- Minimal code: Extract the duplicated `apply_filters()` to a shared module
Real-World Use Cases
These examples demonstrate the JSONL piping architecture. Key points:
- `archivebox run` auto-cascades (Crawl → Snapshots → ArchiveResults)
- `archivebox run` emits JSONL of everything it creates, enabling chained processing
- Use CLI args (`--status=`, `--plugin=`) for efficient DB filtering; use jq for transforms
1. Basic Archive
# Simple URL archive (run auto-creates snapshots and archive results)
archivebox crawl create https://example.com | archivebox run
# Multiple URLs from a file
archivebox crawl create < urls.txt | archivebox run
# With depth crawling (follow links)
archivebox crawl create --depth=2 https://docs.python.org | archivebox run
2. Retry Failed Extractions
# Retry all failed extractions
archivebox archiveresult list --status=failed | archivebox run
# Retry only failed PDFs from a specific domain
archivebox archiveresult list --status=failed --plugin=pdf --url__icontains=nytimes.com \
| archivebox run
3. Import Bookmarks from Pinboard (jq transform)
# Fetch Pinboard API, transform fields to match ArchiveBox schema, archive
curl -s "https://api.pinboard.in/v1/posts/all?format=json&auth_token=$TOKEN" \
| jq -c '.[] | {url: .href, tags_str: .tags, title: .description}' \
| archivebox crawl create \
| archivebox run
4. Retry Failed with Different Binary (jq transform + re-run)
# Get failed wget results, transform to use wget2 binary instead, re-queue as new attempts
archivebox archiveresult list --status=failed --plugin=wget \
| jq -c '{snapshot_id, plugin, status: "queued", overrides: {WGET_BINARY: "wget2"}}' \
| archivebox archiveresult create \
| archivebox run
# Chain processing: archive, then re-run any failures with increased timeout
archivebox crawl create https://slow-site.com \
| archivebox run \
| jq -c 'select(.type == "ArchiveResult" and .status == "failed")
| del(.id) | .status = "queued" | .overrides.TIMEOUT = "120"' \
| archivebox archiveresult create \
| archivebox run
5. Selective Extraction
# Create only screenshot extractions for queued snapshots
archivebox snapshot list --status=queued \
| archivebox archiveresult create --plugin=screenshot \
| archivebox run
# Re-run singlefile on everything that was skipped
archivebox archiveresult list --plugin=singlefile --status=skipped \
| archivebox archiveresult update --status=queued \
| archivebox run
6. Bulk Tag Management
# Tag all Twitter/X URLs (efficient DB filter, no jq needed)
archivebox snapshot list --url__icontains=twitter.com \
| archivebox snapshot update --tag=twitter
# Tag snapshots based on computed criteria (jq for logic DB can't do)
archivebox snapshot list --status=sealed \
| jq -c 'select(.archiveresult_count > 5) | . + {tags_str: (.tags_str + ",well-archived")}' \
| archivebox snapshot update
7. RSS Feed Monitoring
# Archive all items from an RSS feed
curl -s "https://hnrss.org/frontpage" \
| xq -r '.rss.channel.item[].link' \
| archivebox crawl create --tag=hackernews-$(date +%Y%m%d) \
| archivebox run
8. Recursive Link Following (run output → filter → re-run)
# Archive a page, then archive all PDFs it links to
archivebox crawl create https://research-papers.org/index.html \
| archivebox run \
| jq -c 'select(.type == "Snapshot") | .discovered_urls[]?
| select(endswith(".pdf")) | {url: .}' \
| archivebox crawl create --tag=linked-pdfs \
| archivebox run
# Depth crawl with custom handling: retry timeouts with longer timeout
archivebox crawl create --depth=1 https://example.com \
| archivebox run \
| jq -c 'select(.type == "ArchiveResult" and .status == "failed" and ((.error // "") | contains("timeout")))
| del(.id) | .overrides.TIMEOUT = "300"' \
| archivebox archiveresult create \
| archivebox run
Composability Summary
| Pattern | Example |
|---|---|
| Filter → Process | `list --status=failed --plugin=pdf \| run` |
| Transform → Archive | `curl API \| jq '{url, tags_str}' \| crawl create \| run` |
| Retry w/ Changes | `run \| jq 'select(.status=="failed") \| del(.id)' \| create \| run` |
| Selective Extract | `snapshot list \| archiveresult create --plugin=screenshot` |
| Bulk Update | `list --url__icontains=X \| update --tag=Y` |
| Chain Processing | `crawl \| run \| jq transform \| create \| run` |
The key insight: archivebox run emits JSONL of everything it creates, enabling:
- Retry failed items with different settings (timeouts, binaries, etc.)
- Recursive crawling (archive page → extract links → archive those)
- Chained transforms (filter failures, modify config, re-queue)
Code Reuse Findings
Existing Model Methods (USE THESE)
- `Crawl.to_json()`, `Crawl.from_json()`, `Crawl.to_jsonl()`, `Crawl.from_jsonl()`
- `Snapshot.to_json()`, `Snapshot.from_json()`, `Snapshot.to_jsonl()`, `Snapshot.from_jsonl()`
- `Tag.to_json()`, `Tag.from_json()`, `Tag.to_jsonl()`, `Tag.from_jsonl()`
Missing Model Methods (MUST IMPLEMENT)
- `ArchiveResult.from_json()` - Does not exist, must be added
- `ArchiveResult.from_jsonl()` - Does not exist, must be added
Existing Utilities (USE THESE)
- `archivebox/misc/jsonl.py`: `read_stdin()`, `read_args_or_stdin()`, `write_record()`, `parse_line()` (composed in the sketch below)
- Type constants: `TYPE_CRAWL`, `TYPE_SNAPSHOT`, `TYPE_ARCHIVERESULT`, etc.
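For orientation, a rough sketch of how these existing pieces are expected to compose inside a create command is shown below. The call patterns follow this plan's later examples; the `Crawl` import path is an assumption and may need adjusting:

```python
# Sketch only: composing the existing jsonl helpers with the model JSON methods.
import sys

from archivebox.misc.jsonl import read_args_or_stdin, write_record
from archivebox.crawls.models import Crawl  # assumed import path for the Crawl model


def crawl_create(args):
    for record in read_args_or_stdin(args):   # URLs or JSONL records from argv/stdin
        crawl = Crawl.from_json(record)       # create-or-update via the existing model method
        if crawl and not sys.stdout.isatty():
            write_record(crawl.to_json())     # emit JSONL for the next command in the pipe
```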
Duplicated Code (EXTRACT)
`apply_filters()` is duplicated in 7 CLI files → extract to `archivebox/cli/cli_utils.py`
Supervisord Config (UPDATE)
`archivebox/workers/supervisord_util.py` line ~35: `"command": "archivebox manage orchestrator"` → `"command": "archivebox run"`
Field Name Standardization (FIX)
- Issue: `Crawl.to_json()` outputs `tags_str`, but `Snapshot.to_json()` outputs `tags`
- Fix: Standardize all models to use `tags_str` in JSONL output (matches model property names); see the sketch below
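A minimal sketch of the corrected `Snapshot.to_json()` follows; the field list here is illustrative only (the real method emits more fields), the point is the renamed key:

```python
# Illustrative only: the change is the key rename, not the exact field list.
def to_json(self) -> dict:
    return {
        'type': 'Snapshot',
        'id': str(self.id),
        'url': self.url,
        'status': self.status,
        'tags_str': self.tags_str(),  # was 'tags'; renamed to match Crawl.to_json()
    }
```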
Implementation Order
Phase 1: Model Prerequisites
- Implement `ArchiveResult.from_json()` in `archivebox/core/models.py`
  - Pattern: Match `Snapshot.from_json()` and `Crawl.from_json()` style
  - Handle: ID lookup (update existing) or create new
  - Required fields: `snapshot_id`, `plugin`
  - Optional fields: `status`, `hook_name`, etc.
- Implement `ArchiveResult.from_jsonl()` in `archivebox/core/models.py`
  - Filter records by `type='ArchiveResult'`
  - Call `from_json()` for each matching record
- Fix `Snapshot.to_json()` field name
  - Change `'tags': self.tags_str()` → `'tags_str': self.tags_str()`
  - Update any code that depends on the `tags` key in Snapshot JSONL
Phase 2: Shared Utilities
- Extract `apply_filters()` to `archivebox/cli/cli_utils.py`
  - Generic queryset filtering from CLI kwargs
  - Support `--id__in=[csv]`, `--url__icontains=str`, etc.
  - Remove duplicates from 7 CLI files
Phase 3: Pass-Through Behavior (NEW FEATURE)
- Add pass-through to `archivebox crawl create`
  - Output non-Crawl input records unchanged
  - Output created Crawl records
- Add pass-through to `archivebox snapshot create`
  - Output non-Snapshot/non-Crawl input records unchanged
  - Process Crawl records → create Snapshots
  - Output both the original Crawls and the created Snapshots
- Add pass-through to `archivebox archiveresult create`
  - Output non-Snapshot/non-ArchiveResult input records unchanged
  - Process Snapshot records → create ArchiveResults
  - Output both the original Snapshots and the created ArchiveResults
- Add create-or-update to `archivebox run`
  - Records WITH an id: look up and queue the existing object
  - Records WITHOUT an id: create via `Model.from_json()`, then queue
  - Pass-through output of all processed records
Phase 4: Test Infrastructure
- Create `archivebox/tests/conftest.py` with pytest-django
  - Use `pytest-django` for proper test database handling
  - Isolated DATA_DIR per test via the `tmp_path` fixture
  - `run_archivebox_cmd()` helper for subprocess testing
Phase 5: Unit Tests
- Create `archivebox/tests/test_cli_crawl.py` - crawl create/list/pass-through tests
- Create `archivebox/tests/test_cli_snapshot.py` - snapshot create/list/pass-through tests
- Create `archivebox/tests/test_cli_archiveresult.py` - archiveresult create/list/pass-through tests
- Create `archivebox/tests/test_cli_run.py` - run command create-or-update tests
Phase 6: Integration & Config
- Extend `archivebox/cli/tests_piping.py` - add pass-through integration tests
- Update supervisord config - `orchestrator` → `run`
Future Work (Deferred)
Commands to Defer
- `archivebox tag create|list|update|delete` - already works, defer improvements
- `archivebox binary create|list|update|delete` - lower priority
- `archivebox process list` - lower priority
- `archivebox apikey create|list|update|delete` - lower priority
archivebox add Relationship
- Current: `archivebox add` is the primary user-facing command and stays as-is
- Future: Refactor `add` to internally use the `crawl create | snapshot create | run` pipeline
- Note: This refactor is deferred; `add` continues to work independently for now
Key Files
| File | Action | Phase |
|---|---|---|
| `archivebox/core/models.py` | Add `ArchiveResult.from_json()`, `from_jsonl()` | 1 |
| `archivebox/core/models.py` | Fix `Snapshot.to_json()` → `tags_str` | 1 |
| `archivebox/cli/cli_utils.py` | NEW - shared `apply_filters()` | 2 |
| `archivebox/cli/archivebox_crawl.py` | Add pass-through to create | 3 |
| `archivebox/cli/archivebox_snapshot.py` | Add pass-through to create | 3 |
| `archivebox/cli/archivebox_archiveresult.py` | Add pass-through to create | 3 |
| `archivebox/cli/archivebox_run.py` | Add create-or-update, pass-through | 3 |
| `archivebox/tests/conftest.py` | NEW - pytest fixtures | 4 |
| `archivebox/tests/test_cli_crawl.py` | NEW - crawl unit tests | 5 |
| `archivebox/tests/test_cli_snapshot.py` | NEW - snapshot unit tests | 5 |
| `archivebox/tests/test_cli_archiveresult.py` | NEW - archiveresult unit tests | 5 |
| `archivebox/tests/test_cli_run.py` | NEW - run unit tests | 5 |
| `archivebox/cli/tests_piping.py` | Extend with pass-through tests | 6 |
| `archivebox/workers/supervisord_util.py` | Update `orchestrator` → `run` | 6 |
Implementation Details
ArchiveResult.from_json() Design
@staticmethod
def from_json(record: Dict[str, Any], overrides: Dict[str, Any] = None) -> 'ArchiveResult | None':
"""
Create or update a single ArchiveResult from a JSON record dict.
Args:
record: Dict with 'snapshot_id' and 'plugin' (required for create),
or 'id' (for update)
overrides: Dict of field overrides
Returns:
ArchiveResult instance or None if invalid
"""
from django.utils import timezone
overrides = overrides or {}
# If 'id' is provided, lookup and update existing
result_id = record.get('id')
if result_id:
try:
result = ArchiveResult.objects.get(id=result_id)
# Update fields from record
if record.get('status'):
result.status = record['status']
result.retry_at = timezone.now()
result.save()
return result
except ArchiveResult.DoesNotExist:
pass # Fall through to create
# Required fields for creation
snapshot_id = record.get('snapshot_id')
plugin = record.get('plugin')
if not snapshot_id or not plugin:
return None
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
except Snapshot.DoesNotExist:
return None
# Create or get existing result
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin,
defaults={
'status': record.get('status', ArchiveResult.StatusChoices.QUEUED),
'retry_at': timezone.now(),
'hook_name': record.get('hook_name', ''),
**overrides,
}
)
# If not created, optionally reset for retry
if not created and record.get('status'):
result.status = record['status']
result.retry_at = timezone.now()
result.save()
return result
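ArchiveResult.from_jsonl() Design (sketch)
The companion from_jsonl() from Phase 1 can follow the same filter-and-delegate shape used by Snapshot and Crawl. A minimal sketch, assuming an iterable of already-parsed record dicts and the TYPE_ARCHIVERESULT constant from archivebox/misc/jsonl.py:

```python
@staticmethod
def from_jsonl(records, overrides=None):
    """
    Create or update ArchiveResults from an iterable of JSONL record dicts.
    Filters records by type == TYPE_ARCHIVERESULT and delegates each one to
    ArchiveResult.from_json(), mirroring the Snapshot/Crawl implementations.
    """
    from archivebox.misc.jsonl import TYPE_ARCHIVERESULT

    results = []
    for record in records:
        if record.get('type') != TYPE_ARCHIVERESULT:
            continue
        result = ArchiveResult.from_json(record, overrides=overrides)
        if result:
            results.append(result)
    return results
```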
Pass-Through Pattern
All create commands follow this pattern:
def create_X(args, ...):
is_tty = sys.stdout.isatty()
records = list(read_args_or_stdin(args))
for record in records:
record_type = record.get('type')
# Pass-through: output records we don't handle
if record_type not in HANDLED_TYPES:
if not is_tty:
write_record(record)
continue
# Handle our type: create via Model.from_json()
obj = Model.from_json(record, overrides={...})
# Output created record (hydrated with db id)
if obj and not is_tty:
write_record(obj.to_json())
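As a concrete (hypothetical) instantiation, `archivebox snapshot create` could apply this pattern to both Crawl and Snapshot records. `Snapshot.from_crawl_record()` is a made-up helper name standing in for however Snapshots actually get created from a Crawl's URLs:

```python
import sys

from archivebox.misc.jsonl import read_args_or_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT


def create_snapshots(args, tag=''):
    """Sketch of `archivebox snapshot create` pass-through behavior (not the final implementation)."""
    is_tty = sys.stdout.isatty()
    for record in read_args_or_stdin(args):
        record_type = record.get('type')
        if record_type == TYPE_CRAWL:
            # Crawls are handled input: pass the original record through, then emit created Snapshots.
            if not is_tty:
                write_record(record)
            for snapshot in Snapshot.from_crawl_record(record):  # hypothetical helper
                if not is_tty:
                    write_record(snapshot.to_json())
        elif record_type == TYPE_SNAPSHOT:
            snapshot = Snapshot.from_json(record, overrides={'tags_str': tag} if tag else None)
            if snapshot and not is_tty:
                write_record(snapshot.to_json())
        else:
            # Pass-through: records we don't handle flow downstream untouched.
            if not is_tty:
                write_record(record)
```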
Pass-Through Semantics Example
Input:
{"type": "Crawl", "id": "abc", "urls": "https://example.com", ...}
{"type": "Tag", "name": "important"}
archivebox snapshot create output:
{"type": "Crawl", "id": "abc", ...} # pass-through (not our type)
{"type": "Tag", "name": "important"} # pass-through (not our type)
{"type": "Snapshot", "id": "xyz", ...} # created from Crawl URLs
Create-or-Update Pattern for archivebox run
from django.utils import timezone

from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL
# (plus imports for the Crawl/Snapshot/ArchiveResult models)

def process_stdin_records() -> int:
records = list(read_stdin())
is_tty = sys.stdout.isatty()
for record in records:
record_type = record.get('type')
record_id = record.get('id')
# Create-or-update based on whether ID exists
if record_type == TYPE_CRAWL:
if record_id:
try:
obj = Crawl.objects.get(id=record_id)
except Crawl.DoesNotExist:
obj = Crawl.from_json(record)
else:
obj = Crawl.from_json(record)
if obj:
obj.retry_at = timezone.now()
obj.save()
if not is_tty:
write_record(obj.to_json())
# Similar for Snapshot, ArchiveResult...
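The "Similar for Snapshot, ArchiveResult..." branches could collapse into a single dispatch table rather than three near-identical blocks. A minimal sketch, assuming the type constants and the per-model from_json() methods described above:

```python
# Sketch: generic create-or-update dispatch for `archivebox run` (not the final code).
MODEL_FOR_TYPE = {
    TYPE_CRAWL: Crawl,
    TYPE_SNAPSHOT: Snapshot,
    TYPE_ARCHIVERESULT: ArchiveResult,  # requires the new from_json() from Phase 1
}


def create_or_update(record):
    """Return the existing object if the record carries a known id, otherwise create one."""
    model = MODEL_FOR_TYPE.get(record.get('type'))
    if model is None:
        return None  # unknown type: the caller just passes the record through
    record_id = record.get('id')
    if record_id:
        existing = model.objects.filter(id=record_id).first()
        if existing:
            return existing
    return model.from_json(record)
```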
Shared apply_filters() Design
Extract to archivebox/cli/cli_utils.py:
"""Shared CLI utilities for ArchiveBox commands."""
from typing import Optional
def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
"""
Apply Django-style filters from CLI kwargs to a QuerySet.
Supports: --status=queued, --url__icontains=example, --id__in=uuid1,uuid2
Args:
queryset: Django QuerySet to filter
filter_kwargs: Dict of filter key-value pairs from CLI
limit: Optional limit on results
Returns:
Filtered QuerySet
"""
filters = {}
for key, value in filter_kwargs.items():
if value is None or key in ('limit', 'offset'):
continue
# Handle CSV lists for __in filters
if key.endswith('__in') and isinstance(value, str):
value = [v.strip() for v in value.split(',')]
filters[key] = value
if filters:
queryset = queryset.filter(**filters)
if limit:
queryset = queryset[:limit]
return queryset
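For reference, a list subcommand would then be expected to call it roughly like this (sketch only; the real CLI argument wiring may differ):

```python
import sys

from archivebox.cli.cli_utils import apply_filters
from archivebox.misc.jsonl import write_record


def list_snapshots(limit=None, **filter_kwargs):
    """Sketch of a `snapshot list` command built on the shared helper."""
    queryset = apply_filters(Snapshot.objects.all(), filter_kwargs, limit=limit)
    if not sys.stdout.isatty():
        for snapshot in queryset:
            write_record(snapshot.to_json())  # JSONL out, ready to pipe to the next command
```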
conftest.py Design (pytest-django)
"""archivebox/tests/conftest.py - Pytest fixtures for CLI tests."""
import os
import sys
import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import pytest
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def isolated_data_dir(tmp_path, settings):
"""
Create isolated DATA_DIR for each test.
Uses tmp_path for isolation, configures Django settings.
"""
data_dir = tmp_path / 'archivebox_data'
data_dir.mkdir()
# Set environment for subprocess calls
os.environ['DATA_DIR'] = str(data_dir)
# Update Django settings
settings.DATA_DIR = data_dir
yield data_dir
# Cleanup handled by tmp_path fixture
@pytest.fixture
def initialized_archive(isolated_data_dir):
"""
Initialize ArchiveBox archive in isolated directory.
Runs `archivebox init` to set up database and directories.
"""
from archivebox.cli.archivebox_init import init
init(setup=True, quick=True)
return isolated_data_dir
@pytest.fixture
def cli_env(initialized_archive):
"""
Environment dict for CLI subprocess calls.
Includes DATA_DIR and disables slow extractors.
"""
return {
**os.environ,
'DATA_DIR': str(initialized_archive),
'USE_COLOR': 'False',
'SHOW_PROGRESS': 'False',
'SAVE_TITLE': 'True',
'SAVE_FAVICON': 'False',
'SAVE_WGET': 'False',
'SAVE_WARC': 'False',
'SAVE_PDF': 'False',
'SAVE_SCREENSHOT': 'False',
'SAVE_DOM': 'False',
'SAVE_SINGLEFILE': 'False',
'SAVE_READABILITY': 'False',
'SAVE_MERCURY': 'False',
'SAVE_GIT': 'False',
'SAVE_YTDLP': 'False',
'SAVE_HEADERS': 'False',
}
# =============================================================================
# CLI Helpers
# =============================================================================
def run_archivebox_cmd(
args: List[str],
stdin: Optional[str] = None,
cwd: Optional[Path] = None,
env: Optional[Dict[str, str]] = None,
timeout: int = 60,
) -> Tuple[str, str, int]:
"""
Run archivebox command, return (stdout, stderr, returncode).
Args:
args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
stdin: Optional string to pipe to stdin
cwd: Working directory (defaults to DATA_DIR from env)
env: Environment variables (defaults to os.environ with DATA_DIR)
timeout: Command timeout in seconds
Returns:
Tuple of (stdout, stderr, returncode)
"""
cmd = [sys.executable, '-m', 'archivebox'] + args
env = env or {**os.environ}
cwd = cwd or Path(env.get('DATA_DIR', '.'))
result = subprocess.run(
cmd,
input=stdin,
capture_output=True,
text=True,
cwd=cwd,
env=env,
timeout=timeout,
)
return result.stdout, result.stderr, result.returncode
# =============================================================================
# Output Assertions
# =============================================================================
def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
"""Parse JSONL output into list of dicts."""
records = []
for line in stdout.strip().split('\n'):
line = line.strip()
if line and line.startswith('{'):
try:
records.append(json.loads(line))
except json.JSONDecodeError:
pass
return records
def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):
"""Assert output contains at least min_count records of type."""
records = parse_jsonl_output(stdout)
matching = [r for r in records if r.get('type') == record_type]
assert len(matching) >= min_count, \
f"Expected >= {min_count} {record_type}, got {len(matching)}"
return matching
def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]):
"""Assert that input records appear in output (pass-through behavior)."""
output_records = parse_jsonl_output(stdout)
output_ids = {r.get('id') for r in output_records if r.get('id')}
for input_rec in input_records:
input_id = input_rec.get('id')
if input_id:
assert input_id in output_ids, \
f"Input record {input_id} not found in output (pass-through failed)"
def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]):
"""Assert record has all required fields with non-None values."""
for field in required_fields:
assert field in record, f"Record missing field: {field}"
assert record[field] is not None, f"Record field is None: {field}"
# =============================================================================
# Database Assertions
# =============================================================================
def assert_db_count(model_class, filters: Dict[str, Any], expected: int):
"""Assert database count matches expected."""
actual = model_class.objects.filter(**filters).count()
assert actual == expected, \
f"Expected {expected} {model_class.__name__}, got {actual}"
def assert_db_exists(model_class, **filters):
"""Assert at least one record exists matching filters."""
assert model_class.objects.filter(**filters).exists(), \
f"No {model_class.__name__} found matching {filters}"
# =============================================================================
# Test Data Factories
# =============================================================================
def create_test_url(domain: str = 'example.com', path: str = None) -> str:
"""Generate unique test URL."""
import uuid
path = path or uuid.uuid4().hex[:8]
return f'https://{domain}/{path}'
def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
"""Create Crawl JSONL record for testing."""
from archivebox.misc.jsonl import TYPE_CRAWL
urls = urls or [create_test_url()]
return {
'type': TYPE_CRAWL,
'urls': '\n'.join(urls),
'max_depth': kwargs.get('max_depth', 0),
'tags_str': kwargs.get('tags_str', ''),
'status': kwargs.get('status', 'queued'),
**{k: v for k, v in kwargs.items() if k not in ('max_depth', 'tags_str', 'status')},
}
def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
"""Create Snapshot JSONL record for testing."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
return {
'type': TYPE_SNAPSHOT,
'url': url or create_test_url(),
'tags_str': kwargs.get('tags_str', ''),
'status': kwargs.get('status', 'queued'),
**{k: v for k, v in kwargs.items() if k not in ('tags_str', 'status')},
}
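To show how the fixtures and helpers are meant to combine, here is a sketch of one Phase 5 test. The test name, the `from .conftest import` style (which assumes the tests directory is a package), and the exact output shape are assumptions, not final tests:

```python
# archivebox/tests/test_cli_crawl.py (sketch)
import json

from .conftest import (
    run_archivebox_cmd, parse_jsonl_output,
    assert_jsonl_contains_type, create_test_crawl_json,
)


def test_crawl_create_passes_through_unknown_records(cli_env):
    """Unknown record types on stdin should reappear unchanged on stdout."""
    tag_record = {'type': 'Tag', 'name': 'important'}
    crawl_record = create_test_crawl_json()
    stdin = '\n'.join(json.dumps(r) for r in (tag_record, crawl_record))

    stdout, stderr, returncode = run_archivebox_cmd(['crawl', 'create'], stdin=stdin, env=cli_env)

    assert returncode == 0, stderr
    records = parse_jsonl_output(stdout)
    assert any(r.get('type') == 'Tag' for r in records)        # pass-through record preserved
    assert_jsonl_contains_type(stdout, 'Crawl', min_count=1)   # newly created Crawl emitted
```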
Test Rules
- NO SKIPPING - Every test runs
- NO MOCKING - Real subprocess calls, real database
- NO DISABLING - Failing tests identify real problems
- MINIMAL CODE - Import helpers from conftest.py
- ISOLATED - Each test gets its own DATA_DIR via `tmp_path`
Task Checklist
Phase 1: Model Prerequisites
- Implement `ArchiveResult.from_json()` in `archivebox/core/models.py`
- Implement `ArchiveResult.from_jsonl()` in `archivebox/core/models.py`
- Fix `Snapshot.to_json()` to use `tags_str` instead of `tags`
Phase 2: Shared Utilities
- Create `archivebox/cli/cli_utils.py` with shared `apply_filters()`
- Update 7 CLI files to import from `cli_utils.py`
Phase 3: Pass-Through Behavior
- Add pass-through to `archivebox_crawl.py` create
- Add pass-through to `archivebox_snapshot.py` create
- Add pass-through to `archivebox_archiveresult.py` create
- Add create-or-update to `archivebox_run.py`
- Add pass-through output to `archivebox_run.py`
Phase 4: Test Infrastructure
- Create `archivebox/tests/conftest.py` with pytest-django fixtures
Phase 5: Unit Tests
- Create `archivebox/tests/test_cli_crawl.py`
- Create `archivebox/tests/test_cli_snapshot.py`
- Create `archivebox/tests/test_cli_archiveresult.py`
- Create `archivebox/tests/test_cli_run.py`
Phase 6: Integration & Config
- Extend `archivebox/cli/tests_piping.py` with pass-through tests
- Update `archivebox/workers/supervisord_util.py`: `orchestrator` → `run`