Mirror of https://github.com/ArchiveBox/ArchiveBox.git (synced 2026-01-04 18:05:36 +10:00)
Phase 1: Model Prerequisites
- Add ArchiveResult.from_json() and from_jsonl() methods
- Fix Snapshot.to_json() to use tags_str (consistent with Crawl)

Phase 2: Shared Utilities
- Create archivebox/cli/cli_utils.py with shared apply_filters()
- Update 7 CLI files to import from cli_utils.py instead of duplicating

Phase 3: Pass-Through Behavior
- Add pass-through to crawl create (non-Crawl records pass unchanged)
- Add pass-through to snapshot create (Crawl records + others pass through)
- Add pass-through to archiveresult create (Snapshot records + others)
- Add create-or-update behavior to run command:
  - Records WITHOUT id: create via Model.from_json()
  - Records WITH id: look up the existing record and re-queue it
- Outputs JSONL of all processed records for chaining

Phase 4: Test Infrastructure
- Create archivebox/tests/conftest.py with pytest-django fixtures
- Include CLI helpers, output assertions, database assertions

Phase 6: Config Update
- Update supervisord_util.py: orchestrator -> run command

This enables Unix-style piping:

    archivebox crawl create URL | archivebox run
    archivebox archiveresult list --status=failed | archivebox run
    curl API | jq transform | archivebox crawl create | archivebox run
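Each stage in such a pipeline reads and writes one JSON object per line. As a minimal sketch (field names borrowed from the test factories in the conftest.py below; the exact record schema is an assumption here), a Crawl record suitable for piping into `archivebox crawl create | archivebox run` could be built like this:

    import json
    from archivebox.misc.jsonl import TYPE_CRAWL  # record-type constant used by the CLI

    # Hypothetical example record; 'urls' is newline-separated,
    # matching create_test_crawl_json() below
    record = {'type': TYPE_CRAWL, 'urls': 'https://example.com/page',
              'max_depth': 0, 'tags_str': '', 'status': 'queued'}
    print(json.dumps(record))  # one JSONL line, ready to pipe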
219 lines
6.8 KiB
Python
"""archivebox/tests/conftest.py - Pytest fixtures for CLI tests."""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
import pytest
|
|
|
|
|
|
# =============================================================================
# Fixtures
# =============================================================================

@pytest.fixture
def isolated_data_dir(tmp_path, settings):
    """
    Create isolated DATA_DIR for each test.

    Uses tmp_path for isolation, configures Django settings.
    """
    data_dir = tmp_path / 'archivebox_data'
    data_dir.mkdir()

    # Set environment for subprocess calls
    os.environ['DATA_DIR'] = str(data_dir)

    # Update Django settings
    settings.DATA_DIR = data_dir

    yield data_dir

    # Cleanup handled by tmp_path fixture


@pytest.fixture
def initialized_archive(isolated_data_dir):
    """
    Initialize ArchiveBox archive in isolated directory.

    Runs `archivebox init` to set up database and directories.
    """
    from archivebox.cli.archivebox_init import init
    init(setup=True, quick=True)
    return isolated_data_dir


@pytest.fixture
def cli_env(initialized_archive):
    """
    Environment dict for CLI subprocess calls.

    Includes DATA_DIR and disables slow extractors.
    """
    return {
        **os.environ,
        'DATA_DIR': str(initialized_archive),
        'USE_COLOR': 'False',
        'SHOW_PROGRESS': 'False',
        'SAVE_TITLE': 'True',
        'SAVE_FAVICON': 'False',
        'SAVE_WGET': 'False',
        'SAVE_WARC': 'False',
        'SAVE_PDF': 'False',
        'SAVE_SCREENSHOT': 'False',
        'SAVE_DOM': 'False',
        'SAVE_SINGLEFILE': 'False',
        'SAVE_READABILITY': 'False',
        'SAVE_MERCURY': 'False',
        'SAVE_GIT': 'False',
        'SAVE_YTDLP': 'False',
        'SAVE_HEADERS': 'False',
    }
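
# NOTE: a minimal sketch (not part of the original file) of how these fixtures
# compose: cli_env depends on initialized_archive, which depends on
# isolated_data_dir, so requesting cli_env alone gives a test a fully
# initialized, throwaway archive. Hypothetical test body:
#
# def test_version_runs_in_isolated_archive(cli_env):
#     stdout, stderr, returncode = run_archivebox_cmd(['version'], env=cli_env)
#     assert returncode == 0, stderr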


# =============================================================================
# CLI Helpers
# =============================================================================

def run_archivebox_cmd(
    args: List[str],
    stdin: Optional[str] = None,
    cwd: Optional[Path] = None,
    env: Optional[Dict[str, str]] = None,
    timeout: int = 60,
) -> Tuple[str, str, int]:
    """
    Run archivebox command, return (stdout, stderr, returncode).

    Args:
        args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
        stdin: Optional string to pipe to stdin
        cwd: Working directory (defaults to DATA_DIR from env)
        env: Environment variables (defaults to os.environ with DATA_DIR)
        timeout: Command timeout in seconds

    Returns:
        Tuple of (stdout, stderr, returncode)
    """
    cmd = [sys.executable, '-m', 'archivebox'] + args

    env = env or {**os.environ}
    cwd = cwd or Path(env.get('DATA_DIR', '.'))

    result = subprocess.run(
        cmd,
        input=stdin,
        capture_output=True,
        text=True,
        cwd=cwd,
        env=env,
        timeout=timeout,
    )

    return result.stdout, result.stderr, result.returncode
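
# Example usage (hypothetical, for illustration): exercise the Unix-style
# piping described in the commit message by feeding one command's stdout to
# the next command's stdin, all inside the isolated archive from cli_env.
#
# def test_crawl_create_pipes_into_run(cli_env):
#     stdout, _, code = run_archivebox_cmd(
#         ['crawl', 'create', 'https://example.com'], env=cli_env)
#     assert code == 0
#     stdout, _, code = run_archivebox_cmd(['run'], stdin=stdout, env=cli_env)
#     assert code == 0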


# =============================================================================
# Output Assertions
# =============================================================================

def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
    """Parse JSONL output into list of dicts."""
    records = []
    for line in stdout.strip().split('\n'):
        line = line.strip()
        if line and line.startswith('{'):
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                pass
    return records


def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):
    """Assert output contains at least min_count records of type."""
    records = parse_jsonl_output(stdout)
    matching = [r for r in records if r.get('type') == record_type]
    assert len(matching) >= min_count, \
        f"Expected >= {min_count} {record_type}, got {len(matching)}"
    return matching


def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]):
    """Assert that input records appear in output (pass-through behavior)."""
    output_records = parse_jsonl_output(stdout)
    output_ids = {r.get('id') for r in output_records if r.get('id')}

    for input_rec in input_records:
        input_id = input_rec.get('id')
        if input_id:
            assert input_id in output_ids, \
                f"Input record {input_id} not found in output (pass-through failed)"


def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]):
    """Assert record has all required fields with non-None values."""
    for field in required_fields:
        assert field in record, f"Record missing field: {field}"
        assert record[field] is not None, f"Record field is None: {field}"
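
# Putting the output assertions together (hypothetical test; the field names
# 'id', 'url', 'status' are assumptions about the Snapshot JSONL schema):
#
# def test_snapshot_create_emits_snapshot_records(cli_env):
#     from archivebox.misc.jsonl import TYPE_SNAPSHOT
#     stdout, _, _ = run_archivebox_cmd(
#         ['snapshot', 'create', 'https://example.com'], env=cli_env)
#     snapshots = assert_jsonl_contains_type(stdout, TYPE_SNAPSHOT)
#     assert_record_has_fields(snapshots[0], ['id', 'url', 'status'])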


# =============================================================================
# Database Assertions
# =============================================================================

def assert_db_count(model_class, filters: Dict[str, Any], expected: int):
    """Assert database count matches expected."""
    actual = model_class.objects.filter(**filters).count()
    assert actual == expected, \
        f"Expected {expected} {model_class.__name__}, got {actual}"


def assert_db_exists(model_class, **filters):
    """Assert at least one record exists matching filters."""
    assert model_class.objects.filter(**filters).exists(), \
        f"No {model_class.__name__} found matching {filters}"


# =============================================================================
# Test Data Factories
# =============================================================================

def create_test_url(domain: str = 'example.com', path: Optional[str] = None) -> str:
    """Generate unique test URL."""
    import uuid
    path = path or uuid.uuid4().hex[:8]
    return f'https://{domain}/{path}'


def create_test_crawl_json(urls: Optional[List[str]] = None, **kwargs) -> Dict[str, Any]:
    """Create Crawl JSONL record for testing."""
    from archivebox.misc.jsonl import TYPE_CRAWL

    urls = urls or [create_test_url()]
    return {
        'type': TYPE_CRAWL,
        'urls': '\n'.join(urls),
        'max_depth': kwargs.get('max_depth', 0),
        'tags_str': kwargs.get('tags_str', ''),
        'status': kwargs.get('status', 'queued'),
        **{k: v for k, v in kwargs.items() if k not in ('max_depth', 'tags_str', 'status')},
    }


def create_test_snapshot_json(url: Optional[str] = None, **kwargs) -> Dict[str, Any]:
    """Create Snapshot JSONL record for testing."""
    from archivebox.misc.jsonl import TYPE_SNAPSHOT

    return {
        'type': TYPE_SNAPSHOT,
        'url': url or create_test_url(),
        'tags_str': kwargs.get('tags_str', ''),
        'status': kwargs.get('status', 'queued'),
        **{k: v for k, v in kwargs.items() if k not in ('tags_str', 'status')},
    }
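
# End-to-end sketch (hypothetical) tying the factories, the CLI helper, and
# the JSONL assertions together: build a Crawl record, feed it to
# `crawl create` on stdin, and confirm records come back out for the next
# stage of the pipe.
#
# def test_crawl_create_round_trip(cli_env):
#     record = create_test_crawl_json(['https://example.com/one'])
#     stdout, _, code = run_archivebox_cmd(
#         ['crawl', 'create'], stdin=json.dumps(record), env=cli_env)
#     assert code == 0
#     assert parse_jsonl_output(stdout), 'expected at least one JSONL record back'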