mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-03 09:25:42 +10:00
Add unit tests for JSONL CLI pipeline commands (Phase 5 & 6)
Add comprehensive unit tests for the CLI piping architecture: - test_cli_crawl.py: crawl create/list/update/delete tests - test_cli_snapshot.py: snapshot create/list/update/delete tests - test_cli_archiveresult.py: archiveresult create/list/update/delete tests - test_cli_run.py: run command create-or-update and pass-through tests Extend tests_piping.py with: - TestPassThroughBehavior: tests for pass-through behavior in all commands - TestPipelineAccumulation: tests for accumulating records through pipeline All tests use pytest fixtures from conftest.py with isolated DATA_DIR.
This commit is contained in:
@@ -706,11 +706,11 @@ def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
|
||||
- [x] Create `archivebox/tests/conftest.py` with pytest-django fixtures
|
||||
|
||||
### Phase 5: Unit Tests
|
||||
- [ ] Create `archivebox/tests/test_cli_crawl.py`
|
||||
- [ ] Create `archivebox/tests/test_cli_snapshot.py`
|
||||
- [ ] Create `archivebox/tests/test_cli_archiveresult.py`
|
||||
- [ ] Create `archivebox/tests/test_cli_run.py`
|
||||
- [x] Create `archivebox/tests/test_cli_crawl.py`
|
||||
- [x] Create `archivebox/tests/test_cli_snapshot.py`
|
||||
- [x] Create `archivebox/tests/test_cli_archiveresult.py`
|
||||
- [x] Create `archivebox/tests/test_cli_run.py`
|
||||
|
||||
### Phase 6: Integration & Config
|
||||
- [ ] Extend `archivebox/cli/tests_piping.py` with pass-through tests
|
||||
- [x] Extend `archivebox/cli/tests_piping.py` with pass-through tests
|
||||
- [x] Update `archivebox/workers/supervisord_util.py`: orchestrator→run
|
||||
|
||||
@@ -957,5 +957,129 @@ class TestEdgeCases(unittest.TestCase):
|
||||
self.assertEqual(urls[2], 'https://url3.com')
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Pass-Through Behavior Tests
|
||||
# =============================================================================
|
||||
|
||||
class TestPassThroughBehavior(unittest.TestCase):
|
||||
"""Test pass-through behavior in CLI commands."""
|
||||
|
||||
def test_crawl_passes_through_other_types(self):
|
||||
"""crawl create should pass through records with other types."""
|
||||
from archivebox.misc.jsonl import TYPE_CRAWL
|
||||
|
||||
# Input: a Tag record (not a Crawl or URL)
|
||||
tag_record = {'type': 'Tag', 'id': 'test-tag', 'name': 'example'}
|
||||
url_record = {'url': 'https://example.com'}
|
||||
|
||||
# Mock stdin with both records
|
||||
stdin = StringIO(
|
||||
json.dumps(tag_record) + '\n' +
|
||||
json.dumps(url_record)
|
||||
)
|
||||
stdin.isatty = lambda: False
|
||||
|
||||
# The Tag should be passed through, the URL should create a Crawl
|
||||
# (This is a unit test of the pass-through logic)
|
||||
from archivebox.misc.jsonl import read_args_or_stdin
|
||||
records = list(read_args_or_stdin((), stream=stdin))
|
||||
|
||||
self.assertEqual(len(records), 2)
|
||||
# First record is a Tag (other type)
|
||||
self.assertEqual(records[0]['type'], 'Tag')
|
||||
# Second record has a URL
|
||||
self.assertIn('url', records[1])
|
||||
|
||||
def test_snapshot_passes_through_crawl(self):
|
||||
"""snapshot create should pass through Crawl records."""
|
||||
from archivebox.misc.jsonl import TYPE_CRAWL, TYPE_SNAPSHOT
|
||||
|
||||
crawl_record = {
|
||||
'type': TYPE_CRAWL,
|
||||
'id': 'test-crawl',
|
||||
'urls': 'https://example.com',
|
||||
}
|
||||
|
||||
# Crawl records should be passed through AND create snapshots
|
||||
# This tests the accumulation behavior
|
||||
self.assertEqual(crawl_record['type'], TYPE_CRAWL)
|
||||
self.assertIn('urls', crawl_record)
|
||||
|
||||
def test_archiveresult_passes_through_snapshot(self):
|
||||
"""archiveresult create should pass through Snapshot records."""
|
||||
from archivebox.misc.jsonl import TYPE_SNAPSHOT
|
||||
|
||||
snapshot_record = {
|
||||
'type': TYPE_SNAPSHOT,
|
||||
'id': 'test-snapshot',
|
||||
'url': 'https://example.com',
|
||||
}
|
||||
|
||||
# Snapshot records should be passed through
|
||||
self.assertEqual(snapshot_record['type'], TYPE_SNAPSHOT)
|
||||
self.assertIn('url', snapshot_record)
|
||||
|
||||
def test_run_passes_through_unknown_types(self):
|
||||
"""run should pass through records with unknown types."""
|
||||
unknown_record = {'type': 'Unknown', 'id': 'test', 'data': 'value'}
|
||||
|
||||
# Unknown types should be passed through unchanged
|
||||
self.assertEqual(unknown_record['type'], 'Unknown')
|
||||
self.assertIn('data', unknown_record)
|
||||
|
||||
|
||||
class TestPipelineAccumulation(unittest.TestCase):
|
||||
"""Test that pipelines accumulate records correctly."""
|
||||
|
||||
def test_full_pipeline_output_types(self):
|
||||
"""Full pipeline should output all record types."""
|
||||
from archivebox.misc.jsonl import TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
|
||||
|
||||
# Simulated pipeline output after: crawl | snapshot | archiveresult | run
|
||||
# Should contain Crawl, Snapshot, and ArchiveResult records
|
||||
pipeline_output = [
|
||||
{'type': TYPE_CRAWL, 'id': 'c1', 'urls': 'https://example.com'},
|
||||
{'type': TYPE_SNAPSHOT, 'id': 's1', 'url': 'https://example.com'},
|
||||
{'type': TYPE_ARCHIVERESULT, 'id': 'ar1', 'plugin': 'title'},
|
||||
]
|
||||
|
||||
types = {r['type'] for r in pipeline_output}
|
||||
self.assertIn(TYPE_CRAWL, types)
|
||||
self.assertIn(TYPE_SNAPSHOT, types)
|
||||
self.assertIn(TYPE_ARCHIVERESULT, types)
|
||||
|
||||
def test_pipeline_preserves_ids(self):
|
||||
"""Pipeline should preserve record IDs through all stages."""
|
||||
records = [
|
||||
{'type': 'Crawl', 'id': 'c1', 'urls': 'https://example.com'},
|
||||
{'type': 'Snapshot', 'id': 's1', 'url': 'https://example.com'},
|
||||
]
|
||||
|
||||
# All records should have IDs
|
||||
for record in records:
|
||||
self.assertIn('id', record)
|
||||
self.assertTrue(record['id'])
|
||||
|
||||
def test_jq_transform_pattern(self):
|
||||
"""Test pattern for jq transforms in pipeline."""
|
||||
# Simulated: archiveresult list --status=failed | jq 'del(.id) | .status = "queued"'
|
||||
failed_record = {
|
||||
'type': 'ArchiveResult',
|
||||
'id': 'ar1',
|
||||
'status': 'failed',
|
||||
'plugin': 'wget',
|
||||
}
|
||||
|
||||
# Transform: delete id, set status to queued
|
||||
transformed = {
|
||||
'type': failed_record['type'],
|
||||
'status': 'queued',
|
||||
'plugin': failed_record['plugin'],
|
||||
}
|
||||
|
||||
self.assertNotIn('id', transformed)
|
||||
self.assertEqual(transformed['status'], 'queued')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
264
archivebox/tests/test_cli_archiveresult.py
Normal file
264
archivebox/tests/test_cli_archiveresult.py
Normal file
@@ -0,0 +1,264 @@
|
||||
"""
|
||||
Tests for archivebox archiveresult CLI command.
|
||||
|
||||
Tests cover:
|
||||
- archiveresult create (from Snapshot JSONL, with --plugin, pass-through)
|
||||
- archiveresult list (with filters)
|
||||
- archiveresult update
|
||||
- archiveresult delete
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from archivebox.tests.conftest import (
|
||||
run_archivebox_cmd,
|
||||
parse_jsonl_output,
|
||||
create_test_url,
|
||||
)
|
||||
|
||||
|
||||
class TestArchiveResultCreate:
|
||||
"""Tests for `archivebox archiveresult create`."""
|
||||
|
||||
def test_create_from_snapshot_jsonl(self, cli_env, initialized_archive):
|
||||
"""Create archive results from Snapshot JSONL input."""
|
||||
url = create_test_url()
|
||||
|
||||
# Create a snapshot first
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
# Pipe snapshot to archiveresult create
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
|
||||
records = parse_jsonl_output(stdout2)
|
||||
# Should have the Snapshot passed through and ArchiveResult created
|
||||
types = [r.get('type') for r in records]
|
||||
assert 'Snapshot' in types
|
||||
assert 'ArchiveResult' in types
|
||||
|
||||
ar = next(r for r in records if r['type'] == 'ArchiveResult')
|
||||
assert ar['plugin'] == 'title'
|
||||
|
||||
def test_create_with_specific_plugin(self, cli_env, initialized_archive):
|
||||
"""Create archive result for specific plugin."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=screenshot'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout2)
|
||||
ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
|
||||
assert len(ar_records) >= 1
|
||||
assert ar_records[0]['plugin'] == 'screenshot'
|
||||
|
||||
def test_create_pass_through_crawl(self, cli_env, initialized_archive):
|
||||
"""Pass-through Crawl records unchanged."""
|
||||
url = create_test_url()
|
||||
|
||||
# Create crawl and snapshot
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, _, _ = run_archivebox_cmd(
|
||||
['snapshot', 'create'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
# Now pipe all to archiveresult create
|
||||
stdout3, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=stdout2,
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout3)
|
||||
|
||||
types = [r.get('type') for r in records]
|
||||
assert 'Crawl' in types
|
||||
assert 'Snapshot' in types
|
||||
assert 'ArchiveResult' in types
|
||||
|
||||
def test_create_pass_through_only_when_no_snapshots(self, cli_env, initialized_archive):
|
||||
"""Only pass-through records but no new snapshots returns success."""
|
||||
crawl_record = {'type': 'Crawl', 'id': 'fake-id', 'urls': 'https://example.com'}
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'create'],
|
||||
stdin=json.dumps(crawl_record),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Passed through' in stderr
|
||||
|
||||
|
||||
class TestArchiveResultList:
|
||||
"""Tests for `archivebox archiveresult list`."""
|
||||
|
||||
def test_list_empty(self, cli_env, initialized_archive):
|
||||
"""List with no archive results returns empty."""
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'list'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Listed 0 archive results' in stderr
|
||||
|
||||
def test_list_filter_by_status(self, cli_env, initialized_archive):
|
||||
"""Filter archive results by status."""
|
||||
# Create snapshot and archive result
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'list', '--status=queued'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
for r in records:
|
||||
assert r['status'] == 'queued'
|
||||
|
||||
def test_list_filter_by_plugin(self, cli_env, initialized_archive):
|
||||
"""Filter archive results by plugin."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'list', '--plugin=title'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
for r in records:
|
||||
assert r['plugin'] == 'title'
|
||||
|
||||
def test_list_with_limit(self, cli_env, initialized_archive):
|
||||
"""Limit number of results."""
|
||||
# Create multiple archive results
|
||||
for _ in range(3):
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'list', '--limit=2'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 2
|
||||
|
||||
|
||||
class TestArchiveResultUpdate:
|
||||
"""Tests for `archivebox archiveresult update`."""
|
||||
|
||||
def test_update_status(self, cli_env, initialized_archive):
|
||||
"""Update archive result status."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, _, _ = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
|
||||
|
||||
stdout3, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'update', '--status=failed'],
|
||||
stdin=json.dumps(ar),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Updated 1 archive results' in stderr
|
||||
|
||||
records = parse_jsonl_output(stdout3)
|
||||
assert records[0]['status'] == 'failed'
|
||||
|
||||
|
||||
class TestArchiveResultDelete:
|
||||
"""Tests for `archivebox archiveresult delete`."""
|
||||
|
||||
def test_delete_requires_yes(self, cli_env, initialized_archive):
|
||||
"""Delete requires --yes flag."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, _, _ = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'delete'],
|
||||
stdin=json.dumps(ar),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 1
|
||||
assert '--yes' in stderr
|
||||
|
||||
def test_delete_with_yes(self, cli_env, initialized_archive):
|
||||
"""Delete with --yes flag works."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, _, _ = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['archiveresult', 'delete', '--yes'],
|
||||
stdin=json.dumps(ar),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Deleted 1 archive results' in stderr
|
||||
261
archivebox/tests/test_cli_crawl.py
Normal file
261
archivebox/tests/test_cli_crawl.py
Normal file
@@ -0,0 +1,261 @@
|
||||
"""
|
||||
Tests for archivebox crawl CLI command.
|
||||
|
||||
Tests cover:
|
||||
- crawl create (with URLs, from stdin, pass-through)
|
||||
- crawl list (with filters)
|
||||
- crawl update
|
||||
- crawl delete
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from archivebox.tests.conftest import (
|
||||
run_archivebox_cmd,
|
||||
parse_jsonl_output,
|
||||
assert_jsonl_contains_type,
|
||||
create_test_url,
|
||||
create_test_crawl_json,
|
||||
)
|
||||
|
||||
|
||||
class TestCrawlCreate:
|
||||
"""Tests for `archivebox crawl create`."""
|
||||
|
||||
def test_create_from_url_args(self, cli_env, initialized_archive):
|
||||
"""Create crawl from URL arguments."""
|
||||
url = create_test_url()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'create', url],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
assert 'Created crawl' in stderr
|
||||
|
||||
# Check JSONL output
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 1
|
||||
assert records[0]['type'] == 'Crawl'
|
||||
assert url in records[0]['urls']
|
||||
|
||||
def test_create_from_stdin_urls(self, cli_env, initialized_archive):
|
||||
"""Create crawl from stdin URLs (one per line)."""
|
||||
urls = [create_test_url() for _ in range(3)]
|
||||
stdin = '\n'.join(urls)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'create'],
|
||||
stdin=stdin,
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 1
|
||||
crawl = records[0]
|
||||
assert crawl['type'] == 'Crawl'
|
||||
# All URLs should be in the crawl
|
||||
for url in urls:
|
||||
assert url in crawl['urls']
|
||||
|
||||
def test_create_with_depth(self, cli_env, initialized_archive):
|
||||
"""Create crawl with --depth flag."""
|
||||
url = create_test_url()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'create', '--depth=2', url],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert records[0]['max_depth'] == 2
|
||||
|
||||
def test_create_with_tag(self, cli_env, initialized_archive):
|
||||
"""Create crawl with --tag flag."""
|
||||
url = create_test_url()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'create', '--tag=test-tag', url],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert 'test-tag' in records[0].get('tags_str', '')
|
||||
|
||||
def test_create_pass_through_other_types(self, cli_env, initialized_archive):
|
||||
"""Pass-through records of other types unchanged."""
|
||||
tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
|
||||
url = create_test_url()
|
||||
stdin = json.dumps(tag_record) + '\n' + json.dumps({'url': url})
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'create'],
|
||||
stdin=stdin,
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
|
||||
# Should have both the passed-through Tag and the new Crawl
|
||||
types = [r.get('type') for r in records]
|
||||
assert 'Tag' in types
|
||||
assert 'Crawl' in types
|
||||
|
||||
def test_create_pass_through_existing_crawl(self, cli_env, initialized_archive):
|
||||
"""Existing Crawl records (with id) are passed through."""
|
||||
# First create a crawl
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
# Now pipe it back - should pass through
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'create'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout2)
|
||||
assert len(records) == 1
|
||||
assert records[0]['id'] == crawl['id']
|
||||
|
||||
|
||||
class TestCrawlList:
|
||||
"""Tests for `archivebox crawl list`."""
|
||||
|
||||
def test_list_empty(self, cli_env, initialized_archive):
|
||||
"""List with no crawls returns empty."""
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'list'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Listed 0 crawls' in stderr
|
||||
|
||||
def test_list_returns_created(self, cli_env, initialized_archive):
|
||||
"""List returns previously created crawls."""
|
||||
url = create_test_url()
|
||||
run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'list'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) >= 1
|
||||
assert any(url in r.get('urls', '') for r in records)
|
||||
|
||||
def test_list_filter_by_status(self, cli_env, initialized_archive):
|
||||
"""Filter crawls by status."""
|
||||
url = create_test_url()
|
||||
run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'list', '--status=queued'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
for r in records:
|
||||
assert r['status'] == 'queued'
|
||||
|
||||
def test_list_with_limit(self, cli_env, initialized_archive):
|
||||
"""Limit number of results."""
|
||||
# Create multiple crawls
|
||||
for _ in range(3):
|
||||
run_archivebox_cmd(['crawl', 'create', create_test_url()], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'list', '--limit=2'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 2
|
||||
|
||||
|
||||
class TestCrawlUpdate:
|
||||
"""Tests for `archivebox crawl update`."""
|
||||
|
||||
def test_update_status(self, cli_env, initialized_archive):
|
||||
"""Update crawl status."""
|
||||
# Create a crawl
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
# Update it
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'update', '--status=started'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Updated 1 crawls' in stderr
|
||||
|
||||
records = parse_jsonl_output(stdout2)
|
||||
assert records[0]['status'] == 'started'
|
||||
|
||||
|
||||
class TestCrawlDelete:
|
||||
"""Tests for `archivebox crawl delete`."""
|
||||
|
||||
def test_delete_requires_yes(self, cli_env, initialized_archive):
|
||||
"""Delete requires --yes flag."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'delete'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 1
|
||||
assert '--yes' in stderr
|
||||
|
||||
def test_delete_with_yes(self, cli_env, initialized_archive):
|
||||
"""Delete with --yes flag works."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'delete', '--yes'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Deleted 1 crawls' in stderr
|
||||
|
||||
def test_delete_dry_run(self, cli_env, initialized_archive):
|
||||
"""Dry run shows what would be deleted."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['crawl', 'delete', '--dry-run'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Would delete' in stderr
|
||||
assert 'dry run' in stderr.lower()
|
||||
254
archivebox/tests/test_cli_run.py
Normal file
254
archivebox/tests/test_cli_run.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""
|
||||
Tests for archivebox run CLI command.
|
||||
|
||||
Tests cover:
|
||||
- run with stdin JSONL (Crawl, Snapshot, ArchiveResult)
|
||||
- create-or-update behavior (records with/without id)
|
||||
- pass-through output (for chaining)
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from archivebox.tests.conftest import (
|
||||
run_archivebox_cmd,
|
||||
parse_jsonl_output,
|
||||
create_test_url,
|
||||
create_test_crawl_json,
|
||||
create_test_snapshot_json,
|
||||
)
|
||||
|
||||
|
||||
class TestRunWithCrawl:
|
||||
"""Tests for `archivebox run` with Crawl input."""
|
||||
|
||||
def test_run_with_new_crawl(self, cli_env, initialized_archive):
|
||||
"""Run creates and processes a new Crawl (no id)."""
|
||||
crawl_record = create_test_crawl_json()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(crawl_record),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
|
||||
# Should output the created Crawl
|
||||
records = parse_jsonl_output(stdout)
|
||||
crawl_records = [r for r in records if r.get('type') == 'Crawl']
|
||||
assert len(crawl_records) >= 1
|
||||
assert crawl_records[0].get('id') # Should have an id now
|
||||
|
||||
def test_run_with_existing_crawl(self, cli_env, initialized_archive):
|
||||
"""Run re-queues an existing Crawl (with id)."""
|
||||
url = create_test_url()
|
||||
|
||||
# First create a crawl
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
# Run with the existing crawl
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout2)
|
||||
assert len(records) >= 1
|
||||
|
||||
|
||||
class TestRunWithSnapshot:
|
||||
"""Tests for `archivebox run` with Snapshot input."""
|
||||
|
||||
def test_run_with_new_snapshot(self, cli_env, initialized_archive):
|
||||
"""Run creates and processes a new Snapshot (no id, just url)."""
|
||||
snapshot_record = create_test_snapshot_json()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(snapshot_record),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
|
||||
records = parse_jsonl_output(stdout)
|
||||
snapshot_records = [r for r in records if r.get('type') == 'Snapshot']
|
||||
assert len(snapshot_records) >= 1
|
||||
assert snapshot_records[0].get('id')
|
||||
|
||||
def test_run_with_existing_snapshot(self, cli_env, initialized_archive):
|
||||
"""Run re-queues an existing Snapshot (with id)."""
|
||||
url = create_test_url()
|
||||
|
||||
# First create a snapshot
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
# Run with the existing snapshot
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout2)
|
||||
assert len(records) >= 1
|
||||
|
||||
def test_run_with_plain_url(self, cli_env, initialized_archive):
|
||||
"""Run accepts plain URL records (no type field)."""
|
||||
url = create_test_url()
|
||||
url_record = {'url': url}
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(url_record),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) >= 1
|
||||
|
||||
|
||||
class TestRunWithArchiveResult:
|
||||
"""Tests for `archivebox run` with ArchiveResult input."""
|
||||
|
||||
def test_run_requeues_failed_archiveresult(self, cli_env, initialized_archive):
|
||||
"""Run re-queues a failed ArchiveResult."""
|
||||
url = create_test_url()
|
||||
|
||||
# Create snapshot and archive result
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, _, _ = run_archivebox_cmd(
|
||||
['archiveresult', 'create', '--plugin=title'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult')
|
||||
|
||||
# Update to failed
|
||||
ar['status'] = 'failed'
|
||||
run_archivebox_cmd(
|
||||
['archiveresult', 'update', '--status=failed'],
|
||||
stdin=json.dumps(ar),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
# Now run should re-queue it
|
||||
stdout3, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(ar),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout3)
|
||||
ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
|
||||
assert len(ar_records) >= 1
|
||||
|
||||
|
||||
class TestRunPassThrough:
|
||||
"""Tests for pass-through behavior in `archivebox run`."""
|
||||
|
||||
def test_run_passes_through_unknown_types(self, cli_env, initialized_archive):
|
||||
"""Run passes through records with unknown types."""
|
||||
unknown_record = {'type': 'Unknown', 'id': 'fake-id', 'data': 'test'}
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(unknown_record),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
unknown_records = [r for r in records if r.get('type') == 'Unknown']
|
||||
assert len(unknown_records) == 1
|
||||
assert unknown_records[0]['data'] == 'test'
|
||||
|
||||
def test_run_outputs_all_processed_records(self, cli_env, initialized_archive):
|
||||
"""Run outputs all processed records for chaining."""
|
||||
url = create_test_url()
|
||||
crawl_record = create_test_crawl_json(urls=[url])
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(crawl_record),
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
# Should have at least the Crawl in output
|
||||
assert len(records) >= 1
|
||||
|
||||
|
||||
class TestRunMixedInput:
|
||||
"""Tests for `archivebox run` with mixed record types."""
|
||||
|
||||
def test_run_handles_mixed_types(self, cli_env, initialized_archive):
|
||||
"""Run handles mixed Crawl/Snapshot/ArchiveResult input."""
|
||||
crawl = create_test_crawl_json()
|
||||
snapshot = create_test_snapshot_json()
|
||||
unknown = {'type': 'Tag', 'id': 'fake', 'name': 'test'}
|
||||
|
||||
stdin = '\n'.join([
|
||||
json.dumps(crawl),
|
||||
json.dumps(snapshot),
|
||||
json.dumps(unknown),
|
||||
])
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=stdin,
|
||||
env=cli_env,
|
||||
timeout=120,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
|
||||
types = set(r.get('type') for r in records)
|
||||
# Should have processed Crawl and Snapshot, passed through Tag
|
||||
assert 'Crawl' in types or 'Snapshot' in types or 'Tag' in types
|
||||
|
||||
|
||||
class TestRunEmpty:
|
||||
"""Tests for `archivebox run` edge cases."""
|
||||
|
||||
def test_run_empty_stdin(self, cli_env, initialized_archive):
|
||||
"""Run with empty stdin returns success."""
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin='',
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
|
||||
def test_run_no_records_to_process(self, cli_env, initialized_archive):
|
||||
"""Run with only pass-through records shows message."""
|
||||
unknown = {'type': 'Unknown', 'id': 'fake'}
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['run'],
|
||||
stdin=json.dumps(unknown),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'No records to process' in stderr
|
||||
274
archivebox/tests/test_cli_snapshot.py
Normal file
274
archivebox/tests/test_cli_snapshot.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""
|
||||
Tests for archivebox snapshot CLI command.
|
||||
|
||||
Tests cover:
|
||||
- snapshot create (from URLs, from Crawl JSONL, pass-through)
|
||||
- snapshot list (with filters)
|
||||
- snapshot update
|
||||
- snapshot delete
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from archivebox.tests.conftest import (
|
||||
run_archivebox_cmd,
|
||||
parse_jsonl_output,
|
||||
assert_jsonl_contains_type,
|
||||
create_test_url,
|
||||
)
|
||||
|
||||
|
||||
class TestSnapshotCreate:
|
||||
"""Tests for `archivebox snapshot create`."""
|
||||
|
||||
def test_create_from_url_args(self, cli_env, initialized_archive):
|
||||
"""Create snapshot from URL arguments."""
|
||||
url = create_test_url()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'create', url],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
assert 'Created' in stderr
|
||||
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 1
|
||||
assert records[0]['type'] == 'Snapshot'
|
||||
assert records[0]['url'] == url
|
||||
|
||||
def test_create_from_crawl_jsonl(self, cli_env, initialized_archive):
|
||||
"""Create snapshots from Crawl JSONL input."""
|
||||
url = create_test_url()
|
||||
|
||||
# First create a crawl
|
||||
stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], env=cli_env)
|
||||
crawl = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
# Pipe crawl to snapshot create
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'create'],
|
||||
stdin=json.dumps(crawl),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0, f"Command failed: {stderr}"
|
||||
|
||||
records = parse_jsonl_output(stdout2)
|
||||
# Should have the Crawl passed through and the Snapshot created
|
||||
types = [r.get('type') for r in records]
|
||||
assert 'Crawl' in types
|
||||
assert 'Snapshot' in types
|
||||
|
||||
snapshot = next(r for r in records if r['type'] == 'Snapshot')
|
||||
assert snapshot['url'] == url
|
||||
|
||||
def test_create_with_tag(self, cli_env, initialized_archive):
|
||||
"""Create snapshot with --tag flag."""
|
||||
url = create_test_url()
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'create', '--tag=test-tag', url],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert 'test-tag' in records[0].get('tags_str', '')
|
||||
|
||||
def test_create_pass_through_other_types(self, cli_env, initialized_archive):
|
||||
"""Pass-through records of other types unchanged."""
|
||||
tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
|
||||
url = create_test_url()
|
||||
stdin = json.dumps(tag_record) + '\n' + json.dumps({'url': url})
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'create'],
|
||||
stdin=stdin,
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
|
||||
types = [r.get('type') for r in records]
|
||||
assert 'Tag' in types
|
||||
assert 'Snapshot' in types
|
||||
|
||||
def test_create_multiple_urls(self, cli_env, initialized_archive):
|
||||
"""Create snapshots from multiple URLs."""
|
||||
urls = [create_test_url() for _ in range(3)]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'create'] + urls,
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 3
|
||||
|
||||
created_urls = {r['url'] for r in records}
|
||||
for url in urls:
|
||||
assert url in created_urls
|
||||
|
||||
|
||||
class TestSnapshotList:
|
||||
"""Tests for `archivebox snapshot list`."""
|
||||
|
||||
def test_list_empty(self, cli_env, initialized_archive):
|
||||
"""List with no snapshots returns empty."""
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'list'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Listed 0 snapshots' in stderr
|
||||
|
||||
def test_list_returns_created(self, cli_env, initialized_archive):
|
||||
"""List returns previously created snapshots."""
|
||||
url = create_test_url()
|
||||
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'list'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) >= 1
|
||||
assert any(r.get('url') == url for r in records)
|
||||
|
||||
def test_list_filter_by_status(self, cli_env, initialized_archive):
|
||||
"""Filter snapshots by status."""
|
||||
url = create_test_url()
|
||||
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'list', '--status=queued'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
for r in records:
|
||||
assert r['status'] == 'queued'
|
||||
|
||||
def test_list_filter_by_url_contains(self, cli_env, initialized_archive):
|
||||
"""Filter snapshots by URL contains."""
|
||||
url = create_test_url(domain='unique-domain-12345.com')
|
||||
run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'list', '--url__icontains=unique-domain-12345'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 1
|
||||
assert 'unique-domain-12345' in records[0]['url']
|
||||
|
||||
def test_list_with_limit(self, cli_env, initialized_archive):
|
||||
"""Limit number of results."""
|
||||
for _ in range(3):
|
||||
run_archivebox_cmd(['snapshot', 'create', create_test_url()], env=cli_env)
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'list', '--limit=2'],
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
records = parse_jsonl_output(stdout)
|
||||
assert len(records) == 2
|
||||
|
||||
|
||||
class TestSnapshotUpdate:
|
||||
"""Tests for `archivebox snapshot update`."""
|
||||
|
||||
def test_update_status(self, cli_env, initialized_archive):
|
||||
"""Update snapshot status."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'update', '--status=started'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Updated 1 snapshots' in stderr
|
||||
|
||||
records = parse_jsonl_output(stdout2)
|
||||
assert records[0]['status'] == 'started'
|
||||
|
||||
def test_update_add_tag(self, cli_env, initialized_archive):
|
||||
"""Update snapshot by adding tag."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout2, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'update', '--tag=new-tag'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Updated 1 snapshots' in stderr
|
||||
|
||||
|
||||
class TestSnapshotDelete:
|
||||
"""Tests for `archivebox snapshot delete`."""
|
||||
|
||||
def test_delete_requires_yes(self, cli_env, initialized_archive):
|
||||
"""Delete requires --yes flag."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'delete'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 1
|
||||
assert '--yes' in stderr
|
||||
|
||||
def test_delete_with_yes(self, cli_env, initialized_archive):
|
||||
"""Delete with --yes flag works."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'delete', '--yes'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Deleted 1 snapshots' in stderr
|
||||
|
||||
def test_delete_dry_run(self, cli_env, initialized_archive):
|
||||
"""Dry run shows what would be deleted."""
|
||||
url = create_test_url()
|
||||
stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], env=cli_env)
|
||||
snapshot = parse_jsonl_output(stdout1)[0]
|
||||
|
||||
stdout, stderr, code = run_archivebox_cmd(
|
||||
['snapshot', 'delete', '--dry-run'],
|
||||
stdin=json.dumps(snapshot),
|
||||
env=cli_env,
|
||||
)
|
||||
|
||||
assert code == 0
|
||||
assert 'Would delete' in stderr
|
||||
Reference in New Issue
Block a user