# Source: ArchiveBox/archivebox/tests/test_snapshot.py
# (listing exported 2026-01-19 01:00:53 -08:00 — 224 lines, 7.7 KiB, Python)
#!/usr/bin/env python3
"""Integration tests for archivebox snapshot command."""
import os
import subprocess
import sqlite3
from archivebox.machine.models import Process
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
import uuid
import pytest
from .fixtures import process, disable_extractors_dict
def test_snapshot_creates_snapshot_with_correct_url(tmp_path, process, disable_extractors_dict):
    """Test that snapshot stores the exact URL and wires up crawl, user, and symlink.

    Checks three layers of `archivebox snapshot create`:
      1. a core_snapshot row exists with the exact URL,
      2. the snapshot's crawl and the crawl's creating user exist,
      3. an on-disk symlink named after the snapshot id exists, is relative,
         and resolves to users/<username>/snapshots/<date>/<domain>/<id>.
    """
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'snapshot', 'create', 'https://example.com'],
        capture_output=True,
        env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
    )

    conn = sqlite3.connect('index.sqlite3')
    try:  # close the connection even if an assertion below fails
        c = conn.cursor()
        snapshot_row = c.execute(
            "SELECT id, created_at, url, crawl_id FROM core_snapshot WHERE url = ?",
            ('https://example.com',)
        ).fetchone()
        assert snapshot_row is not None
        snapshot_id_raw, snapshot_created_at, snapshot_url, crawl_id = snapshot_row

        crawl_row = c.execute(
            "SELECT id, created_at, urls, created_by_id FROM crawls_crawl WHERE id = ?",
            (crawl_id,)
        ).fetchone()
        assert crawl_row is not None

        user_row = c.execute(
            "SELECT username FROM auth_user WHERE id = ?",
            (crawl_row[3],)
        ).fetchone()
        assert user_row is not None
    finally:
        conn.close()

    # Normalize the stored id through uuid.UUID so it matches the dashed
    # directory name regardless of how sqlite stored it.
    snapshot_id = str(uuid.UUID(snapshot_id_raw))
    username = user_row[0]
    snapshot_date_str = datetime.fromisoformat(snapshot_created_at).strftime('%Y%m%d')
    domain = urlparse(snapshot_url).hostname or 'unknown'

    # Canonical location the symlink should ultimately resolve to.
    target_path = tmp_path / 'users' / username / 'snapshots' / snapshot_date_str / domain / snapshot_id

    # Verify the snapshot symlink exists somewhere under the data dir and is relative.
    symlinks = [p for p in tmp_path.rglob(snapshot_id) if p.is_symlink()]
    assert symlinks, "Snapshot symlink should exist under crawl dir"
    link_path = symlinks[0]
    link_target = os.readlink(link_path)
    assert not os.path.isabs(link_target), "Symlink should be relative"
    assert link_path.resolve() == target_path.resolve()
def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disable_extractors_dict):
    """Test that multiple URLs each get their own snapshot record."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'snapshot', 'create',
         'https://example.com',
         'https://iana.org'],
        capture_output=True,
        env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
    )

    # Read back every snapshot URL that landed in the index database.
    conn = sqlite3.connect('index.sqlite3')
    rows = conn.cursor().execute("SELECT url FROM core_snapshot ORDER BY url").fetchall()
    conn.close()

    stored_urls = [row[0] for row in rows]
    assert 'https://example.com' in stored_urls
    assert 'https://iana.org' in stored_urls
    assert len(stored_urls) >= 2
def test_snapshot_tag_creates_tag_and_links_to_snapshot(tmp_path, process, disable_extractors_dict):
    """Test that --tag creates tag record and links it to the snapshot."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'snapshot', 'create', '--tag=mytesttag',
         'https://example.com'],
        capture_output=True,
        env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
    )

    conn = sqlite3.connect('index.sqlite3')
    cursor = conn.cursor()

    # The tag row itself must have been created.
    tag_row = cursor.execute("SELECT id, name FROM core_tag WHERE name = ?", ('mytesttag',)).fetchone()
    assert tag_row is not None, "Tag 'mytesttag' should exist in core_tag"

    # The snapshot for the submitted URL must exist.
    snap_row = cursor.execute("SELECT id FROM core_snapshot WHERE url = ?",
                              ('https://example.com',)).fetchone()
    assert snap_row is not None

    # The many-to-many join table must connect the two records.
    link_row = cursor.execute("""
        SELECT * FROM core_snapshot_tags
        WHERE snapshot_id = ? AND tag_id = ?
    """, (snap_row[0], tag_row[0])).fetchone()
    conn.close()
    assert link_row is not None, "Tag should be linked to snapshot via core_snapshot_tags"
def test_snapshot_jsonl_output_has_correct_structure(tmp_path, process, disable_extractors_dict):
    """Test that JSONL output contains required fields with correct types."""
    os.chdir(tmp_path)
    # URL is given as a CLI argument (not stdin) for more reliable behavior.
    proc = subprocess.run(
        ['archivebox', 'snapshot', 'create', 'https://example.com'],
        capture_output=True,
        text=True,
        env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
    )

    # Parse stdout as JSONL and keep only the Snapshot records.
    parsed = Process.parse_records_from_text(proc.stdout)
    snapshots = [rec for rec in parsed if rec.get('type') == 'Snapshot']
    assert len(snapshots) >= 1, "Should output at least one Snapshot JSONL record"

    first = snapshots[0]
    assert first.get('type') == 'Snapshot'
    assert 'id' in first, "Snapshot record should have 'id' field"
    assert 'url' in first, "Snapshot record should have 'url' field"
    assert first['url'] == 'https://example.com'
def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors_dict):
    """Test that the tag name provided via --tag is stored verbatim in core_tag.

    (The previous docstring claimed this tested title storage, but nothing
    here touches titles — only the --tag option is exercised.)
    """
    os.chdir(tmp_path)
    # Tag and URL are passed as command line args instead of stdin.
    subprocess.run(
        ['archivebox', 'snapshot', 'create', '--tag=customtag', 'https://example.com'],
        capture_output=True,
        text=True,
        env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
    )

    conn = sqlite3.connect('index.sqlite3')
    try:  # ensure the connection is closed even if the query raises
        tag = conn.cursor().execute("SELECT name FROM core_tag WHERE name = ?",
                                    ('customtag',)).fetchone()
    finally:
        conn.close()

    # Verify the tag was created with the exact requested name.
    assert tag is not None
    assert tag[0] == 'customtag'
def test_snapshot_with_depth_sets_snapshot_depth(tmp_path, process, disable_extractors_dict):
    """Test that --depth sets snapshot depth when creating snapshots."""
    os.chdir(tmp_path)
    subprocess.run(
        ['archivebox', 'snapshot', 'create', '--depth=1',
         'https://example.com'],
        capture_output=True,
        env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
    )

    # Inspect the depth column of the most recently created snapshot.
    conn = sqlite3.connect('index.sqlite3')
    row = conn.cursor().execute(
        "SELECT depth FROM core_snapshot ORDER BY created_at DESC LIMIT 1"
    ).fetchone()
    conn.close()

    assert row is not None, "Snapshot should be created when depth is provided"
    assert row[0] == 1, "Snapshot depth should match --depth value"
def test_snapshot_allows_duplicate_urls_across_crawls(tmp_path, process, disable_extractors_dict):
    """Snapshot create auto-creates a crawl per run; same URL can appear multiple times."""
    os.chdir(tmp_path)
    # Submit the identical URL twice so it lands in two separate crawls.
    for _ in range(2):
        subprocess.run(
            ['archivebox', 'snapshot', 'create', 'https://example.com'],
            capture_output=True,
            env={**disable_extractors_dict, 'DATA_DIR': str(tmp_path)},
        )

    conn = sqlite3.connect('index.sqlite3')
    (total,) = conn.cursor().execute("SELECT COUNT(*) FROM core_snapshot WHERE url = ?",
                                     ('https://example.com',)).fetchone()
    conn.close()

    assert total == 2, "Same URL should create separate snapshots across different crawls"
if __name__ == '__main__':
    # Allow running this file directly: delegate to pytest in verbose mode.
    pytest.main([__file__, '-v'])