# ArchiveBox CLI Pipeline Architecture

## Overview

This plan implements a JSONL-based CLI pipeline for ArchiveBox, enabling Unix-style piping between commands:

```bash
archivebox crawl create URL | archivebox snapshot create | archivebox archiveresult create | archivebox run
```

## Design Principles

1. **Maximize model method reuse**: Use `.to_json()`, `.from_json()`, `.to_jsonl()`, `.from_jsonl()` everywhere
2. **Pass-through behavior**: All commands output input records + newly created records (accumulating pipeline)
3. **Create-or-update**: Commands create records if they don't exist, update if ID matches existing
4. **Auto-cascade**: `archivebox run` automatically creates Snapshots from Crawls and ArchiveResults from Snapshots
5. **Generic filtering**: Implement filters as functions that take queryset → return queryset
6. **Minimal code**: Extract duplicated `apply_filters()` to shared module
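
Because every command passes its input through (principle 2), the JSONL stream can be tapped between any two stages without breaking the pipeline. A minimal illustration using only the commands above plus standard shell tools:

```bash
# Inspect the accumulated record stream mid-pipeline (copies it to stderr, pipeline continues)
archivebox crawl create https://example.com \
  | archivebox snapshot create \
  | tee /dev/stderr \
  | archivebox run
```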
---

## Real-World Use Cases

These examples demonstrate the JSONL piping architecture. Key points:

- `archivebox run` auto-cascades (Crawl → Snapshots → ArchiveResults)
- `archivebox run` **emits JSONL** of everything it creates, enabling chained processing
- Use CLI args (`--status=`, `--plugin=`) for efficient DB filtering; use jq for transforms

### 1. Basic Archive

```bash
# Simple URL archive (run auto-creates snapshots and archive results)
archivebox crawl create https://example.com | archivebox run

# Multiple URLs from a file
archivebox crawl create < urls.txt | archivebox run

# With depth crawling (follow links)
archivebox crawl create --depth=2 https://docs.python.org | archivebox run
```

### 2. Retry Failed Extractions

```bash
# Retry all failed extractions
archivebox archiveresult list --status=failed | archivebox run

# Retry only failed PDFs from a specific domain
archivebox archiveresult list --status=failed --plugin=pdf --url__icontains=nytimes.com \
  | archivebox run
```

### 3. Import Bookmarks from Pinboard (jq transform)

```bash
# Fetch Pinboard API, transform fields to match ArchiveBox schema, archive
curl -s "https://api.pinboard.in/v1/posts/all?format=json&auth_token=$TOKEN" \
  | jq -c '.[] | {url: .href, tags_str: .tags, title: .description}' \
  | archivebox crawl create \
  | archivebox run
```

### 4. Retry Failed with Different Binary (jq transform + re-run)

```bash
# Get failed wget results, transform to use wget2 binary instead, re-queue as new attempts
archivebox archiveresult list --status=failed --plugin=wget \
  | jq -c '{snapshot_id, plugin, status: "queued", overrides: {WGET_BINARY: "wget2"}}' \
  | archivebox archiveresult create \
  | archivebox run

# Chain processing: archive, then re-run any failures with increased timeout
archivebox crawl create https://slow-site.com \
  | archivebox run \
  | jq -c 'select(.type == "ArchiveResult" and .status == "failed")
      | del(.id) | .status = "queued" | .overrides.TIMEOUT = "120"' \
  | archivebox archiveresult create \
  | archivebox run
```

### 5. Selective Extraction

```bash
# Create only screenshot extractions for queued snapshots
archivebox snapshot list --status=queued \
  | archivebox archiveresult create --plugin=screenshot \
  | archivebox run

# Re-run singlefile on everything that was skipped
archivebox archiveresult list --plugin=singlefile --status=skipped \
  | archivebox archiveresult update --status=queued \
  | archivebox run
```

### 6. Bulk Tag Management

```bash
# Tag all Twitter/X URLs (efficient DB filter, no jq needed)
archivebox snapshot list --url__icontains=twitter.com \
  | archivebox snapshot update --tag=twitter

# Tag snapshots based on computed criteria (jq for logic DB can't do)
archivebox snapshot list --status=sealed \
  | jq -c 'select(.archiveresult_count > 5) | . + {tags_str: (.tags_str + ",well-archived")}' \
  | archivebox snapshot update
```

### 7. RSS Feed Monitoring

```bash
# Archive all items from an RSS feed
curl -s "https://hnrss.org/frontpage" \
  | xq -r '.rss.channel.item[].link' \
  | archivebox crawl create --tag=hackernews-$(date +%Y%m%d) \
  | archivebox run
```

### 8. Recursive Link Following (run output → filter → re-run)

```bash
# Archive a page, then archive all PDFs it links to
archivebox crawl create https://research-papers.org/index.html \
  | archivebox run \
  | jq -c 'select(.type == "Snapshot") | .discovered_urls[]?
      | select(endswith(".pdf")) | {url: .}' \
  | archivebox crawl create --tag=linked-pdfs \
  | archivebox run

# Depth crawl with custom handling: retry timeouts with a longer timeout
archivebox crawl create --depth=1 https://example.com \
  | archivebox run \
  | jq -c 'select(.type == "ArchiveResult" and .status == "failed" and (.error // "" | contains("timeout")))
      | del(.id) | .overrides.TIMEOUT = "300"' \
  | archivebox archiveresult create \
  | archivebox run
```

### Composability Summary

| Pattern | Example |
|---------|---------|
| **Filter → Process** | `list --status=failed --plugin=pdf \| run` |
| **Transform → Archive** | `curl API \| jq '{url, tags_str}' \| crawl create \| run` |
| **Retry w/ Changes** | `run \| jq 'select(.status=="failed") \| del(.id)' \| create \| run` |
| **Selective Extract** | `snapshot list \| archiveresult create --plugin=screenshot` |
| **Bulk Update** | `list --url__icontains=X \| update --tag=Y` |
| **Chain Processing** | `crawl \| run \| jq transform \| create \| run` |

The key insight: **`archivebox run` emits JSONL of everything it creates**, enabling:

- Retry failed items with different settings (timeouts, binaries, etc.)
- Recursive crawling (archive page → extract links → archive those)
- Chained transforms (filter failures, modify config, re-queue)

---

## Code Reuse Findings

### Existing Model Methods (USE THESE)

- `Crawl.to_json()`, `Crawl.from_json()`, `Crawl.to_jsonl()`, `Crawl.from_jsonl()`
- `Snapshot.to_json()`, `Snapshot.from_json()`, `Snapshot.to_jsonl()`, `Snapshot.from_jsonl()`
- `Tag.to_json()`, `Tag.from_json()`, `Tag.to_jsonl()`, `Tag.from_jsonl()`

### Missing Model Methods (MUST IMPLEMENT)

- **`ArchiveResult.from_json()`** - Does not exist, must be added
- **`ArchiveResult.from_jsonl()`** - Does not exist, must be added

### Existing Utilities (USE THESE)

- `archivebox/misc/jsonl.py`: `read_stdin()`, `read_args_or_stdin()`, `write_record()`, `parse_line()`
- Type constants: `TYPE_CRAWL`, `TYPE_SNAPSHOT`, `TYPE_ARCHIVERESULT`, etc.

### Duplicated Code (EXTRACT)

- `apply_filters()` duplicated in 7 CLI files → extract to `archivebox/cli/cli_utils.py`

### Supervisord Config (UPDATE)

- `archivebox/workers/supervisord_util.py` line ~35: `"command": "archivebox manage orchestrator"` → `"command": "archivebox run"`

### Field Name Standardization (FIX)

- **Issue**: `Crawl.to_json()` outputs `tags_str`, but `Snapshot.to_json()` outputs `tags`
- **Fix**: Standardize all models to use `tags_str` in JSONL output (matches model property names)
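
For illustration only (field values are invented), records piped between commands would then agree on the tag field name:

```
{"type": "Crawl", "id": "abc", "urls": "https://example.com", "tags_str": "news,daily", ...}
{"type": "Snapshot", "id": "xyz", "url": "https://example.com", "tags_str": "news,daily", ...}
```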
---

## Implementation Order

### Phase 1: Model Prerequisites

1. **Implement `ArchiveResult.from_json()`** in `archivebox/core/models.py`
   - Pattern: Match `Snapshot.from_json()` and `Crawl.from_json()` style
   - Handle: ID lookup (update existing) or create new
   - Required fields: `snapshot_id`, `plugin`
   - Optional fields: `status`, `hook_name`, etc.

2. **Implement `ArchiveResult.from_jsonl()`** in `archivebox/core/models.py` (see the sketch after this list)
   - Filter records by `type='ArchiveResult'`
   - Call `from_json()` for each matching record

3. **Fix `Snapshot.to_json()` field name**
   - Change `'tags': self.tags_str()` → `'tags_str': self.tags_str()`
   - Update any code that depends on the `tags` key in Snapshot JSONL
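
A minimal sketch of `ArchiveResult.from_jsonl()`, assuming it mirrors the existing `Crawl`/`Snapshot` classmethods (whose exact signatures are not reproduced in this plan):

```python
@staticmethod
def from_jsonl(records, overrides=None) -> list:
    """Sketch only: create/update ArchiveResults from an iterable of JSONL record dicts."""
    results = []
    for record in records:
        if record.get('type') != 'ArchiveResult':
            continue  # ignore Crawl/Snapshot/Tag records sharing the stream
        result = ArchiveResult.from_json(record, overrides=overrides)
        if result is not None:
            results.append(result)
    return results
```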

### Phase 2: Shared Utilities

4. **Extract `apply_filters()` to `archivebox/cli/cli_utils.py`**
   - Generic queryset filtering from CLI kwargs
   - Support `--id__in=[csv]`, `--url__icontains=str`, etc.
   - Remove duplicates from 7 CLI files

### Phase 3: Pass-Through Behavior (NEW FEATURE)

5. **Add pass-through to `archivebox crawl create`**
   - Output non-Crawl input records unchanged
   - Output created Crawl records

6. **Add pass-through to `archivebox snapshot create`**
   - Output non-Snapshot/non-Crawl input records unchanged
   - Process Crawl records → create Snapshots
   - Output both original Crawl and created Snapshots

7. **Add pass-through to `archivebox archiveresult create`**
   - Output non-Snapshot/non-ArchiveResult input records unchanged
   - Process Snapshot records → create ArchiveResults
   - Output both original Snapshots and created ArchiveResults

8. **Add create-or-update to `archivebox run`**
   - Records WITH an id: look up and queue the existing record
   - Records WITHOUT an id: create via `Model.from_json()`, then queue
   - Pass-through output of all processed records

### Phase 4: Test Infrastructure

9. **Create `archivebox/tests/conftest.py`** with pytest-django
   - Use `pytest-django` for proper test database handling
   - Isolated DATA_DIR per test via `tmp_path` fixture
   - `run_archivebox_cmd()` helper for subprocess testing

### Phase 5: Unit Tests

10. **Create `archivebox/tests/test_cli_crawl.py`** - crawl create/list/pass-through tests
11. **Create `archivebox/tests/test_cli_snapshot.py`** - snapshot create/list/pass-through tests
12. **Create `archivebox/tests/test_cli_archiveresult.py`** - archiveresult create/list/pass-through tests
13. **Create `archivebox/tests/test_cli_run.py`** - run command create-or-update tests

### Phase 6: Integration & Config

14. **Extend `archivebox/cli/tests_piping.py`** - Add pass-through integration tests
15. **Update supervisord config** - `orchestrator` → `run`
---

## Future Work (Deferred)

### Commands to Defer

- `archivebox tag create|list|update|delete` - Already works, defer improvements
- `archivebox binary create|list|update|delete` - Lower priority
- `archivebox process list` - Lower priority
- `archivebox apikey create|list|update|delete` - Lower priority

### `archivebox add` Relationship

- **Current**: `archivebox add` is the primary user-facing command, stays as-is
- **Future**: Refactor `add` to internally use the `crawl create | snapshot create | run` pipeline
- **Note**: This refactor is deferred; `add` continues to work independently for now
---

## Key Files

| File | Action | Phase |
|------|--------|-------|
| `archivebox/core/models.py` | Add `ArchiveResult.from_json()`, `from_jsonl()` | 1 |
| `archivebox/core/models.py` | Fix `Snapshot.to_json()` → `tags_str` | 1 |
| `archivebox/cli/cli_utils.py` | NEW - shared `apply_filters()` | 2 |
| `archivebox/cli/archivebox_crawl.py` | Add pass-through to create | 3 |
| `archivebox/cli/archivebox_snapshot.py` | Add pass-through to create | 3 |
| `archivebox/cli/archivebox_archiveresult.py` | Add pass-through to create | 3 |
| `archivebox/cli/archivebox_run.py` | Add create-or-update, pass-through | 3 |
| `archivebox/tests/conftest.py` | NEW - pytest fixtures | 4 |
| `archivebox/tests/test_cli_crawl.py` | NEW - crawl unit tests | 5 |
| `archivebox/tests/test_cli_snapshot.py` | NEW - snapshot unit tests | 5 |
| `archivebox/tests/test_cli_archiveresult.py` | NEW - archiveresult unit tests | 5 |
| `archivebox/tests/test_cli_run.py` | NEW - run unit tests | 5 |
| `archivebox/cli/tests_piping.py` | Extend with pass-through tests | 6 |
| `archivebox/workers/supervisord_util.py` | Update orchestrator → run | 6 |
---

## Implementation Details

### ArchiveResult.from_json() Design

```python
@staticmethod
def from_json(record: Dict[str, Any], overrides: Dict[str, Any] = None) -> 'ArchiveResult | None':
    """
    Create or update a single ArchiveResult from a JSON record dict.

    Args:
        record: Dict with 'snapshot_id' and 'plugin' (required for create),
                or 'id' (for update)
        overrides: Dict of field overrides

    Returns:
        ArchiveResult instance or None if invalid
    """
    from django.utils import timezone

    overrides = overrides or {}

    # If 'id' is provided, look up and update existing
    result_id = record.get('id')
    if result_id:
        try:
            result = ArchiveResult.objects.get(id=result_id)
            # Update fields from record
            if record.get('status'):
                result.status = record['status']
                result.retry_at = timezone.now()
                result.save()
            return result
        except ArchiveResult.DoesNotExist:
            pass  # Fall through to create

    # Required fields for creation
    snapshot_id = record.get('snapshot_id')
    plugin = record.get('plugin')

    if not snapshot_id or not plugin:
        return None

    try:
        snapshot = Snapshot.objects.get(id=snapshot_id)
    except Snapshot.DoesNotExist:
        return None

    # Create or get existing result
    result, created = ArchiveResult.objects.get_or_create(
        snapshot=snapshot,
        plugin=plugin,
        defaults={
            'status': record.get('status', ArchiveResult.StatusChoices.QUEUED),
            'retry_at': timezone.now(),
            'hook_name': record.get('hook_name', ''),
            **overrides,
        },
    )

    # If not created, optionally reset for retry
    if not created and record.get('status'):
        result.status = record['status']
        result.retry_at = timezone.now()
        result.save()

    return result
```
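
For example, a single JSONL record piped to `archiveresult create` would reach this method roughly as follows (field values are made up):

```python
# Illustrative call: queue a wget extraction for an existing snapshot
result = ArchiveResult.from_json({
    'type': 'ArchiveResult',
    'snapshot_id': '<existing-snapshot-uuid>',  # must reference a Snapshot already in the DB
    'plugin': 'wget',
    'status': 'queued',
})
# result is None if snapshot_id does not resolve to a Snapshot
```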

### Pass-Through Pattern

All `create` commands follow this pattern:

```python
def create_X(args, ...):
    is_tty = sys.stdout.isatty()
    records = list(read_args_or_stdin(args))

    for record in records:
        record_type = record.get('type')

        # Pass-through: output records we don't handle
        if record_type not in HANDLED_TYPES:
            if not is_tty:
                write_record(record)
            continue

        # Handle our type: create via Model.from_json()
        obj = Model.from_json(record, overrides={...})

        # Output created record (hydrated with db id)
        if obj and not is_tty:
            write_record(obj.to_json())
```

### Pass-Through Semantics Example

```
Input:
{"type": "Crawl", "id": "abc", "urls": "https://example.com", ...}
{"type": "Tag", "name": "important"}

archivebox snapshot create output:
{"type": "Crawl", "id": "abc", ...}       # pass-through (not our type)
{"type": "Tag", "name": "important"}      # pass-through (not our type)
{"type": "Snapshot", "id": "xyz", ...}    # created from Crawl URLs
```
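
A rough sketch of how `snapshot create` could implement that expansion, reusing the `jsonl.py` helpers; the helper name and the `crawl_id` field passed to `Snapshot.from_json()` are assumptions here, not existing code:

```python
def handle_crawl_record(record: dict, is_tty: bool) -> None:
    """Hypothetical helper: pass a Crawl record through and emit Snapshots for its URLs."""
    if not is_tty:
        write_record(record)                      # pass the original Crawl through unchanged
    for url in (record.get('urls') or '').splitlines():
        url = url.strip()
        if not url:
            continue
        snapshot = Snapshot.from_json({           # assumed: from_json() accepts url + crawl_id
            'type': 'Snapshot',
            'url': url,
            'crawl_id': record.get('id'),
        })
        if snapshot and not is_tty:
            write_record(snapshot.to_json())      # emit the newly created Snapshot record
```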

### Create-or-Update Pattern for `archivebox run`

```python
import sys

from django.utils import timezone

from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL
# (Crawl / Snapshot / ArchiveResult model classes imported from their models modules)


def process_stdin_records() -> int:
    """Create-or-update each incoming record, queue it, and pass it through."""
    records = list(read_stdin())
    is_tty = sys.stdout.isatty()
    processed = 0

    for record in records:
        record_type = record.get('type')
        record_id = record.get('id')

        # Create-or-update based on whether an ID exists
        if record_type == TYPE_CRAWL:
            if record_id:
                try:
                    obj = Crawl.objects.get(id=record_id)
                except Crawl.DoesNotExist:
                    obj = Crawl.from_json(record)
            else:
                obj = Crawl.from_json(record)

            if obj:
                obj.retry_at = timezone.now()   # re-queue for processing
                obj.save()
                processed += 1
                if not is_tty:
                    write_record(obj.to_json())

        # Similar for Snapshot, ArchiveResult...

    return processed
```

### Shared apply_filters() Design

Extract to `archivebox/cli/cli_utils.py`:

```python
"""Shared CLI utilities for ArchiveBox commands."""

from typing import Optional


def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
    """
    Apply Django-style filters from CLI kwargs to a QuerySet.

    Supports: --status=queued, --url__icontains=example, --id__in=uuid1,uuid2

    Args:
        queryset: Django QuerySet to filter
        filter_kwargs: Dict of filter key-value pairs from CLI
        limit: Optional limit on results

    Returns:
        Filtered QuerySet
    """
    filters = {}
    for key, value in filter_kwargs.items():
        if value is None or key in ('limit', 'offset'):
            continue
        # Handle CSV lists for __in filters
        if key.endswith('__in') and isinstance(value, str):
            value = [v.strip() for v in value.split(',')]
        filters[key] = value

    if filters:
        queryset = queryset.filter(**filters)
    if limit:
        queryset = queryset[:limit]

    return queryset
```
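
A hypothetical call site, for illustration only; the handler name and its keyword arguments are invented here, but the flags mirror the `--status=` / `--url__icontains=` examples above:

```python
from archivebox.cli.cli_utils import apply_filters
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import write_record


def list_snapshots(status=None, url__icontains=None, limit=None):
    """Hypothetical `snapshot list` handler emitting one JSONL record per match."""
    qs = apply_filters(
        Snapshot.objects.all(),
        {'status': status, 'url__icontains': url__icontains},
        limit=limit,
    )
    for snapshot in qs:
        write_record(snapshot.to_json())
```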

---

## conftest.py Design (pytest-django)

```python
"""archivebox/tests/conftest.py - Pytest fixtures for CLI tests."""

import os
import sys
import json
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple

import pytest


# =============================================================================
# Fixtures
# =============================================================================

@pytest.fixture
def isolated_data_dir(tmp_path, settings):
    """
    Create isolated DATA_DIR for each test.

    Uses tmp_path for isolation, configures Django settings.
    """
    data_dir = tmp_path / 'archivebox_data'
    data_dir.mkdir()

    # Set environment for subprocess calls
    os.environ['DATA_DIR'] = str(data_dir)

    # Update Django settings
    settings.DATA_DIR = data_dir

    yield data_dir

    # Cleanup handled by tmp_path fixture


@pytest.fixture
def initialized_archive(isolated_data_dir):
    """
    Initialize ArchiveBox archive in isolated directory.

    Runs `archivebox init` to set up database and directories.
    """
    from archivebox.cli.archivebox_init import init
    init(setup=True, quick=True)
    return isolated_data_dir


@pytest.fixture
def cli_env(initialized_archive):
    """
    Environment dict for CLI subprocess calls.

    Includes DATA_DIR and disables slow extractors.
    """
    return {
        **os.environ,
        'DATA_DIR': str(initialized_archive),
        'USE_COLOR': 'False',
        'SHOW_PROGRESS': 'False',
        'SAVE_TITLE': 'True',
        'SAVE_FAVICON': 'False',
        'SAVE_WGET': 'False',
        'SAVE_WARC': 'False',
        'SAVE_PDF': 'False',
        'SAVE_SCREENSHOT': 'False',
        'SAVE_DOM': 'False',
        'SAVE_SINGLEFILE': 'False',
        'SAVE_READABILITY': 'False',
        'SAVE_MERCURY': 'False',
        'SAVE_GIT': 'False',
        'SAVE_YTDLP': 'False',
        'SAVE_HEADERS': 'False',
    }


# =============================================================================
# CLI Helpers
# =============================================================================

def run_archivebox_cmd(
    args: List[str],
    stdin: Optional[str] = None,
    cwd: Optional[Path] = None,
    env: Optional[Dict[str, str]] = None,
    timeout: int = 60,
) -> Tuple[str, str, int]:
    """
    Run archivebox command, return (stdout, stderr, returncode).

    Args:
        args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
        stdin: Optional string to pipe to stdin
        cwd: Working directory (defaults to DATA_DIR from env)
        env: Environment variables (defaults to os.environ with DATA_DIR)
        timeout: Command timeout in seconds

    Returns:
        Tuple of (stdout, stderr, returncode)
    """
    cmd = [sys.executable, '-m', 'archivebox'] + args

    env = env or {**os.environ}
    cwd = cwd or Path(env.get('DATA_DIR', '.'))

    result = subprocess.run(
        cmd,
        input=stdin,
        capture_output=True,
        text=True,
        cwd=cwd,
        env=env,
        timeout=timeout,
    )

    return result.stdout, result.stderr, result.returncode


# =============================================================================
# Output Assertions
# =============================================================================

def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
    """Parse JSONL output into list of dicts."""
    records = []
    for line in stdout.strip().split('\n'):
        line = line.strip()
        if line and line.startswith('{'):
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                pass
    return records


def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):
    """Assert output contains at least min_count records of type."""
    records = parse_jsonl_output(stdout)
    matching = [r for r in records if r.get('type') == record_type]
    assert len(matching) >= min_count, \
        f"Expected >= {min_count} {record_type}, got {len(matching)}"
    return matching


def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]):
    """Assert that input records appear in output (pass-through behavior)."""
    output_records = parse_jsonl_output(stdout)
    output_ids = {r.get('id') for r in output_records if r.get('id')}

    for input_rec in input_records:
        input_id = input_rec.get('id')
        if input_id:
            assert input_id in output_ids, \
                f"Input record {input_id} not found in output (pass-through failed)"


def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]):
    """Assert record has all required fields with non-None values."""
    for field in required_fields:
        assert field in record, f"Record missing field: {field}"
        assert record[field] is not None, f"Record field is None: {field}"


# =============================================================================
# Database Assertions
# =============================================================================

def assert_db_count(model_class, filters: Dict[str, Any], expected: int):
    """Assert database count matches expected."""
    actual = model_class.objects.filter(**filters).count()
    assert actual == expected, \
        f"Expected {expected} {model_class.__name__}, got {actual}"


def assert_db_exists(model_class, **filters):
    """Assert at least one record exists matching filters."""
    assert model_class.objects.filter(**filters).exists(), \
        f"No {model_class.__name__} found matching {filters}"


# =============================================================================
# Test Data Factories
# =============================================================================

def create_test_url(domain: str = 'example.com', path: str = None) -> str:
    """Generate unique test URL."""
    import uuid
    path = path or uuid.uuid4().hex[:8]
    return f'https://{domain}/{path}'


def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
    """Create Crawl JSONL record for testing."""
    from archivebox.misc.jsonl import TYPE_CRAWL

    urls = urls or [create_test_url()]
    return {
        'type': TYPE_CRAWL,
        'urls': '\n'.join(urls),
        'max_depth': kwargs.get('max_depth', 0),
        'tags_str': kwargs.get('tags_str', ''),
        'status': kwargs.get('status', 'queued'),
        **{k: v for k, v in kwargs.items() if k not in ('max_depth', 'tags_str', 'status')},
    }


def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
    """Create Snapshot JSONL record for testing."""
    from archivebox.misc.jsonl import TYPE_SNAPSHOT

    return {
        'type': TYPE_SNAPSHOT,
        'url': url or create_test_url(),
        'tags_str': kwargs.get('tags_str', ''),
        'status': kwargs.get('status', 'queued'),
        **{k: v for k, v in kwargs.items() if k not in ('tags_str', 'status')},
    }
```
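
To illustrate how these fixtures and helpers combine, a test in `test_cli_crawl.py` might look roughly like this; the test itself is hypothetical, but every helper it uses is defined above:

```python
# Hypothetical test sketch (not one of the planned test files verbatim)
from .conftest import run_archivebox_cmd, assert_jsonl_contains_type


def test_crawl_create_emits_jsonl(cli_env):
    stdout, stderr, returncode = run_archivebox_cmd(
        ['crawl', 'create', 'https://example.com'],
        env=cli_env,
    )
    assert returncode == 0, stderr
    # stdout is not a TTY under subprocess, so JSONL records are emitted
    crawls = assert_jsonl_contains_type(stdout, 'Crawl', min_count=1)
    assert 'example.com' in crawls[0].get('urls', '')
```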

---

## Test Rules

- **NO SKIPPING** - Every test runs
- **NO MOCKING** - Real subprocess calls, real database
- **NO DISABLING** - Failing tests identify real problems
- **MINIMAL CODE** - Import helpers from conftest.py
- **ISOLATED** - Each test gets its own DATA_DIR via `tmp_path`
---

## Task Checklist

### Phase 1: Model Prerequisites

- [x] Implement `ArchiveResult.from_json()` in `archivebox/core/models.py`
- [x] Implement `ArchiveResult.from_jsonl()` in `archivebox/core/models.py`
- [x] Fix `Snapshot.to_json()` to use `tags_str` instead of `tags`

### Phase 2: Shared Utilities

- [x] Create `archivebox/cli/cli_utils.py` with shared `apply_filters()`
- [x] Update 7 CLI files to import from `cli_utils.py`

### Phase 3: Pass-Through Behavior

- [x] Add pass-through to `archivebox_crawl.py` create
- [x] Add pass-through to `archivebox_snapshot.py` create
- [x] Add pass-through to `archivebox_archiveresult.py` create
- [x] Add create-or-update to `archivebox_run.py`
- [x] Add pass-through output to `archivebox_run.py`

### Phase 4: Test Infrastructure

- [x] Create `archivebox/tests/conftest.py` with pytest-django fixtures

### Phase 5: Unit Tests

- [x] Create `archivebox/tests/test_cli_crawl.py`
- [x] Create `archivebox/tests/test_cli_snapshot.py`
- [x] Create `archivebox/tests/test_cli_archiveresult.py`
- [x] Create `archivebox/tests/test_cli_run.py`

### Phase 6: Integration & Config

- [x] Extend `archivebox/cli/tests_piping.py` with pass-through tests
- [x] Update `archivebox/workers/supervisord_util.py`: orchestrator → run