mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-02 17:05:38 +10:00
146 lines
3.9 KiB
Python
146 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Comprehensive tests for archivebox update command.
|
|
Verify update drains old dirs, reconciles DB, and queues snapshots.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import sqlite3
|
|
|
|
from .fixtures import *
|
|
|
|
|
|
def test_update_runs_successfully_on_empty_archive(tmp_path, process):
    """Check that `archivebox update` exits with code 0 before any snapshot is added."""
    os.chdir(tmp_path)

    # No `archivebox add` has been run in this test, so update has nothing to process.
    update_cmd = ['archivebox', 'update']
    run_options = {'capture_output': True, 'text': True, 'timeout': 30}
    completed = subprocess.run(update_cmd, **run_options)

    # A zero exit status is the only success criterion here.
    assert completed.returncode == 0
|
|
|
|
|
|
def test_update_reconciles_existing_snapshots(tmp_path, process, disable_extractors_dict):
    """Test that `archivebox update` succeeds when a snapshot already exists.

    Args:
        tmp_path: Directory the test switches into before running any command.
        process: Fixture requested by the test but not referenced in the body
            (presumably it prepares the archive -- confirm in ``fixtures``).
        disable_extractors_dict: Mapping passed as ``env`` to every subprocess.
    """
    os.chdir(tmp_path)

    # Add a snapshot (index-only for a faster test). The timeout mirrors the
    # other `archivebox add` calls in this module, so a hung CLI raises
    # subprocess.TimeoutExpired instead of blocking the test run forever.
    subprocess.run(
        ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
        timeout=90,
    )

    # Run update - should reconcile and queue
    result = subprocess.run(
        ['archivebox', 'update'],
        capture_output=True,
        env=disable_extractors_dict,
        timeout=30,
    )

    assert result.returncode == 0
|
|
|
|
|
|
def test_update_specific_snapshot_by_filter(tmp_path, process, disable_extractors_dict):
    """Run `archivebox update` with a substring filter and expect exit code 0."""
    os.chdir(tmp_path)

    # Seed the archive with two different URLs, one `add` invocation each.
    for url in ('https://example.com', 'https://example.org'):
        subprocess.run(
            ['archivebox', 'add', '--depth=0', url],
            capture_output=True,
            env=disable_extractors_dict,
            timeout=90,
        )

    # The trailing positional argument is the filter pattern
    # (the filter_patterns argument of the update command).
    filtered_update_cmd = ['archivebox', 'update', '--filter-type=substring', 'example.com']
    outcome = subprocess.run(
        filtered_update_cmd,
        capture_output=True,
        env=disable_extractors_dict,
        timeout=30,
    )

    # Only the exit status is checked.
    assert outcome.returncode == 0
|
|
|
|
|
|
def test_update_preserves_snapshot_count(tmp_path, process, disable_extractors_dict):
    """Test that `archivebox update` doesn't change the snapshot count.

    Args:
        tmp_path: Directory the test switches into before running any command.
        process: Fixture requested by the test but not referenced in the body
            (presumably it prepares the archive -- confirm in ``fixtures``).
        disable_extractors_dict: Mapping passed as ``env`` to every subprocess.
    """
    from contextlib import closing

    def count_snapshots():
        """Return the number of rows currently in ``core_snapshot``."""
        # sqlite3's own context manager only scopes a transaction; closing()
        # is what guarantees the connection is released even if the query
        # raises (e.g. the table is missing).
        with closing(sqlite3.connect("index.sqlite3")) as conn:
            return conn.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0]

    # "index.sqlite3" above is a relative path, so it is resolved against
    # the directory selected here.
    os.chdir(tmp_path)

    # Add snapshots
    subprocess.run(
        ['archivebox', 'add', '--depth=0', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
        timeout=90,
    )

    # Count before update
    count_before = count_snapshots()

    assert count_before == 1

    # Run update (should reconcile + queue, not create new snapshots)
    subprocess.run(
        ['archivebox', 'update'],
        capture_output=True,
        env=disable_extractors_dict,
        timeout=30,
    )

    # Count after update
    count_after = count_snapshots()

    # Snapshot count should remain the same
    assert count_after == count_before
|
|
|
|
|
|
def test_update_queues_snapshots_for_archiving(tmp_path, process, disable_extractors_dict):
    """Test that `archivebox update` leaves the snapshot in the 'queued' status.

    Args:
        tmp_path: Directory the test switches into before running any command.
        process: Fixture requested by the test but not referenced in the body
            (presumably it prepares the archive -- confirm in ``fixtures``).
        disable_extractors_dict: Mapping passed as ``env`` to every subprocess.
    """
    from contextlib import closing

    os.chdir(tmp_path)

    subprocess.run(
        ['archivebox', 'add', '--depth=0', 'https://example.com'],
        capture_output=True,
        env=disable_extractors_dict,
        timeout=90,
    )

    # Run update
    result = subprocess.run(
        ['archivebox', 'update'],
        capture_output=True,
        env=disable_extractors_dict,
        timeout=30,
    )

    assert result.returncode == 0

    # Check that snapshot is queued. closing() releases the connection even
    # when the query raises; sqlite3's own context manager would only scope
    # a transaction and leave the connection open.
    with closing(sqlite3.connect("index.sqlite3")) as conn:
        row = conn.execute("SELECT status FROM core_snapshot").fetchone()

    # fetchone() returns None on an empty table; fail with a readable message
    # instead of a TypeError from subscripting None.
    assert row is not None, "core_snapshot has no rows after add + update"
    assert row[0] == 'queued'
|