Files
ArchiveBox/archivebox/tests/test_migrations_fresh.py
Claude 779040db1b Split migration tests into separate files and tighten assertions
- Split tests_migrations.py into focused test modules:
  - test_migrations_helpers.py: schemas, seeding functions, verification helpers
  - test_migrations_fresh.py: fresh install tests (12 tests)
  - test_migrations_04_to_09.py: 0.4.x migration tests (9 tests)
  - test_migrations_07_to_09.py: 0.7.x migration tests (19 tests)
  - test_migrations_08_to_09.py: 0.8.x migration tests (21 tests)

- Tighten all assertions:
  - init command now requires returncode == 0 (not [0, 1])
  - verify_all_snapshots_in_output checks ALL snapshots appear (not just one)
  - verify_tag_count uses exact match (not >=)
  - verify_snapshot_titles checks all URLs exist

- All 61 tests pass with strict assertions
- No mocks, no skips - real subprocess tests against real sqlite databases
2025-12-27 05:09:36 +00:00

296 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Fresh install tests for ArchiveBox.
Tests that fresh installations work correctly with the current schema.
"""
import shutil
import sqlite3
import tempfile
import unittest
from pathlib import Path
from .test_migrations_helpers import run_archivebox
class TestFreshInstall(unittest.TestCase):
    """Test that fresh installs work correctly.

    Each test gets its own temporary collection, created and `init`-ed in
    setUp, so tests are fully isolated and cleanup is guaranteed via
    addCleanup even when a test fails mid-way.
    """

    def setUp(self):
        """Create a scratch data dir, register cleanup, and run `archivebox init`."""
        self.work_dir = Path(tempfile.mkdtemp())
        # ignore_errors: best-effort cleanup must never mask a test failure
        self.addCleanup(shutil.rmtree, self.work_dir, ignore_errors=True)
        result = run_archivebox(self.work_dir, ['init'])
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

    def _query(self, sql):
        """Run `sql` against index.sqlite3 and return all rows.

        The connection is closed in a finally block so a failing assertion
        in the caller can never leak an open sqlite handle (the original
        code only closed after all assertions passed).
        """
        conn = sqlite3.connect(str(self.work_dir / 'index.sqlite3'))
        try:
            return conn.execute(sql).fetchall()
        finally:
            conn.close()

    def test_init_creates_database(self):
        """Fresh init should create database and directories."""
        # Verify database was created
        self.assertTrue((self.work_dir / 'index.sqlite3').exists(), "Database not created")
        # Verify archive directory exists
        self.assertTrue((self.work_dir / 'archive').is_dir(), "Archive dir not created")

    def test_status_after_init(self):
        """Status command should work after init."""
        result = run_archivebox(self.work_dir, ['status'])
        self.assertEqual(result.returncode, 0, f"Status failed: {result.stderr}")

    def test_add_url_after_init(self):
        """Should be able to add URLs after init with --index-only."""
        # --index-only skips the slow extractor stage; we only need DB rows
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com'])
        self.assertEqual(result.returncode, 0, f"Add command failed: {result.stderr}")
        # Verify a Crawl was created
        crawl_count = self._query("SELECT COUNT(*) FROM crawls_crawl")[0][0]
        self.assertGreaterEqual(crawl_count, 1, "No Crawl was created")
        # Verify at least one snapshot was created
        snapshot_count = self._query("SELECT COUNT(*) FROM core_snapshot")[0][0]
        self.assertGreaterEqual(snapshot_count, 1, "No Snapshot was created")

    def test_list_after_add(self):
        """List command should show added snapshots."""
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com'])
        self.assertEqual(result.returncode, 0, f"Add failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['list'])
        self.assertEqual(result.returncode, 0, f"List failed: {result.stderr}")
        # Verify the URL appears in output (either stream, depending on version)
        output = result.stdout + result.stderr
        self.assertIn('example.com', output, f"Added URL not in list output: {output[:500]}")

    def test_migrations_table_populated(self):
        """Django migrations table should be populated after init."""
        count = self._query("SELECT COUNT(*) FROM django_migrations")[0][0]
        # Should have many migrations applied
        self.assertGreater(count, 10, f"Expected >10 migrations, got {count}")

    def test_core_migrations_applied(self):
        """Core app migrations should be applied."""
        rows = self._query("SELECT name FROM django_migrations WHERE app='core' ORDER BY name")
        migrations = [row[0] for row in rows]
        self.assertIn('0001_initial', migrations)
class TestSchemaIntegrity(unittest.TestCase):
    """Test that the database schema is correct.

    All four tests share the same shape (init, PRAGMA table_info, check a
    required column set), so the mechanics live in helpers and each test
    only states the table name and its expected columns.
    """

    def setUp(self):
        """Create a scratch data dir, register cleanup, and run `archivebox init`."""
        self.work_dir = Path(tempfile.mkdtemp())
        # ignore_errors: best-effort cleanup must never mask a test failure
        self.addCleanup(shutil.rmtree, self.work_dir, ignore_errors=True)
        result = run_archivebox(self.work_dir, ['init'])
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

    def _table_columns(self, table):
        """Return the set of column names for `table` in index.sqlite3.

        Closes the connection in a finally block so a failing assertion in
        the caller cannot leak an open sqlite handle.
        """
        conn = sqlite3.connect(str(self.work_dir / 'index.sqlite3'))
        try:
            # PRAGMA table_info rows are (cid, name, type, notnull, dflt, pk)
            return {row[1] for row in conn.execute(f'PRAGMA table_info({table})')}
        finally:
            conn.close()

    def _assert_has_columns(self, table, required):
        """Assert every column in `required` exists on `table`; return all columns."""
        columns = self._table_columns(table)
        for col in required:
            self.assertIn(col, columns, f"Missing column: {col}")
        return columns

    def test_snapshot_table_has_required_columns(self):
        """Snapshot table should have all required columns."""
        self._assert_has_columns(
            'core_snapshot',
            {'id', 'url', 'timestamp', 'title', 'status', 'created_at', 'modified_at'},
        )

    def test_archiveresult_table_has_required_columns(self):
        """ArchiveResult table should have all required columns."""
        self._assert_has_columns(
            'core_archiveresult',
            {'id', 'snapshot_id', 'extractor', 'status', 'created_at', 'modified_at'},
        )

    def test_tag_table_has_required_columns(self):
        """Tag table should have all required columns."""
        self._assert_has_columns('core_tag', {'id', 'name', 'slug'})

    def test_crawl_table_has_required_columns(self):
        """Crawl table should have all required columns."""
        columns = self._assert_has_columns(
            'crawls_crawl',
            {'id', 'urls', 'status', 'created_at', 'created_by_id'},
        )
        # seed_id should NOT exist (removed in 0.9.x)
        self.assertNotIn('seed_id', columns, "seed_id column should not exist in 0.9.x")
class TestMultipleSnapshots(unittest.TestCase):
    """Test handling multiple snapshots.

    Uses setUp/addCleanup for the scratch collection and a query helper
    that always closes its connection, so a failing assertion cannot leak
    an open sqlite handle or a temp directory.
    """

    def setUp(self):
        """Create a scratch data dir, register cleanup, and run `archivebox init`."""
        self.work_dir = Path(tempfile.mkdtemp())
        # ignore_errors: best-effort cleanup must never mask a test failure
        self.addCleanup(shutil.rmtree, self.work_dir, ignore_errors=True)
        result = run_archivebox(self.work_dir, ['init'])
        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")

    def _query(self, sql):
        """Run `sql` against index.sqlite3 and return all rows, closing the connection."""
        conn = sqlite3.connect(str(self.work_dir / 'index.sqlite3'))
        try:
            return conn.execute(sql).fetchall()
        finally:
            conn.close()

    def test_add_urls_separately(self):
        """Should be able to add multiple URLs one at a time."""
        # Add URLs one at a time (--index-only skips slow extractors)
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com'])
        self.assertEqual(result.returncode, 0, f"Add 1 failed: {result.stderr}")
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.org'])
        self.assertEqual(result.returncode, 0, f"Add 2 failed: {result.stderr}")
        # Verify snapshots were created
        snapshot_count = self._query("SELECT COUNT(*) FROM core_snapshot")[0][0]
        self.assertEqual(snapshot_count, 2, f"Expected 2 snapshots, got {snapshot_count}")
        # Verify crawls were created (one per add call)
        crawl_count = self._query("SELECT COUNT(*) FROM crawls_crawl")[0][0]
        self.assertEqual(crawl_count, 2, f"Expected 2 Crawls, got {crawl_count}")

    def test_snapshots_linked_to_crawls(self):
        """Each snapshot should be linked to a crawl."""
        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com'])
        self.assertEqual(result.returncode, 0, f"Add failed: {result.stderr}")
        # Check that snapshot has a crawl_id
        rows = self._query("SELECT crawl_id FROM core_snapshot WHERE url = 'https://example.com'")
        self.assertTrue(rows, "Snapshot not found")
        self.assertIsNotNone(rows[0][0], "Snapshot should have a crawl_id")
# Allow running this module directly: `python test_migrations_fresh.py`
# (normally these tests are discovered and run via pytest/unittest discovery).
if __name__ == '__main__':
    unittest.main()