Fix CLI tests to use subprocess and remove mocks

- Fix conftest.py: use subprocess for init, remove unused cli_env fixture
- Update all test files to use data_dir parameter instead of env
- Remove mock-based TestJSONLOutput class from tests_piping.py
- Remove unused imports (MagicMock, patch)
- Fix file permissions for cli_utils.py

All tests now use real subprocess calls per CLAUDE.md guidelines:
- NO MOCKS - tests exercise real code paths
- NO SKIPS - every test runs
This commit is contained in:
Claude
2025-12-31 10:53:45 +00:00
parent bb52b5902a
commit b87bbbbecb
6 changed files with 218 additions and 279 deletions

View File

@@ -10,29 +10,83 @@ from typing import List, Dict, Any, Optional, Tuple
import pytest
# =============================================================================
# CLI Helpers (defined before fixtures that use them)
# =============================================================================
def run_archivebox_cmd(
args: List[str],
data_dir: Path,
stdin: Optional[str] = None,
timeout: int = 60,
env: Optional[Dict[str, str]] = None,
) -> Tuple[str, str, int]:
"""
Run archivebox command via subprocess, return (stdout, stderr, returncode).
Args:
args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
data_dir: The DATA_DIR to use
stdin: Optional string to pipe to stdin
timeout: Command timeout in seconds
env: Additional environment variables
Returns:
Tuple of (stdout, stderr, returncode)
"""
cmd = [sys.executable, '-m', 'archivebox'] + args
base_env = os.environ.copy()
base_env['DATA_DIR'] = str(data_dir)
base_env['USE_COLOR'] = 'False'
base_env['SHOW_PROGRESS'] = 'False'
# Disable slow extractors for faster tests
base_env['SAVE_ARCHIVEDOTORG'] = 'False'
base_env['SAVE_TITLE'] = 'False'
base_env['SAVE_FAVICON'] = 'False'
base_env['SAVE_WGET'] = 'False'
base_env['SAVE_WARC'] = 'False'
base_env['SAVE_PDF'] = 'False'
base_env['SAVE_SCREENSHOT'] = 'False'
base_env['SAVE_DOM'] = 'False'
base_env['SAVE_SINGLEFILE'] = 'False'
base_env['SAVE_READABILITY'] = 'False'
base_env['SAVE_MERCURY'] = 'False'
base_env['SAVE_GIT'] = 'False'
base_env['SAVE_YTDLP'] = 'False'
base_env['SAVE_HEADERS'] = 'False'
base_env['SAVE_HTMLTOTEXT'] = 'False'
if env:
base_env.update(env)
result = subprocess.run(
cmd,
input=stdin,
capture_output=True,
text=True,
cwd=data_dir,
env=base_env,
timeout=timeout,
)
return result.stdout, result.stderr, result.returncode
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def isolated_data_dir(tmp_path, settings):
def isolated_data_dir(tmp_path):
"""
Create isolated DATA_DIR for each test.
Uses tmp_path for isolation, configures Django settings.
Uses tmp_path for complete isolation.
"""
data_dir = tmp_path / 'archivebox_data'
data_dir.mkdir()
# Set environment for subprocess calls
os.environ['DATA_DIR'] = str(data_dir)
# Update Django settings
settings.DATA_DIR = data_dir
yield data_dir
# Cleanup handled by tmp_path fixture
return data_dir
@pytest.fixture
@@ -40,81 +94,15 @@ def initialized_archive(isolated_data_dir):
"""
Initialize ArchiveBox archive in isolated directory.
Runs `archivebox init` to set up database and directories.
Runs `archivebox init` via subprocess to set up database and directories.
"""
from archivebox.cli.archivebox_init import init
init(setup=True, quick=True)
return isolated_data_dir
@pytest.fixture
def cli_env(initialized_archive):
"""
Environment dict for CLI subprocess calls.
Includes DATA_DIR and disables slow extractors.
"""
return {
**os.environ,
'DATA_DIR': str(initialized_archive),
'USE_COLOR': 'False',
'SHOW_PROGRESS': 'False',
'SAVE_TITLE': 'True',
'SAVE_FAVICON': 'False',
'SAVE_WGET': 'False',
'SAVE_WARC': 'False',
'SAVE_PDF': 'False',
'SAVE_SCREENSHOT': 'False',
'SAVE_DOM': 'False',
'SAVE_SINGLEFILE': 'False',
'SAVE_READABILITY': 'False',
'SAVE_MERCURY': 'False',
'SAVE_GIT': 'False',
'SAVE_YTDLP': 'False',
'SAVE_HEADERS': 'False',
}
# =============================================================================
# CLI Helpers
# =============================================================================
def run_archivebox_cmd(
args: List[str],
stdin: Optional[str] = None,
cwd: Optional[Path] = None,
env: Optional[Dict[str, str]] = None,
timeout: int = 60,
) -> Tuple[str, str, int]:
"""
Run archivebox command, return (stdout, stderr, returncode).
Args:
args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
stdin: Optional string to pipe to stdin
cwd: Working directory (defaults to DATA_DIR from env)
env: Environment variables (defaults to os.environ with DATA_DIR)
timeout: Command timeout in seconds
Returns:
Tuple of (stdout, stderr, returncode)
"""
cmd = [sys.executable, '-m', 'archivebox'] + args
env = env or {**os.environ}
cwd = cwd or Path(env.get('DATA_DIR', '.'))
result = subprocess.run(
cmd,
input=stdin,
capture_output=True,
text=True,
cwd=cwd,
env=env,
timeout=timeout,
stdout, stderr, returncode = run_archivebox_cmd(
['init', '--quick'],
data_dir=isolated_data_dir,
timeout=60,
)
return result.stdout, result.stderr, result.returncode
assert returncode == 0, f"archivebox init failed: {stderr}"
return isolated_data_dir
# =============================================================================
@@ -162,23 +150,6 @@ def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str])
assert record[field] is not None, f"Record field is None: {field}"
# =============================================================================
# Database Assertions
# =============================================================================
def assert_db_count(model_class, filters: Dict[str, Any], expected: int):
"""Assert database count matches expected."""
actual = model_class.objects.filter(**filters).count()
assert actual == expected, \
f"Expected {expected} {model_class.__name__}, got {actual}"
def assert_db_exists(model_class, **filters):
"""Assert at least one record exists matching filters."""
assert model_class.objects.filter(**filters).exists(), \
f"No {model_class.__name__} found matching {filters}"
# =============================================================================
# Test Data Factories
# =============================================================================
@@ -192,11 +163,9 @@ def create_test_url(domain: str = 'example.com', path: str = None) -> str:
def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
"""Create Crawl JSONL record for testing."""
from archivebox.misc.jsonl import TYPE_CRAWL
urls = urls or [create_test_url()]
return {
'type': TYPE_CRAWL,
'type': 'Crawl',
'urls': '\n'.join(urls),
'max_depth': kwargs.get('max_depth', 0),
'tags_str': kwargs.get('tags_str', ''),
@@ -207,10 +176,8 @@ def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
"""Create Snapshot JSONL record for testing."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
return {
'type': TYPE_SNAPSHOT,
'type': 'Snapshot',
'url': url or create_test_url(),
'tags_str': kwargs.get('tags_str', ''),
'status': kwargs.get('status', 'queued'),

View File

@@ -21,19 +21,19 @@ from archivebox.tests.conftest import (
class TestArchiveResultCreate:
"""Tests for `archivebox archiveresult create`."""
def test_create_from_snapshot_jsonl(self, cli_env, initialized_archive):
def test_create_from_snapshot_jsonl(self, initialized_archive):
"""Create archive results from Snapshot JSONL input."""
url = create_test_url()
# Create a snapshot first
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
# Pipe snapshot to archiveresult create
stdout2, stderr, code = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -47,16 +47,16 @@ class TestArchiveResultCreate:
ar = next(r for r in records if r['type'] == 'ArchiveResult')
assert ar['plugin'] == 'title'
def test_create_with_specific_plugin(self, cli_env, initialized_archive):
def test_create_with_specific_plugin(self, initialized_archive):
"""Create archive result for specific plugin."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, stderr, code = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=screenshot'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -65,25 +65,25 @@ class TestArchiveResultCreate:
assert len(ar_records) >= 1
assert ar_records[0]['plugin'] == 'screenshot'
def test_create_pass_through_crawl(self, cli_env, initialized_archive):
def test_create_pass_through_crawl(self, initialized_archive):
"""Pass-through Crawl records unchanged."""
url = create_test_url()
# Create crawl and snapshot
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['snapshot', 'create'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
# Now pipe all to archiveresult create
stdout3, stderr, code = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=stdout2,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -94,14 +94,14 @@ class TestArchiveResultCreate:
assert 'Snapshot' in types
assert 'ArchiveResult' in types
def test_create_pass_through_only_when_no_snapshots(self, cli_env, initialized_archive):
def test_create_pass_through_only_when_no_snapshots(self, initialized_archive):
"""Only pass-through records but no new snapshots returns success."""
crawl_record = {'type': 'Crawl', 'id': 'fake-id', 'urls': 'https://example.com'}
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'create'],
stdin=json.dumps(crawl_record),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -111,31 +111,31 @@ class TestArchiveResultCreate:
class TestArchiveResultList:
"""Tests for `archivebox archiveresult list`."""
def test_list_empty(self, cli_env, initialized_archive):
def test_list_empty(self, initialized_archive):
"""List with no archive results returns empty."""
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Listed 0 archive results' in stderr
def test_list_filter_by_status(self, cli_env, initialized_archive):
def test_list_filter_by_status(self, initialized_archive):
"""Filter archive results by status."""
# Create snapshot and archive result
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list', '--status=queued'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -143,20 +143,20 @@ class TestArchiveResultList:
for r in records:
assert r['status'] == 'queued'
def test_list_filter_by_plugin(self, cli_env, initialized_archive):
def test_list_filter_by_plugin(self, initialized_archive):
"""Filter archive results by plugin."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list', '--plugin=title'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -164,22 +164,22 @@ class TestArchiveResultList:
for r in records:
assert r['plugin'] == 'title'
def test_list_with_limit(self, cli_env, initialized_archive):
def test_list_with_limit(self, initialized_archive):
"""Limit number of results."""
# Create multiple archive results
for _ in range(3):
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'list', '--limit=2'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -190,23 +190,23 @@ class TestArchiveResultList:
class TestArchiveResultUpdate:
"""Tests for `archivebox archiveresult update`."""
def test_update_status(self, cli_env, initialized_archive):
def test_update_status(self, initialized_archive):
"""Update archive result status."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
stdout3, stderr, code = run_archivebox_cmd(
['archiveresult', 'update', '--status=failed'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -219,45 +219,45 @@ class TestArchiveResultUpdate:
class TestArchiveResultDelete:
"""Tests for `archivebox archiveresult delete`."""
def test_delete_requires_yes(self, cli_env, initialized_archive):
def test_delete_requires_yes(self, initialized_archive):
"""Delete requires --yes flag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'delete'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 1
assert '--yes' in stderr
def test_delete_with_yes(self, cli_env, initialized_archive):
def test_delete_with_yes(self, initialized_archive):
"""Delete with --yes flag works."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
stdout, stderr, code = run_archivebox_cmd(
['archiveresult', 'delete', '--yes'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -23,13 +23,13 @@ from archivebox.tests.conftest import (
class TestCrawlCreate:
"""Tests for `archivebox crawl create`."""
def test_create_from_url_args(self, cli_env, initialized_archive):
def test_create_from_url_args(self, initialized_archive):
"""Create crawl from URL arguments."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -41,7 +41,7 @@ class TestCrawlCreate:
assert records[0]['type'] == 'Crawl'
assert url in records[0]['urls']
def test_create_from_stdin_urls(self, cli_env, initialized_archive):
def test_create_from_stdin_urls(self, initialized_archive):
"""Create crawl from stdin URLs (one per line)."""
urls = [create_test_url() for _ in range(3)]
stdin = '\n'.join(urls)
@@ -49,7 +49,7 @@ class TestCrawlCreate:
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -62,33 +62,33 @@ class TestCrawlCreate:
for url in urls:
assert url in crawl['urls']
def test_create_with_depth(self, cli_env, initialized_archive):
def test_create_with_depth(self, initialized_archive):
"""Create crawl with --depth flag."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create', '--depth=2', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
records = parse_jsonl_output(stdout)
assert records[0]['max_depth'] == 2
def test_create_with_tag(self, cli_env, initialized_archive):
def test_create_with_tag(self, initialized_archive):
"""Create crawl with --tag flag."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create', '--tag=test-tag', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
records = parse_jsonl_output(stdout)
assert 'test-tag' in records[0].get('tags_str', '')
def test_create_pass_through_other_types(self, cli_env, initialized_archive):
def test_create_pass_through_other_types(self, initialized_archive):
"""Pass-through records of other types unchanged."""
tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
url = create_test_url()
@@ -97,7 +97,7 @@ class TestCrawlCreate:
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'create'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -108,18 +108,18 @@ class TestCrawlCreate:
assert 'Tag' in types
assert 'Crawl' in types
def test_create_pass_through_existing_crawl(self, cli_env, initialized_archive):
def test_create_pass_through_existing_crawl(self, initialized_archive):
"""Existing Crawl records (with id) are passed through."""
# First create a crawl
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Now pipe it back - should pass through
stdout2, stderr, code = run_archivebox_cmd(
['crawl', 'create'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -131,24 +131,24 @@ class TestCrawlCreate:
class TestCrawlList:
"""Tests for `archivebox crawl list`."""
def test_list_empty(self, cli_env, initialized_archive):
def test_list_empty(self, initialized_archive):
"""List with no crawls returns empty."""
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Listed 0 crawls' in stderr
def test_list_returns_created(self, cli_env, initialized_archive):
def test_list_returns_created(self, initialized_archive):
"""List returns previously created crawls."""
url = create_test_url()
run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -156,14 +156,14 @@ class TestCrawlList:
assert len(records) >= 1
assert any(url in r.get('urls', '') for r in records)
def test_list_filter_by_status(self, cli_env, initialized_archive):
def test_list_filter_by_status(self, initialized_archive):
"""Filter crawls by status."""
url = create_test_url()
run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list', '--status=queued'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -171,15 +171,15 @@ class TestCrawlList:
for r in records:
assert r['status'] == 'queued'
def test_list_with_limit(self, cli_env, initialized_archive):
def test_list_with_limit(self, initialized_archive):
"""Limit number of results."""
# Create multiple crawls
for _ in range(3):
run_archivebox_cmd(['crawl', 'create', create_test_url()], env=cli_env)
run_archivebox_cmd(['crawl', 'create', create_test_url()], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'list', '--limit=2'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -190,18 +190,18 @@ class TestCrawlList:
class TestCrawlUpdate:
"""Tests for `archivebox crawl update`."""
def test_update_status(self, cli_env, initialized_archive):
def test_update_status(self, initialized_archive):
"""Update crawl status."""
# Create a crawl
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Update it
stdout2, stderr, code = run_archivebox_cmd(
['crawl', 'update', '--status=started'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -214,46 +214,46 @@ class TestCrawlUpdate:
class TestCrawlDelete:
"""Tests for `archivebox crawl delete`."""
def test_delete_requires_yes(self, cli_env, initialized_archive):
def test_delete_requires_yes(self, initialized_archive):
"""Delete requires --yes flag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'delete'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 1
assert '--yes' in stderr
def test_delete_with_yes(self, cli_env, initialized_archive):
def test_delete_with_yes(self, initialized_archive):
"""Delete with --yes flag works."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'delete', '--yes'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Deleted 1 crawls' in stderr
def test_delete_dry_run(self, cli_env, initialized_archive):
def test_delete_dry_run(self, initialized_archive):
"""Dry run shows what would be deleted."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['crawl', 'delete', '--dry-run'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -22,14 +22,14 @@ from archivebox.tests.conftest import (
class TestRunWithCrawl:
"""Tests for `archivebox run` with Crawl input."""
def test_run_with_new_crawl(self, cli_env, initialized_archive):
def test_run_with_new_crawl(self, initialized_archive):
"""Run creates and processes a new Crawl (no id)."""
crawl_record = create_test_crawl_json()
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(crawl_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -41,19 +41,19 @@ class TestRunWithCrawl:
assert len(crawl_records) >= 1
assert crawl_records[0].get('id') # Should have an id now
def test_run_with_existing_crawl(self, cli_env, initialized_archive):
def test_run_with_existing_crawl(self, initialized_archive):
"""Run re-queues an existing Crawl (with id)."""
url = create_test_url()
# First create a crawl
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Run with the existing crawl
stdout2, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -65,14 +65,14 @@ class TestRunWithCrawl:
class TestRunWithSnapshot:
"""Tests for `archivebox run` with Snapshot input."""
def test_run_with_new_snapshot(self, cli_env, initialized_archive):
def test_run_with_new_snapshot(self, initialized_archive):
"""Run creates and processes a new Snapshot (no id, just url)."""
snapshot_record = create_test_snapshot_json()
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(snapshot_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -83,19 +83,19 @@ class TestRunWithSnapshot:
assert len(snapshot_records) >= 1
assert snapshot_records[0].get('id')
def test_run_with_existing_snapshot(self, cli_env, initialized_archive):
def test_run_with_existing_snapshot(self, initialized_archive):
"""Run re-queues an existing Snapshot (with id)."""
url = create_test_url()
# First create a snapshot
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
# Run with the existing snapshot
stdout2, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -103,7 +103,7 @@ class TestRunWithSnapshot:
records = parse_jsonl_output(stdout2)
assert len(records) >= 1
def test_run_with_plain_url(self, cli_env, initialized_archive):
def test_run_with_plain_url(self, initialized_archive):
"""Run accepts plain URL records (no type field)."""
url = create_test_url()
url_record = {'url': url}
@@ -111,7 +111,7 @@ class TestRunWithSnapshot:
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(url_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -123,18 +123,18 @@ class TestRunWithSnapshot:
class TestRunWithArchiveResult:
"""Tests for `archivebox run` with ArchiveResult input."""
def test_run_requeues_failed_archiveresult(self, cli_env, initialized_archive):
def test_run_requeues_failed_archiveresult(self, initialized_archive):
"""Run re-queues a failed ArchiveResult."""
url = create_test_url()
# Create snapshot and archive result
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, _, _ = run_archivebox_cmd(
['archiveresult', 'create', '--plugin=title'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
@@ -143,14 +143,14 @@ class TestRunWithArchiveResult:
run_archivebox_cmd(
['archiveresult', 'update', '--status=failed'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
)
# Now run should re-queue it
stdout3, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(ar),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -163,14 +163,14 @@ class TestRunWithArchiveResult:
class TestRunPassThrough:
"""Tests for pass-through behavior in `archivebox run`."""
def test_run_passes_through_unknown_types(self, cli_env, initialized_archive):
def test_run_passes_through_unknown_types(self, initialized_archive):
"""Run passes through records with unknown types."""
unknown_record = {'type': 'Unknown', 'id': 'fake-id', 'data': 'test'}
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(unknown_record),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -179,7 +179,7 @@ class TestRunPassThrough:
assert len(unknown_records) == 1
assert unknown_records[0]['data'] == 'test'
def test_run_outputs_all_processed_records(self, cli_env, initialized_archive):
def test_run_outputs_all_processed_records(self, initialized_archive):
"""Run outputs all processed records for chaining."""
url = create_test_url()
crawl_record = create_test_crawl_json(urls=[url])
@@ -187,7 +187,7 @@ class TestRunPassThrough:
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(crawl_record),
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -200,7 +200,7 @@ class TestRunPassThrough:
class TestRunMixedInput:
"""Tests for `archivebox run` with mixed record types."""
def test_run_handles_mixed_types(self, cli_env, initialized_archive):
def test_run_handles_mixed_types(self, initialized_archive):
"""Run handles mixed Crawl/Snapshot/ArchiveResult input."""
crawl = create_test_crawl_json()
snapshot = create_test_snapshot_json()
@@ -215,7 +215,7 @@ class TestRunMixedInput:
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
timeout=120,
)
@@ -230,24 +230,24 @@ class TestRunMixedInput:
class TestRunEmpty:
"""Tests for `archivebox run` edge cases."""
def test_run_empty_stdin(self, cli_env, initialized_archive):
def test_run_empty_stdin(self, initialized_archive):
"""Run with empty stdin returns success."""
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin='',
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
def test_run_no_records_to_process(self, cli_env, initialized_archive):
def test_run_no_records_to_process(self, initialized_archive):
"""Run with only pass-through records shows message."""
unknown = {'type': 'Unknown', 'id': 'fake'}
stdout, stderr, code = run_archivebox_cmd(
['run'],
stdin=json.dumps(unknown),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0

View File

@@ -22,13 +22,13 @@ from archivebox.tests.conftest import (
class TestSnapshotCreate:
"""Tests for `archivebox snapshot create`."""
def test_create_from_url_args(self, cli_env, initialized_archive):
def test_create_from_url_args(self, initialized_archive):
"""Create snapshot from URL arguments."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -39,19 +39,19 @@ class TestSnapshotCreate:
assert records[0]['type'] == 'Snapshot'
assert records[0]['url'] == url
def test_create_from_crawl_jsonl(self, cli_env, initialized_archive):
def test_create_from_crawl_jsonl(self, initialized_archive):
"""Create snapshots from Crawl JSONL input."""
url = create_test_url()
# First create a crawl
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)
crawl = parse_jsonl_output(stdout1)[0]
# Pipe crawl to snapshot create
stdout2, stderr, code = run_archivebox_cmd(
['snapshot', 'create'],
stdin=json.dumps(crawl),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0, f"Command failed: {stderr}"
@@ -65,20 +65,20 @@ class TestSnapshotCreate:
snapshot = next(r for r in records if r['type'] == 'Snapshot')
assert snapshot['url'] == url
def test_create_with_tag(self, cli_env, initialized_archive):
def test_create_with_tag(self, initialized_archive):
"""Create snapshot with --tag flag."""
url = create_test_url()
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create', '--tag=test-tag', url],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
records = parse_jsonl_output(stdout)
assert 'test-tag' in records[0].get('tags_str', '')
def test_create_pass_through_other_types(self, cli_env, initialized_archive):
def test_create_pass_through_other_types(self, initialized_archive):
"""Pass-through records of other types unchanged."""
tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
url = create_test_url()
@@ -87,7 +87,7 @@ class TestSnapshotCreate:
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create'],
stdin=stdin,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -97,13 +97,13 @@ class TestSnapshotCreate:
assert 'Tag' in types
assert 'Snapshot' in types
def test_create_multiple_urls(self, cli_env, initialized_archive):
def test_create_multiple_urls(self, initialized_archive):
"""Create snapshots from multiple URLs."""
urls = [create_test_url() for _ in range(3)]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'create'] + urls,
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -118,24 +118,24 @@ class TestSnapshotCreate:
class TestSnapshotList:
"""Tests for `archivebox snapshot list`."""
def test_list_empty(self, cli_env, initialized_archive):
def test_list_empty(self, initialized_archive):
"""List with no snapshots returns empty."""
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Listed 0 snapshots' in stderr
def test_list_returns_created(self, cli_env, initialized_archive):
def test_list_returns_created(self, initialized_archive):
"""List returns previously created snapshots."""
url = create_test_url()
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -143,14 +143,14 @@ class TestSnapshotList:
assert len(records) >= 1
assert any(r.get('url') == url for r in records)
def test_list_filter_by_status(self, cli_env, initialized_archive):
def test_list_filter_by_status(self, initialized_archive):
"""Filter snapshots by status."""
url = create_test_url()
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list', '--status=queued'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -158,14 +158,14 @@ class TestSnapshotList:
for r in records:
assert r['status'] == 'queued'
def test_list_filter_by_url_contains(self, cli_env, initialized_archive):
def test_list_filter_by_url_contains(self, initialized_archive):
"""Filter snapshots by URL contains."""
url = create_test_url(domain='unique-domain-12345.com')
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list', '--url__icontains=unique-domain-12345'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -173,14 +173,14 @@ class TestSnapshotList:
assert len(records) == 1
assert 'unique-domain-12345' in records[0]['url']
def test_list_with_limit(self, cli_env, initialized_archive):
def test_list_with_limit(self, initialized_archive):
"""Limit number of results."""
for _ in range(3):
run_archivebox_cmd(['snapshot', 'create', create_test_url()], env=cli_env)
run_archivebox_cmd(['snapshot', 'create', create_test_url()], data_dir=initialized_archive)
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'list', '--limit=2'],
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -191,16 +191,16 @@ class TestSnapshotList:
class TestSnapshotUpdate:
"""Tests for `archivebox snapshot update`."""
def test_update_status(self, cli_env, initialized_archive):
def test_update_status(self, initialized_archive):
"""Update snapshot status."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, stderr, code = run_archivebox_cmd(
['snapshot', 'update', '--status=started'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -209,16 +209,16 @@ class TestSnapshotUpdate:
records = parse_jsonl_output(stdout2)
assert records[0]['status'] == 'started'
def test_update_add_tag(self, cli_env, initialized_archive):
def test_update_add_tag(self, initialized_archive):
"""Update snapshot by adding tag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout2, stderr, code = run_archivebox_cmd(
['snapshot', 'update', '--tag=new-tag'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
@@ -228,46 +228,46 @@ class TestSnapshotUpdate:
class TestSnapshotDelete:
"""Tests for `archivebox snapshot delete`."""
def test_delete_requires_yes(self, cli_env, initialized_archive):
def test_delete_requires_yes(self, initialized_archive):
"""Delete requires --yes flag."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'delete'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 1
assert '--yes' in stderr
def test_delete_with_yes(self, cli_env, initialized_archive):
def test_delete_with_yes(self, initialized_archive):
"""Delete with --yes flag works."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'delete', '--yes'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0
assert 'Deleted 1 snapshots' in stderr
def test_delete_dry_run(self, cli_env, initialized_archive):
def test_delete_dry_run(self, initialized_archive):
"""Dry run shows what would be deleted."""
url = create_test_url()
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive)
snapshot = parse_jsonl_output(stdout1)[0]
stdout, stderr, code = run_archivebox_cmd(
['snapshot', 'delete', '--dry-run'],
stdin=json.dumps(snapshot),
env=cli_env,
data_dir=initialized_archive,
)
assert code == 0