mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
239 lines
7.7 KiB
Python
239 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Integration tests for archivebox snapshot command."""
|
|
|
|
import os
|
|
import subprocess
|
|
import sqlite3
|
|
from archivebox.machine.models import Process
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
|
|
def test_snapshot_creates_snapshot_with_correct_url(tmp_path, process, disable_extractors_dict):
    """Test that snapshot stores the exact URL in the database.

    Runs ``archivebox snapshot create https://example.com`` inside ``tmp_path``
    and then verifies that:

    * a ``core_snapshot`` row exists whose ``url`` is exactly the URL passed in,
    * the ``crawls_crawl`` row referenced by that snapshot exists,
    * the ``auth_user`` row referenced by that crawl exists,
    * a *relative* symlink named after the snapshot id exists somewhere under
      ``tmp_path`` and resolves to
      ``users/<username>/snapshots/<YYYYMMDD>/<hostname>/<snapshot_id>``.

    The ``process`` fixture is not used directly; it is requested for its side
    effects (presumably initialising the data dir -- it is defined outside this
    file, confirm there).
    """
    os.chdir(tmp_path)

    subprocess.run(
        ["archivebox", "snapshot", "create", "https://example.com"],
        capture_output=True,
        env={**disable_extractors_dict, "DATA_DIR": str(tmp_path)},
    )

    conn = sqlite3.connect("index.sqlite3")
    try:
        c = conn.cursor()

        snapshot_row = c.execute(
            "SELECT id, created_at, url, crawl_id FROM core_snapshot WHERE url = ?",
            ("https://example.com",),
        ).fetchone()
        assert snapshot_row is not None
        snapshot_id_raw, snapshot_created_at, snapshot_url, crawl_id = snapshot_row

        crawl_row = c.execute(
            "SELECT id, created_at, urls, created_by_id FROM crawls_crawl WHERE id = ?",
            (crawl_id,),
        ).fetchone()
        assert crawl_row is not None
        created_by_id = crawl_row[3]

        user_row = c.execute(
            "SELECT username FROM auth_user WHERE id = ?",
            (created_by_id,),
        ).fetchone()
        assert user_row is not None
        username = user_row[0]
    finally:
        # try/finally rather than ``with conn:`` because the sqlite3 connection
        # context manager only commits/rolls back; it does not close. This
        # guarantees the handle is released even when an assertion above fails.
        conn.close()

    # Normalise the stored id to the canonical dashed UUID string form.
    snapshot_id = str(uuid.UUID(snapshot_id_raw))
    snapshot_date_str = datetime.fromisoformat(snapshot_created_at).strftime("%Y%m%d")
    domain = urlparse(snapshot_url).hostname or "unknown"

    # Verify crawl symlink exists and is relative
    target_path = tmp_path / "users" / username / "snapshots" / snapshot_date_str / domain / snapshot_id
    symlinks = [p for p in tmp_path.rglob(snapshot_id) if p.is_symlink()]
    assert symlinks, "Snapshot symlink should exist under crawl dir"
    link_path = symlinks[0]

    link_target = os.readlink(link_path)
    assert not os.path.isabs(link_target), "Symlink should be relative"
    assert link_path.resolve() == target_path.resolve()
|
|
|
|
|
|
def test_snapshot_multiple_urls_creates_multiple_records(tmp_path, process, disable_extractors_dict):
    """Test that multiple URLs each get their own snapshot record.

    Passes two URLs to a single ``archivebox snapshot create`` invocation and
    checks that both appear in ``core_snapshot.url`` and that at least two
    rows exist in total.
    """
    os.chdir(tmp_path)

    subprocess.run(
        [
            "archivebox",
            "snapshot",
            "create",
            "https://example.com",
            "https://iana.org",
        ],
        capture_output=True,
        env={**disable_extractors_dict, "DATA_DIR": str(tmp_path)},
    )

    conn = sqlite3.connect("index.sqlite3")
    try:
        rows = conn.cursor().execute("SELECT url FROM core_snapshot ORDER BY url").fetchall()
    finally:
        # Release the handle even if the query raises (e.g. the table is
        # missing because the CLI run failed).
        conn.close()

    urls = [row[0] for row in rows]
    assert "https://example.com" in urls
    assert "https://iana.org" in urls
    assert len(urls) >= 2
|
|
|
|
|
|
def test_snapshot_tag_creates_tag_and_links_to_snapshot(tmp_path, process, disable_extractors_dict):
    """Test that --tag creates tag record and links it to the snapshot.

    Runs ``archivebox snapshot create --tag=mytesttag https://example.com`` and
    checks three things in the SQLite index: the tag row exists in
    ``core_tag``, the snapshot row exists in ``core_snapshot``, and a row in
    the ``core_snapshot_tags`` join table connects the two.
    """
    os.chdir(tmp_path)

    subprocess.run(
        [
            "archivebox",
            "snapshot",
            "create",
            "--tag=mytesttag",
            "https://example.com",
        ],
        capture_output=True,
        env={**disable_extractors_dict, "DATA_DIR": str(tmp_path)},
    )

    conn = sqlite3.connect("index.sqlite3")
    try:
        c = conn.cursor()

        # Verify tag was created
        tag = c.execute("SELECT id, name FROM core_tag WHERE name = ?", ("mytesttag",)).fetchone()
        assert tag is not None, "Tag 'mytesttag' should exist in core_tag"
        tag_id = tag[0]

        # Verify snapshot exists
        snapshot = c.execute(
            "SELECT id FROM core_snapshot WHERE url = ?",
            ("https://example.com",),
        ).fetchone()
        assert snapshot is not None
        snapshot_id = snapshot[0]

        # Verify tag is linked to snapshot via join table
        link = c.execute(
            """
            SELECT * FROM core_snapshot_tags
            WHERE snapshot_id = ? AND tag_id = ?
            """,
            (snapshot_id, tag_id),
        ).fetchone()
    finally:
        # The asserts above can fail before the final query runs; close the
        # connection regardless so the handle is never leaked.
        conn.close()

    assert link is not None, "Tag should be linked to snapshot via core_snapshot_tags"
|
|
|
|
|
|
def test_snapshot_jsonl_output_has_correct_structure(tmp_path, process, disable_extractors_dict):
    """Test that the JSONL emitted on stdout contains a well-formed Snapshot record.

    The first record whose ``type`` is ``"Snapshot"`` must carry ``id`` and
    ``url`` keys, and its ``url`` must equal the URL given on the command line.
    """
    os.chdir(tmp_path)

    # The URL is given as a CLI argument (not piped via stdin) for more reliable behavior.
    cli_env = {**disable_extractors_dict, "DATA_DIR": str(tmp_path)}
    completed = subprocess.run(
        ["archivebox", "snapshot", "create", "https://example.com"],
        capture_output=True,
        text=True,
        env=cli_env,
    )

    # Turn the captured stdout into records and keep only the Snapshot ones.
    parsed = Process.parse_records_from_text(completed.stdout)
    snapshots = [entry for entry in parsed if entry.get("type") == "Snapshot"]

    assert len(snapshots) >= 1, "Should output at least one Snapshot JSONL record"

    first = snapshots[0]
    assert first.get("type") == "Snapshot"
    assert "id" in first, "Snapshot record should have 'id' field"
    assert "url" in first, "Snapshot record should have 'url' field"
    assert first["url"] == "https://example.com"
|
|
|
|
|
|
def test_snapshot_with_tag_stores_tag_name(tmp_path, process, disable_extractors_dict):
    """Test that the tag name given via --tag is stored verbatim in core_tag.

    Runs ``archivebox snapshot create --tag=customtag https://example.com`` and
    checks that a ``core_tag`` row named exactly ``customtag`` exists.
    """
    os.chdir(tmp_path)

    # Use command line args instead of stdin
    subprocess.run(
        ["archivebox", "snapshot", "create", "--tag=customtag", "https://example.com"],
        capture_output=True,
        text=True,
        env={**disable_extractors_dict, "DATA_DIR": str(tmp_path)},
    )

    conn = sqlite3.connect("index.sqlite3")
    try:
        # Verify tag was created with correct name
        tag = conn.cursor().execute(
            "SELECT name FROM core_tag WHERE name = ?",
            ("customtag",),
        ).fetchone()
    finally:
        # Release the handle even if the query raises (e.g. the table is
        # missing because the CLI run failed).
        conn.close()

    assert tag is not None
    assert tag[0] == "customtag"
|
|
|
|
|
|
def test_snapshot_with_depth_sets_snapshot_depth(tmp_path, process, disable_extractors_dict):
    """Test that --depth sets snapshot depth when creating snapshots.

    Runs ``archivebox snapshot create --depth=1 https://example.com`` and
    checks that the most recently created ``core_snapshot`` row has
    ``depth == 1``.
    """
    os.chdir(tmp_path)

    subprocess.run(
        [
            "archivebox",
            "snapshot",
            "create",
            "--depth=1",
            "https://example.com",
        ],
        capture_output=True,
        env={**disable_extractors_dict, "DATA_DIR": str(tmp_path)},
    )

    conn = sqlite3.connect("index.sqlite3")
    try:
        # Newest row first: only the latest snapshot is inspected.
        snapshot = conn.cursor().execute(
            "SELECT depth FROM core_snapshot ORDER BY created_at DESC LIMIT 1"
        ).fetchone()
    finally:
        # Release the handle even if the query raises (e.g. the table is
        # missing because the CLI run failed).
        conn.close()

    assert snapshot is not None, "Snapshot should be created when depth is provided"
    assert snapshot[0] == 1, "Snapshot depth should match --depth value"
|
|
|
|
|
|
def test_snapshot_allows_duplicate_urls_across_crawls(tmp_path, process, disable_extractors_dict):
    """Snapshot create auto-creates a crawl per run; same URL can appear multiple times.

    Invokes ``archivebox snapshot create https://example.com`` twice and checks
    that exactly two ``core_snapshot`` rows exist for that URL afterwards.
    """
    os.chdir(tmp_path)

    # Add same URL twice, as two independent CLI invocations.
    for _ in range(2):
        subprocess.run(
            ["archivebox", "snapshot", "create", "https://example.com"],
            capture_output=True,
            env={**disable_extractors_dict, "DATA_DIR": str(tmp_path)},
        )

    conn = sqlite3.connect("index.sqlite3")
    try:
        count = conn.cursor().execute(
            "SELECT COUNT(*) FROM core_snapshot WHERE url = ?",
            ("https://example.com",),
        ).fetchone()[0]
    finally:
        # Release the handle even if the query raises (e.g. the table is
        # missing because the CLI runs failed).
        conn.close()

    assert count == 2, "Same URL should create separate snapshots across different crawls"
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed directly: run only this file's tests through pytest, verbosely.
    pytest.main(args=[__file__, "-v"])
|