diff --git a/archivebox/tests/test_migrations_04_to_09.py b/archivebox/tests/test_migrations_04_to_09.py
new file mode 100644
index 00000000..0614fbe4
--- /dev/null
+++ b/archivebox/tests/test_migrations_04_to_09.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+"""
+Migration tests from 0.4.x to 0.9.x.
+
+0.4.x was the first Django-powered version with a simpler schema:
+- No Tag model (tags stored as comma-separated string in Snapshot)
+- No ArchiveResult model (results stored in JSON files)
+"""
+
+import shutil
+import sqlite3
+import tempfile
+import unittest
+from pathlib import Path
+
+from .test_migrations_helpers import (
+    SCHEMA_0_4,
+    seed_0_4_data,
+    run_archivebox,
+    create_data_dir_structure,
+    verify_snapshot_count,
+    verify_snapshot_urls,
+    verify_tag_count,
+)
+
+
+class TestMigrationFrom04x(unittest.TestCase):
+    """Test migration from 0.4.x schema to latest."""
+
+    def setUp(self):
+        """Create a temporary directory with 0.4.x schema and data."""
+        self.work_dir = Path(tempfile.mkdtemp())
+        self.db_path = self.work_dir / 'index.sqlite3'
+
+        # Create directory structure
+        create_data_dir_structure(self.work_dir)
+
+        # Create database with 0.4.x schema
+        conn = sqlite3.connect(str(self.db_path))
+        conn.executescript(SCHEMA_0_4)
+        conn.close()
+
+        # Seed with test data
+        self.original_data = seed_0_4_data(self.db_path)
+
+    def tearDown(self):
+        """Clean up temporary directory."""
+        shutil.rmtree(self.work_dir, ignore_errors=True)
+
+    def test_migration_preserves_snapshot_count(self):
+        """Migration should preserve all snapshots from 0.4.x."""
+        expected_count = len(self.original_data['snapshots'])
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_snapshot_count(self.db_path, expected_count)
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_snapshot_urls(self):
+        """Migration should preserve all snapshot URLs from 0.4.x."""
+        expected_urls = [s['url'] for s in self.original_data['snapshots']]
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_snapshot_urls(self.db_path, expected_urls)
+        self.assertTrue(ok, msg)
+
+    def test_migration_converts_string_tags_to_model(self):
+        """Migration should convert comma-separated tags to Tag model instances."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        # Collect unique tags from original data
+        original_tags = set()
+        for tags_str in self.original_data['tags_str']:
+            if tags_str:
+                for tag in tags_str.split(','):
+                    original_tags.add(tag.strip())
+
+        # Tags should have been created
+        ok, msg = verify_tag_count(self.db_path, len(original_tags))
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_snapshot_titles(self):
+        """Migration should preserve all snapshot titles."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+        cursor.execute("SELECT url, title FROM core_snapshot")
+        actual = {row[0]: row[1] for row in cursor.fetchall()}
+        conn.close()
+
+        for snapshot in self.original_data['snapshots']:
+            self.assertEqual(
+                actual.get(snapshot['url']),
+                snapshot['title'],
+                f"Title mismatch for {snapshot['url']}"
+            )
+
+    def test_status_works_after_migration(self):
+        """Status command should work after migration."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['status'])
+        self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}")
+
+    def test_list_works_after_migration(self):
+        """List command should work and show ALL migrated snapshots."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['list'])
+        self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}")
+
+        # Verify ALL snapshots appear in output
+        output = result.stdout + result.stderr
+        for snapshot in self.original_data['snapshots']:
+            url_fragment = snapshot['url'][:30]
+            self.assertIn(url_fragment, output,
+                          f"Snapshot {snapshot['url']} not found in list output")
+
+    def test_add_works_after_migration(self):
+        """Adding new URLs should work after migration from 0.4.x."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        # Try to add a new URL after migration
+        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}")
+
+        # Verify snapshot was added
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+        cursor.execute("SELECT COUNT(*) FROM core_snapshot WHERE url = 'https://example.com/new-page'")
+        count = cursor.fetchone()[0]
+        conn.close()
+
+        self.assertEqual(count, 1, "New snapshot was not created after migration")
+
+    def test_new_schema_elements_created(self):
+        """Migration should create new 0.9.x schema elements."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
+        tables = {row[0] for row in cursor.fetchall()}
+        conn.close()
+
+        # New tables should exist
+        self.assertIn('crawls_crawl', tables, "crawls_crawl table not created")
+        self.assertIn('core_tag', tables, "core_tag table not created")
+        self.assertIn('core_archiveresult', tables, "core_archiveresult table not created")
+
+    def test_snapshots_have_new_fields(self):
+        """Migrated snapshots should have new 0.9.x fields."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+        cursor.execute('PRAGMA table_info(core_snapshot)')
+        columns = {row[1] for row in cursor.fetchall()}
+        conn.close()
+
+        required_columns = {'status', 'depth', 'created_at', 'modified_at'}
+        for col in required_columns:
+            self.assertIn(col, columns, f"Snapshot missing new column: {col}")
+
+
+if __name__ == '__main__':
+    unittest.main()
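
For context on what `test_migration_converts_string_tags_to_model` asserts: 0.4.x stored tags as one comma-separated string per snapshot, and the migration is expected to split that string into distinct `Tag` rows. A minimal sketch of the conversion, illustrative only — the real logic lives in ArchiveBox's core migrations, and this helper name is an assumption:

```python
# Illustrative sketch of the 0.4.x tag-string conversion the test verifies.
def split_legacy_tags(tags_str: str) -> list[str]:
    """Split a 0.4.x comma-separated tag string into unique, stripped tag names."""
    return sorted({tag.strip() for tag in tags_str.split(',') if tag.strip()})

# Duplicates and surrounding whitespace collapse into distinct Tag names:
assert split_legacy_tags('news, tech,news') == ['news', 'tech']
```

This is also why the test counts *unique* stripped tags before calling `verify_tag_count()`.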
diff --git a/archivebox/tests/test_migrations_07_to_09.py b/archivebox/tests/test_migrations_07_to_09.py
new file mode 100644
index 00000000..f8f23a2f
--- /dev/null
+++ b/archivebox/tests/test_migrations_07_to_09.py
@@ -0,0 +1,375 @@
+#!/usr/bin/env python3
+"""
+Migration tests from 0.7.x to 0.9.x.
+
+0.7.x schema includes:
+- Tag model with ManyToMany to Snapshot
+- ArchiveResult model with ForeignKey to Snapshot
+- AutoField primary keys
+"""
+
+import shutil
+import sqlite3
+import tempfile
+import unittest
+from pathlib import Path
+
+from .test_migrations_helpers import (
+    SCHEMA_0_7,
+    seed_0_7_data,
+    run_archivebox,
+    create_data_dir_structure,
+    verify_snapshot_count,
+    verify_snapshot_urls,
+    verify_snapshot_titles,
+    verify_tag_count,
+    verify_archiveresult_count,
+    verify_foreign_keys,
+    verify_all_snapshots_in_output,
+)
+
+
+class TestMigrationFrom07x(unittest.TestCase):
+    """Test migration from 0.7.x schema to latest."""
+
+    def setUp(self):
+        """Create a temporary directory with 0.7.x schema and data."""
+        self.work_dir = Path(tempfile.mkdtemp())
+        self.db_path = self.work_dir / 'index.sqlite3'
+
+        # Create directory structure
+        create_data_dir_structure(self.work_dir)
+
+        # Create database with 0.7.x schema
+        conn = sqlite3.connect(str(self.db_path))
+        conn.executescript(SCHEMA_0_7)
+        conn.close()
+
+        # Seed with test data
+        self.original_data = seed_0_7_data(self.db_path)
+
+    def tearDown(self):
+        """Clean up temporary directory."""
+        shutil.rmtree(self.work_dir, ignore_errors=True)
+
+    def test_migration_preserves_snapshot_count(self):
+        """Migration should preserve all snapshots."""
+        expected_count = len(self.original_data['snapshots'])
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_snapshot_count(self.db_path, expected_count)
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_snapshot_urls(self):
+        """Migration should preserve all snapshot URLs."""
+        expected_urls = [s['url'] for s in self.original_data['snapshots']]
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_snapshot_urls(self.db_path, expected_urls)
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_snapshot_titles(self):
+        """Migration should preserve all snapshot titles."""
+        expected_titles = {s['url']: s['title'] for s in self.original_data['snapshots']}
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_snapshot_titles(self.db_path, expected_titles)
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_tags(self):
+        """Migration should preserve all tags."""
+        expected_count = len(self.original_data['tags'])
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_tag_count(self.db_path, expected_count)
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_archiveresults(self):
+        """Migration should preserve all archive results."""
+        expected_count = len(self.original_data['archiveresults'])
+
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_archiveresult_count(self.db_path, expected_count)
+        self.assertTrue(ok, msg)
+
+    def test_migration_preserves_foreign_keys(self):
+        """Migration should maintain foreign key relationships."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        ok, msg = verify_foreign_keys(self.db_path)
+        self.assertTrue(ok, msg)
+
+    def test_status_works_after_migration(self):
+        """Status command should work after migration."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['status'])
+        self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}")
+
+    def test_search_works_after_migration(self):
+        """Search command should find ALL migrated snapshots."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['search'])
+        self.assertEqual(result.returncode, 0, f"Search failed after migration: {result.stderr}")
+
+        # Verify ALL snapshots appear in output
+        output = result.stdout + result.stderr
+        ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots'])
+        self.assertTrue(ok, msg)
+
+    def test_list_works_after_migration(self):
+        """List command should work and show ALL migrated data."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['list'])
+        self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}")
+
+        # Verify ALL snapshots appear in output
+        output = result.stdout + result.stderr
+        ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots'])
+        self.assertTrue(ok, msg)
+
+    def test_new_schema_elements_created_after_migration(self):
+        """Migration should create new 0.9.x schema elements (crawls_crawl, etc.)."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+
+        # Check that new tables exist
+        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
+        tables = {row[0] for row in cursor.fetchall()}
+        conn.close()
+
+        # 0.9.x should have crawls_crawl table
+        self.assertIn('crawls_crawl', tables, "crawls_crawl table not created during migration")
+
+    def test_snapshots_have_new_fields_after_migration(self):
+        """Migrated snapshots should have new 0.9.x fields (status, depth, etc.)."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+
+        # Check snapshot table has new columns
+        cursor.execute('PRAGMA table_info(core_snapshot)')
+        columns = {row[1] for row in cursor.fetchall()}
+        conn.close()
+
+        # 0.9.x snapshots should have status, depth, created_at, modified_at
+        required_new_columns = {'status', 'depth', 'created_at', 'modified_at'}
+        for col in required_new_columns:
+            self.assertIn(col, columns, f"Snapshot missing new column: {col}")
+
+    def test_add_works_after_migration(self):
+        """Adding new URLs should work after migration from 0.7.x."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        # Verify that init created the crawls_crawl table before proceeding
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='crawls_crawl'")
+        table_exists = cursor.fetchone() is not None
+        conn.close()
+        self.assertTrue(table_exists,
+                        f"Init failed to create crawls_crawl table. Init stderr: {result.stderr[-500:]}")
+
+        # Try to add a new URL after migration (use --index-only for speed)
+        result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}")
+
+        # Verify a Crawl was created for the new URL
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+        cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
+        crawl_count = cursor.fetchone()[0]
+        conn.close()
+
+        self.assertGreaterEqual(crawl_count, 1,
+                                f"No Crawl created when adding URL. Add stderr: {result.stderr[-500:]}")
+
+    def test_archiveresult_status_preserved_after_migration(self):
+        """Migration should preserve archive result status values."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        conn = sqlite3.connect(str(self.db_path))
+        cursor = conn.cursor()
+
+        # Get status counts
+        cursor.execute("SELECT status, COUNT(*) FROM core_archiveresult GROUP BY status")
+        status_counts = dict(cursor.fetchall())
+        conn.close()
+
+        # Original data has known status distribution: succeeded, failed, skipped
+        self.assertIn('succeeded', status_counts, "Should have succeeded results")
+        self.assertIn('failed', status_counts, "Should have failed results")
+        self.assertIn('skipped', status_counts, "Should have skipped results")
+
+    def test_version_works_after_migration(self):
+        """Version command should work after migration."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['version'])
+        self.assertEqual(result.returncode, 0, f"Version failed after migration: {result.stderr}")
+
+        # Should show version info
+        output = result.stdout + result.stderr
+        self.assertTrue('ArchiveBox' in output or 'version' in output.lower(),
+                        f"Version output missing expected content: {output[:500]}")
+
+    def test_help_works_after_migration(self):
+        """Help command should work after migration."""
+        result = run_archivebox(self.work_dir, ['init'], timeout=45)
+        self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+        result = run_archivebox(self.work_dir, ['help'])
+        self.assertEqual(result.returncode, 0, f"Help failed after migration: {result.stderr}")
+
+        # Should show available commands
+        output = result.stdout + result.stderr
+        self.assertTrue('add' in output.lower() and 'status' in output.lower(),
+                        f"Help output missing expected commands: {output[:500]}")
+
+
+class TestMigrationDataIntegrity07x(unittest.TestCase):
+    """Comprehensive data integrity tests for 0.7.x migrations."""
+
+    def test_no_duplicate_snapshots_after_migration(self):
+        """Migration should not create duplicate snapshots."""
+        work_dir = Path(tempfile.mkdtemp())
+        db_path = work_dir / 'index.sqlite3'
+
+        try:
+            create_data_dir_structure(work_dir)
+            conn = sqlite3.connect(str(db_path))
+            conn.executescript(SCHEMA_0_7)
+            conn.close()
+            seed_0_7_data(db_path)
+
+            result = run_archivebox(work_dir, ['init'], timeout=45)
+            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+            # Check for duplicate URLs
+            conn = sqlite3.connect(str(db_path))
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT url, COUNT(*) as cnt FROM core_snapshot
+                GROUP BY url HAVING cnt > 1
+            """)
+            duplicates = cursor.fetchall()
+            conn.close()
+
+            self.assertEqual(len(duplicates), 0, f"Found duplicate URLs: {duplicates}")
+
+        finally:
+            shutil.rmtree(work_dir, ignore_errors=True)
+
+    def test_no_orphaned_archiveresults_after_migration(self):
+        """Migration should not leave orphaned ArchiveResults."""
+        work_dir = Path(tempfile.mkdtemp())
+        db_path = work_dir / 'index.sqlite3'
+
+        try:
+            create_data_dir_structure(work_dir)
+            conn = sqlite3.connect(str(db_path))
+            conn.executescript(SCHEMA_0_7)
+            conn.close()
+            seed_0_7_data(db_path)
+
+            result = run_archivebox(work_dir, ['init'], timeout=45)
+            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+            ok, msg = verify_foreign_keys(db_path)
+            self.assertTrue(ok, msg)
+
+        finally:
+            shutil.rmtree(work_dir, ignore_errors=True)
+
+    def test_timestamps_preserved_after_migration(self):
+        """Migration should preserve original timestamps."""
+        work_dir = Path(tempfile.mkdtemp())
+        db_path = work_dir / 'index.sqlite3'
+
+        try:
+            create_data_dir_structure(work_dir)
+            conn = sqlite3.connect(str(db_path))
+            conn.executescript(SCHEMA_0_7)
+            conn.close()
+            original_data = seed_0_7_data(db_path)
+
+            original_timestamps = {s['url']: s['timestamp'] for s in original_data['snapshots']}
+
+            result = run_archivebox(work_dir, ['init'], timeout=45)
+            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+            conn = sqlite3.connect(str(db_path))
+            cursor = conn.cursor()
+            cursor.execute("SELECT url, timestamp FROM core_snapshot")
+            migrated_timestamps = {row[0]: row[1] for row in cursor.fetchall()}
+            conn.close()
+
+            for url, original_ts in original_timestamps.items():
+                self.assertEqual(
+                    migrated_timestamps.get(url), original_ts,
+                    f"Timestamp changed for {url}: {original_ts} -> {migrated_timestamps.get(url)}"
+                )
+
+        finally:
+            shutil.rmtree(work_dir, ignore_errors=True)
+
+    def test_tag_associations_preserved_after_migration(self):
+        """Migration should preserve snapshot-tag associations."""
+        work_dir = Path(tempfile.mkdtemp())
+        db_path = work_dir / 'index.sqlite3'
+
+        try:
+            create_data_dir_structure(work_dir)
+            conn = sqlite3.connect(str(db_path))
+            conn.executescript(SCHEMA_0_7)
+            conn.close()
+            seed_0_7_data(db_path)
+
+            # Count tag associations before migration
+            conn = sqlite3.connect(str(db_path))
+            cursor = conn.cursor()
+            cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags")
+            original_count = cursor.fetchone()[0]
+            conn.close()
+
+            result = run_archivebox(work_dir, ['init'], timeout=45)
+            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
+
+            # Count tag associations after migration
+            conn = sqlite3.connect(str(db_path))
+            cursor = conn.cursor()
+            cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags")
+            migrated_count = cursor.fetchone()[0]
+            conn.close()
+
+            self.assertEqual(migrated_count, original_count,
+                             f"Tag associations changed: {original_count} -> {migrated_count}")
+
+        finally:
+            shutil.rmtree(work_dir, ignore_errors=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
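
Several tests above delegate to `verify_foreign_keys()` from the helpers module, which (like the other verifiers diffed below) returns an `(ok, msg)` tuple and reports orphaned ArchiveResults. A minimal sketch of how such a check can be done in one query, assuming the 0.9.x table names used throughout these tests (the helper name here is hypothetical, not the actual implementation):

```python
import sqlite3
from pathlib import Path

def check_orphaned_archiveresults(db_path: Path) -> tuple[bool, str]:
    """Count ArchiveResults whose snapshot_id no longer matches any Snapshot row."""
    conn = sqlite3.connect(str(db_path))
    orphaned = conn.execute("""
        SELECT COUNT(*) FROM core_archiveresult ar
        LEFT JOIN core_snapshot s ON s.id = ar.snapshot_id
        WHERE s.id IS NULL
    """).fetchone()[0]
    conn.close()
    if orphaned:
        return False, f"Found {orphaned} orphaned ArchiveResults"
    return True, "No orphaned ArchiveResults"
```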
+""" + +import shutil +import sqlite3 +import tempfile +import unittest +from pathlib import Path + +from .test_migrations_helpers import ( + SCHEMA_0_8, + seed_0_8_data, + run_archivebox, + create_data_dir_structure, + verify_snapshot_count, + verify_snapshot_urls, + verify_snapshot_titles, + verify_tag_count, + verify_archiveresult_count, + verify_foreign_keys, + verify_all_snapshots_in_output, + verify_crawl_count, +) + + +class TestMigrationFrom08x(unittest.TestCase): + """Test migration from 0.8.x schema to latest.""" + + def setUp(self): + """Create a temporary directory with 0.8.x schema and data.""" + self.work_dir = Path(tempfile.mkdtemp()) + self.db_path = self.work_dir / 'index.sqlite3' + + # Create directory structure + create_data_dir_structure(self.work_dir) + + # Create database with 0.8.x schema + conn = sqlite3.connect(str(self.db_path)) + conn.executescript(SCHEMA_0_8) + conn.close() + + # Seed with test data + self.original_data = seed_0_8_data(self.db_path) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.work_dir, ignore_errors=True) + + def test_migration_preserves_snapshot_count(self): + """Migration should preserve all snapshots from 0.8.x.""" + expected_count = len(self.original_data['snapshots']) + + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_snapshot_count(self.db_path, expected_count) + self.assertTrue(ok, msg) + + def test_migration_preserves_snapshot_urls(self): + """Migration should preserve all snapshot URLs from 0.8.x.""" + expected_urls = [s['url'] for s in self.original_data['snapshots']] + + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_snapshot_urls(self.db_path, expected_urls) + self.assertTrue(ok, msg) + + def test_migration_preserves_crawls(self): + """Migration should preserve all Crawl records.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + expected_count = len(self.original_data['crawls']) + ok, msg = verify_crawl_count(self.db_path, expected_count) + self.assertTrue(ok, msg) + + def test_migration_preserves_snapshot_crawl_links(self): + """Migration should preserve snapshot-to-crawl relationships.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + + # Check EVERY snapshot still has its crawl_id + for snapshot in self.original_data['snapshots']: + cursor.execute("SELECT crawl_id FROM core_snapshot WHERE url = ?", (snapshot['url'],)) + row = cursor.fetchone() + self.assertIsNotNone(row, f"Snapshot {snapshot['url']} not found after migration") + self.assertEqual(row[0], snapshot['crawl_id'], + f"Crawl ID mismatch for {snapshot['url']}: expected {snapshot['crawl_id']}, got {row[0]}") + + conn.close() + + def test_migration_preserves_tags(self): + """Migration should preserve all tags.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_tag_count(self.db_path, len(self.original_data['tags'])) + self.assertTrue(ok, msg) + + def test_migration_preserves_archiveresults(self): + """Migration should preserve all archive results.""" + 
expected_count = len(self.original_data['archiveresults']) + + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_archiveresult_count(self.db_path, expected_count) + self.assertTrue(ok, msg) + + def test_migration_preserves_archiveresult_status(self): + """Migration should preserve archive result status values.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + + # Get status counts + cursor.execute("SELECT status, COUNT(*) FROM core_archiveresult GROUP BY status") + status_counts = dict(cursor.fetchall()) + conn.close() + + # Original data has known status distribution: succeeded, failed, skipped + self.assertIn('succeeded', status_counts, "Should have succeeded results") + self.assertIn('failed', status_counts, "Should have failed results") + self.assertIn('skipped', status_counts, "Should have skipped results") + + def test_status_works_after_migration(self): + """Status command should work after migration.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(self.work_dir, ['status']) + self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}") + + def test_list_works_after_migration(self): + """List command should work and show ALL migrated data.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(self.work_dir, ['list']) + self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}") + + # Verify ALL snapshots appear in output + output = result.stdout + result.stderr + ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots']) + self.assertTrue(ok, msg) + + def test_search_works_after_migration(self): + """Search command should find ALL migrated snapshots.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(self.work_dir, ['search']) + self.assertEqual(result.returncode, 0, f"Search failed after migration: {result.stderr}") + + # Verify ALL snapshots appear in output + output = result.stdout + result.stderr + ok, msg = verify_all_snapshots_in_output(output, self.original_data['snapshots']) + self.assertTrue(ok, msg) + + def test_migration_preserves_snapshot_titles(self): + """Migration should preserve all snapshot titles.""" + expected_titles = {s['url']: s['title'] for s in self.original_data['snapshots']} + + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_snapshot_titles(self.db_path, expected_titles) + self.assertTrue(ok, msg) + + def test_migration_preserves_foreign_keys(self): + """Migration should maintain foreign key relationships.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_foreign_keys(self.db_path) + self.assertTrue(ok, msg) + + def test_migration_removes_seed_id_column(self): + """Migration should remove seed_id column from crawls_crawl.""" + result = 
run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + cursor.execute("PRAGMA table_info(crawls_crawl)") + columns = [row[1] for row in cursor.fetchall()] + conn.close() + + self.assertNotIn('seed_id', columns, + f"seed_id column should have been removed by migration. Columns: {columns}") + + def test_migration_removes_seed_table(self): + """Migration should remove crawls_seed table.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='crawls_seed'") + table_exists = cursor.fetchone() is not None + conn.close() + + self.assertFalse(table_exists, "crawls_seed table should have been removed by migration") + + def test_add_works_after_migration(self): + """Adding new URLs should work after migration from 0.8.x.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + # Check that init actually ran and applied migrations + self.assertIn('Applying', result.stdout + result.stderr, + f"Init did not apply migrations. stdout: {result.stdout[:500]}, stderr: {result.stderr[:500]}") + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + # Count existing crawls + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM crawls_crawl") + initial_crawl_count = cursor.fetchone()[0] + conn.close() + + # Try to add a new URL after migration (use --index-only for speed) + result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45) + self.assertEqual(result.returncode, 0, f"Add failed after migration: {result.stderr}") + + # Verify a new Crawl was created + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM crawls_crawl") + new_crawl_count = cursor.fetchone()[0] + conn.close() + + self.assertGreater(new_crawl_count, initial_crawl_count, + f"No new Crawl created when adding URL. 
Add stderr: {result.stderr[-500:]}") + + def test_version_works_after_migration(self): + """Version command should work after migration.""" + result = run_archivebox(self.work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(self.work_dir, ['version']) + self.assertEqual(result.returncode, 0, f"Version failed after migration: {result.stderr}") + + # Should show version info + output = result.stdout + result.stderr + self.assertTrue('ArchiveBox' in output or 'version' in output.lower(), + f"Version output missing expected content: {output[:500]}") + + +class TestMigrationDataIntegrity08x(unittest.TestCase): + """Comprehensive data integrity tests for 0.8.x migrations.""" + + def test_no_duplicate_snapshots_after_migration(self): + """Migration should not create duplicate snapshots.""" + work_dir = Path(tempfile.mkdtemp()) + db_path = work_dir / 'index.sqlite3' + + try: + create_data_dir_structure(work_dir) + conn = sqlite3.connect(str(db_path)) + conn.executescript(SCHEMA_0_8) + conn.close() + seed_0_8_data(db_path) + + result = run_archivebox(work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + # Check for duplicate URLs + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + cursor.execute(""" + SELECT url, COUNT(*) as cnt FROM core_snapshot + GROUP BY url HAVING cnt > 1 + """) + duplicates = cursor.fetchall() + conn.close() + + self.assertEqual(len(duplicates), 0, f"Found duplicate URLs: {duplicates}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_no_orphaned_archiveresults_after_migration(self): + """Migration should not leave orphaned ArchiveResults.""" + work_dir = Path(tempfile.mkdtemp()) + db_path = work_dir / 'index.sqlite3' + + try: + create_data_dir_structure(work_dir) + conn = sqlite3.connect(str(db_path)) + conn.executescript(SCHEMA_0_8) + conn.close() + seed_0_8_data(db_path) + + result = run_archivebox(work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + ok, msg = verify_foreign_keys(db_path) + self.assertTrue(ok, msg) + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_timestamps_preserved_after_migration(self): + """Migration should preserve original timestamps.""" + work_dir = Path(tempfile.mkdtemp()) + db_path = work_dir / 'index.sqlite3' + + try: + create_data_dir_structure(work_dir) + conn = sqlite3.connect(str(db_path)) + conn.executescript(SCHEMA_0_8) + conn.close() + original_data = seed_0_8_data(db_path) + + original_timestamps = {s['url']: s['timestamp'] for s in original_data['snapshots']} + + result = run_archivebox(work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + cursor.execute("SELECT url, timestamp FROM core_snapshot") + migrated_timestamps = {row[0]: row[1] for row in cursor.fetchall()} + conn.close() + + for url, original_ts in original_timestamps.items(): + self.assertEqual( + migrated_timestamps.get(url), original_ts, + f"Timestamp changed for {url}: {original_ts} -> {migrated_timestamps.get(url)}" + ) + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_crawl_data_preserved_after_migration(self): + """Migration should preserve crawl metadata (urls, label, status).""" + work_dir = Path(tempfile.mkdtemp()) + db_path = work_dir / 'index.sqlite3' + + try: + 
create_data_dir_structure(work_dir) + conn = sqlite3.connect(str(db_path)) + conn.executescript(SCHEMA_0_8) + conn.close() + original_data = seed_0_8_data(db_path) + + result = run_archivebox(work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Check each crawl's data is preserved + for crawl in original_data['crawls']: + cursor.execute("SELECT urls, label FROM crawls_crawl WHERE id = ?", (crawl['id'],)) + row = cursor.fetchone() + self.assertIsNotNone(row, f"Crawl {crawl['id']} not found after migration") + self.assertEqual(row[0], crawl['urls'], f"URLs mismatch for crawl {crawl['id']}") + self.assertEqual(row[1], crawl['label'], f"Label mismatch for crawl {crawl['id']}") + + conn.close() + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_tag_associations_preserved_after_migration(self): + """Migration should preserve snapshot-tag associations.""" + work_dir = Path(tempfile.mkdtemp()) + db_path = work_dir / 'index.sqlite3' + + try: + create_data_dir_structure(work_dir) + conn = sqlite3.connect(str(db_path)) + conn.executescript(SCHEMA_0_8) + conn.close() + seed_0_8_data(db_path) + + # Count tag associations before migration + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags") + original_count = cursor.fetchone()[0] + conn.close() + + result = run_archivebox(work_dir, ['init'], timeout=45) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + # Count tag associations after migration + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM core_snapshot_tags") + migrated_count = cursor.fetchone()[0] + conn.close() + + self.assertEqual(migrated_count, original_count, + f"Tag associations changed: {original_count} -> {migrated_count}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/archivebox/tests/test_migrations_fresh.py b/archivebox/tests/test_migrations_fresh.py new file mode 100644 index 00000000..0d8ec166 --- /dev/null +++ b/archivebox/tests/test_migrations_fresh.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 +""" +Fresh install tests for ArchiveBox. + +Tests that fresh installations work correctly with the current schema. 
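
All of these tests drive the CLI through `run_archivebox()` from the helpers module. Its implementation is not shown in full in this diff; the contract the tests rely on is a `subprocess.CompletedProcess` with `returncode`, `stdout`, and `stderr`, plus a `timeout` keyword. A minimal sketch under those assumptions:

```python
import subprocess
from pathlib import Path

def run_archivebox(work_dir: Path, args: list[str], timeout: int = 60) -> subprocess.CompletedProcess:
    """Run `archivebox <args>` with work_dir as the data directory, capturing text output."""
    return subprocess.run(
        ['archivebox', *args],
        cwd=str(work_dir),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
```

Running via subprocess (rather than importing archivebox) is what lets these tests exercise `init` against pre-seeded data directories without tripping the package's own environment checks.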
+""" + +import shutil +import sqlite3 +import tempfile +import unittest +from pathlib import Path + +from .test_migrations_helpers import run_archivebox + + +class TestFreshInstall(unittest.TestCase): + """Test that fresh installs work correctly.""" + + def test_init_creates_database(self): + """Fresh init should create database and directories.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + # Verify database was created + self.assertTrue((work_dir / 'index.sqlite3').exists(), "Database not created") + # Verify archive directory exists + self.assertTrue((work_dir / 'archive').is_dir(), "Archive dir not created") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_status_after_init(self): + """Status command should work after init.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(work_dir, ['status']) + self.assertEqual(result.returncode, 0, f"Status failed: {result.stderr}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_add_url_after_init(self): + """Should be able to add URLs after init with --index-only.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + # Add a URL with --index-only for speed + result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com']) + self.assertEqual(result.returncode, 0, f"Add command failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + + # Verify a Crawl was created + cursor.execute("SELECT COUNT(*) FROM crawls_crawl") + crawl_count = cursor.fetchone()[0] + self.assertGreaterEqual(crawl_count, 1, "No Crawl was created") + + # Verify at least one snapshot was created + cursor.execute("SELECT COUNT(*) FROM core_snapshot") + snapshot_count = cursor.fetchone()[0] + self.assertGreaterEqual(snapshot_count, 1, "No Snapshot was created") + + conn.close() + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_list_after_add(self): + """List command should show added snapshots.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com']) + self.assertEqual(result.returncode, 0, f"Add failed: {result.stderr}") + + result = run_archivebox(work_dir, ['list']) + self.assertEqual(result.returncode, 0, f"List failed: {result.stderr}") + + # Verify the URL appears in output + output = result.stdout + result.stderr + self.assertIn('example.com', output, f"Added URL not in list output: {output[:500]}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_migrations_table_populated(self): + """Django migrations table should be populated after init.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM django_migrations") + count = cursor.fetchone()[0] + conn.close() + + # Should have many migrations 
applied + self.assertGreater(count, 10, f"Expected >10 migrations, got {count}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_core_migrations_applied(self): + """Core app migrations should be applied.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + cursor.execute("SELECT name FROM django_migrations WHERE app='core' ORDER BY name") + migrations = [row[0] for row in cursor.fetchall()] + conn.close() + + self.assertIn('0001_initial', migrations) + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + +class TestSchemaIntegrity(unittest.TestCase): + """Test that the database schema is correct.""" + + def test_snapshot_table_has_required_columns(self): + """Snapshot table should have all required columns.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + cursor.execute('PRAGMA table_info(core_snapshot)') + columns = {row[1] for row in cursor.fetchall()} + conn.close() + + required = {'id', 'url', 'timestamp', 'title', 'status', 'created_at', 'modified_at'} + for col in required: + self.assertIn(col, columns, f"Missing column: {col}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_archiveresult_table_has_required_columns(self): + """ArchiveResult table should have all required columns.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + cursor.execute('PRAGMA table_info(core_archiveresult)') + columns = {row[1] for row in cursor.fetchall()} + conn.close() + + required = {'id', 'snapshot_id', 'extractor', 'status', 'created_at', 'modified_at'} + for col in required: + self.assertIn(col, columns, f"Missing column: {col}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_tag_table_has_required_columns(self): + """Tag table should have all required columns.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + cursor.execute('PRAGMA table_info(core_tag)') + columns = {row[1] for row in cursor.fetchall()} + conn.close() + + required = {'id', 'name', 'slug'} + for col in required: + self.assertIn(col, columns, f"Missing column: {col}") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_crawl_table_has_required_columns(self): + """Crawl table should have all required columns.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + cursor.execute('PRAGMA table_info(crawls_crawl)') + columns = {row[1] for row in cursor.fetchall()} + conn.close() + + required = {'id', 'urls', 'status', 'created_at', 'created_by_id'} + for col in required: + self.assertIn(col, columns, f"Missing column: {col}") + + # 
seed_id should NOT exist (removed in 0.9.x) + self.assertNotIn('seed_id', columns, "seed_id column should not exist in 0.9.x") + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + +class TestMultipleSnapshots(unittest.TestCase): + """Test handling multiple snapshots.""" + + def test_add_urls_separately(self): + """Should be able to add multiple URLs one at a time.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + # Add URLs one at a time + result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com']) + self.assertEqual(result.returncode, 0, f"Add 1 failed: {result.stderr}") + + result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.org']) + self.assertEqual(result.returncode, 0, f"Add 2 failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + + # Verify snapshots were created + cursor.execute("SELECT COUNT(*) FROM core_snapshot") + snapshot_count = cursor.fetchone()[0] + self.assertEqual(snapshot_count, 2, f"Expected 2 snapshots, got {snapshot_count}") + + # Verify crawls were created (one per add call) + cursor.execute("SELECT COUNT(*) FROM crawls_crawl") + crawl_count = cursor.fetchone()[0] + self.assertEqual(crawl_count, 2, f"Expected 2 Crawls, got {crawl_count}") + + conn.close() + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + def test_snapshots_linked_to_crawls(self): + """Each snapshot should be linked to a crawl.""" + work_dir = Path(tempfile.mkdtemp()) + + try: + result = run_archivebox(work_dir, ['init']) + self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}") + + result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com']) + self.assertEqual(result.returncode, 0, f"Add failed: {result.stderr}") + + conn = sqlite3.connect(str(work_dir / 'index.sqlite3')) + cursor = conn.cursor() + + # Check that snapshot has a crawl_id + cursor.execute("SELECT crawl_id FROM core_snapshot WHERE url = 'https://example.com'") + row = cursor.fetchone() + self.assertIsNotNone(row, "Snapshot not found") + self.assertIsNotNone(row[0], "Snapshot should have a crawl_id") + + conn.close() + + finally: + shutil.rmtree(work_dir, ignore_errors=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/archivebox/tests/tests_migrations.py b/archivebox/tests/test_migrations_helpers.py old mode 100755 new mode 100644 similarity index 52% rename from archivebox/tests/tests_migrations.py rename to archivebox/tests/test_migrations_helpers.py index 87f0d2ff..d2bf17aa --- a/archivebox/tests/tests_migrations.py +++ b/archivebox/tests/test_migrations_helpers.py @@ -1,35 +1,21 @@ #!/usr/bin/env python3 """ -Migration tests for ArchiveBox. +Helper functions and schema definitions for migration tests. -Tests that data directories from older versions can be migrated to newer versions -without data loss. Supports testing from 0.4.x (first Django version) to latest. 
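
The migration test files above also share `verify_all_snapshots_in_output()` from the helpers module (diffed below). Like the other verifiers there, it returns an `(ok, msg)` tuple; a sketch of what it plausibly does, assumed to mirror the 30-character URL-fragment matching the 0.4.x list test performs inline:

```python
from typing import Dict, List, Tuple

def verify_all_snapshots_in_output(output: str, snapshots: List[Dict]) -> Tuple[bool, str]:
    """Check that every seeded snapshot URL (by its first 30 chars) appears in CLI output."""
    missing = [s['url'] for s in snapshots if s['url'][:30] not in output]
    if missing:
        return False, f"Snapshots missing from output: {missing}"
    return True, f"All {len(snapshots)} snapshots present in output"
```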
diff --git a/archivebox/tests/tests_migrations.py b/archivebox/tests/test_migrations_helpers.py
old mode 100755
new mode 100644
similarity index 52%
rename from archivebox/tests/tests_migrations.py
rename to archivebox/tests/test_migrations_helpers.py
index 87f0d2ff..d2bf17aa
--- a/archivebox/tests/tests_migrations.py
+++ b/archivebox/tests/test_migrations_helpers.py
@@ -1,35 +1,21 @@
 #!/usr/bin/env python3
 """
-Migration tests for ArchiveBox.
+Helper functions and schema definitions for migration tests.
 
-Tests that data directories from older versions can be migrated to newer versions
-without data loss. Supports testing from 0.4.x (first Django version) to latest.
-
-Run with: pytest archivebox/cli/tests_migrations.py -v
-
-Schema Evolution:
-- 0.4.x: Snapshot (tags as comma-separated string), no Tag model, no ArchiveResult
-- 0.6.x: Added Tag model, Snapshot.tags became ManyToMany, added ArchiveResult
-- 0.7.x: Same as 0.6.x with minor field additions
-- 0.8.x: Added status fields, renamed datetime fields, added Crawl/Seed models,
-         changed primary keys from AutoField to UUID for Tag/ArchiveResult
+This module provides:
+- Schema definitions for each major ArchiveBox version (0.4.x, 0.7.x, 0.8.x)
+- Data seeding functions to populate test databases
+- Helper functions to run archivebox commands and verify results
 """
 
-# Note: This test file intentionally does NOT set __package__ to avoid
-# importing archivebox directly (which would trigger root checks).
-# All tests run archivebox via subprocess, which handles its own env.
-
 import os
 import sys
 import json
-import shutil
 import sqlite3
-import tempfile
 import subprocess
-import unittest
 from pathlib import Path
 from datetime import datetime, timezone
-from typing import Dict, List, Any, Tuple
+from typing import Dict, List, Tuple
 from uuid import uuid4
 
 
@@ -37,9 +23,6 @@ from uuid import uuid4
 # =============================================================================
 # Schema Definitions for Each Version
 # =============================================================================
-# Represents the minimum schema needed for each major version
-# These are simplified - real migrations handle edge cases
-
 SCHEMA_0_4 = """
 -- Django system tables (minimal)
 CREATE TABLE IF NOT EXISTS django_migrations (
@@ -543,13 +526,11 @@ def seed_0_4_data(db_path: Path) -> Dict[str, List[Dict]]:
     conn = sqlite3.connect(str(db_path))
     cursor = conn.cursor()
 
-    # Track created data for verification
     created_data = {
         'snapshots': [],
-        'tags_str': [],  # Tags are stored as comma-separated strings in 0.4.x
+        'tags_str': [],
     }
 
-    # Create 5 snapshots with various data
     test_urls = [
         ('https://example.com/page1', 'Example Page 1', 'news,tech'),
         ('https://example.org/article', 'Article Title', 'blog,reading'),
@@ -577,7 +558,6 @@ def seed_0_4_data(db_path: Path) -> Dict[str, List[Dict]]:
         })
         created_data['tags_str'].append(tags)
 
-    # Record migrations as applied (0.4.x had just the initial migration)
     cursor.execute("""
        INSERT INTO django_migrations (app, name, applied)
        VALUES ('core', '0001_initial', datetime('now'))
@@ -646,7 +626,7 @@ def seed_0_7_data(db_path: Path) -> Dict[str, List[Dict]]:
             'title': title,
         })
 
-        # Assign 2 random tags to each snapshot
+        # Assign 2 tags to each snapshot
         tag_ids = [created_data['tags'][i % 5]['id'], created_data['tags'][(i + 1) % 5]['id']]
         for tag_id in tag_ids:
             cursor.execute("""
@@ -658,7 +638,6 @@ def seed_0_7_data(db_path: Path) -> Dict[str, List[Dict]]:
 
     statuses = ['succeeded', 'succeeded', 'failed', 'succeeded', 'skipped']
     for j, (extractor, status) in enumerate(zip(extractors, statuses)):
-        # Note: uuid column is added by our migration, not present in 0.7.x
         cursor.execute("""
             INSERT INTO core_archiveresult
             (snapshot_id, extractor, cmd, pwd, cmd_version, output, start_ts, end_ts, status)
@@ -682,7 +661,6 @@ def seed_0_7_data(db_path: Path) -> Dict[str, List[Dict]]:
 
     # Record migrations as applied (0.7.x migrations up to 0022)
     migrations = [
-        # Django system migrations
         ('contenttypes', '0001_initial'),
         ('contenttypes', '0002_remove_content_type_name'),
         ('auth', '0001_initial'),
@@ -701,7 +679,6 @@ def seed_0_7_data(db_path: Path) -> Dict[str, List[Dict]]:
         ('admin', '0002_logentry_remove_auto_add'),
         ('admin', '0003_logentry_add_action_flag_choices'),
         ('sessions', '0001_initial'),
-        # Core migrations
         ('core', '0001_initial'),
         ('core', '0002_auto_20200625_1521'),
         ('core', '0003_auto_20200630_1034'),
@@ -816,8 +793,8 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
         ('https://example.com/page1', 'Example Page 1', created_data['crawls'][0]['id']),
         ('https://example.org/article', 'Article Title', created_data['crawls'][0]['id']),
         ('https://github.com/user/repo', 'GitHub Repository', created_data['crawls'][1]['id']),
-        ('https://news.ycombinator.com/item?id=12345', 'HN Discussion', None),  # No crawl
-        ('https://en.wikipedia.org/wiki/Test', 'Wikipedia Test', None),  # No crawl
+        ('https://news.ycombinator.com/item?id=12345', 'HN Discussion', None),
+        ('https://en.wikipedia.org/wiki/Test', 'Wikipedia Test', None),
     ]
 
     for i, (url, title, crawl_id) in enumerate(test_urls):
@@ -839,7 +816,7 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
             'crawl_id': crawl_id,
         })
 
-        # Assign 2 random tags to each snapshot
+        # Assign 2 tags to each snapshot
        tag_ids = [created_data['tags'][i % 5]['id'], created_data['tags'][(i + 1) % 5]['id']]
        for tag_id in tag_ids:
            cursor.execute("""
@@ -879,7 +856,6 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
 
     # Record migrations as applied (0.8.x migrations)
     migrations = [
-        # Django system migrations
         ('contenttypes', '0001_initial'),
         ('contenttypes', '0002_remove_content_type_name'),
         ('auth', '0001_initial'),
@@ -898,7 +874,6 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
         ('admin', '0002_logentry_remove_auto_add'),
         ('admin', '0003_logentry_add_action_flag_choices'),
         ('sessions', '0001_initial'),
-        # Core migrations (up to 0.8.x)
         ('core', '0001_initial'),
         ('core', '0002_auto_20200625_1521'),
         ('core', '0003_auto_20200630_1034'),
@@ -922,7 +897,6 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
         ('core', '0021_auto_20220914_0934'),
         ('core', '0022_auto_20231023_2008'),
         # For 0.8.x (dev branch), record the migrations that 0023_new_schema replaces
-        # This is required because 0023_new_schema is a squashed migration
         ('core', '0023_alter_archiveresult_options_archiveresult_abid_and_more'),
         ('core', '0024_auto_20240513_1143'),
         ('core', '0025_alter_archiveresult_uuid'),
@@ -975,9 +949,7 @@ def seed_0_8_data(db_path: Path) -> Dict[str, List[Dict]]:
         ('core', '0072_rename_added_snapshot_bookmarked_at_and_more'),
         ('core', '0073_rename_created_archiveresult_created_at_and_more'),
         ('core', '0074_alter_snapshot_downloaded_at'),
-        # Also record the squashed migration itself
         ('core', '0023_new_schema'),
-        # Machine app - record both squashed and individual migrations (like fresh install does)
         ('machine', '0001_initial'),
         ('machine', '0001_squashed'),
         ('machine', '0002_alter_machine_stats_installedbinary'),
         ('machine', '0004_alter_installedbinary_abspath_and_more'),
         ('core', '0024_snapshot_crawl'),
         ('core', '0025_allow_duplicate_urls_per_crawl'),
-        # Note: core.0026 removes output_dir which the 0.8.x schema still has
-        # Let Django apply it during migration
-        # API app - record both squashed and individual migrations (like fresh install does)
         ('api', '0001_initial'),
         ('api', '0001_squashed'),
         ('api', '0002_alter_apitoken_options'),
         ('api', '0007_alter_apitoken_created_by'),
         ('api', '0008_alter_apitoken_created_alter_apitoken_created_by_and_more'),
         ('api', '0009_rename_created_apitoken_created_at_and_more'),
-        # Crawls migrations
         ('crawls', '0001_initial'),
     ]
@@ -1073,16 +1041,16 @@ def verify_snapshot_count(db_path: Path, expected: int) -> Tuple[bool, str]:
 
 
 def verify_tag_count(db_path: Path, expected: int) -> Tuple[bool, str]:
-    """Verify the number of tags in the database."""
+    """Verify the number of tags in the database (exact match)."""
     conn = sqlite3.connect(str(db_path))
     cursor = conn.cursor()
     cursor.execute("SELECT COUNT(*) FROM core_tag")
     count = cursor.fetchone()[0]
     conn.close()
 
-    if count >= expected:  # May have more due to tag splitting
-        return True, f"Tag count OK: {count} (expected >= {expected})"
-    return False, f"Tag count mismatch: expected >= {expected}, got {count}"
+    if count == expected:
+        return True, f"Tag count OK: {count}"
+    return False, f"Tag count mismatch: expected {expected}, got {count}"
 
 
 def verify_archiveresult_count(db_path: Path, expected: int) -> Tuple[bool, str]:
@@ -1099,7 +1067,7 @@ def verify_archiveresult_count(db_path: Path, expected: int) -> Tuple[bool, str]
 
 
 def verify_snapshot_urls(db_path: Path, expected_urls: List[str]) -> Tuple[bool, str]:
-    """Verify all expected URLs exist in snapshots."""
+    """Verify ALL expected URLs exist in snapshots."""
     conn = sqlite3.connect(str(db_path))
     cursor = conn.cursor()
     cursor.execute("SELECT url FROM core_snapshot")
@@ -1113,7 +1081,7 @@ def verify_snapshot_urls(db_path: Path, expected_urls: List[str]) -> Tuple[bool
 
 
 def verify_snapshot_titles(db_path: Path, expected_titles: Dict[str, str]) -> Tuple[bool, str]:
-    """Verify snapshot titles are preserved."""
+    """Verify ALL snapshot titles are preserved."""
     conn = sqlite3.connect(str(db_path))
     cursor = conn.cursor()
     cursor.execute("SELECT url, title FROM core_snapshot")
@@ -1122,7 +1090,9 @@ def verify_snapshot_titles(db_path: Path, expected_titles: Dict[str, str]) -> T
 
     mismatches = []
     for url, expected_title in expected_titles.items():
-        if url in actual and actual[url] != expected_title:
+        if url not in actual:
+            mismatches.append(f"{url}: missing from database")
+        elif actual[url] != expected_title:
             mismatches.append(f"{url}: expected '{expected_title}', got '{actual[url]}'")
 
     if not mismatches:
@@ -1149,850 +1119,28 @@ def verify_foreign_keys(db_path: Path) -> Tuple[bool, str]:
     return False, f"Found {orphaned_results} orphaned ArchiveResults"
 
 
-# =============================================================================
-# Test Classes
-# =============================================================================
-
-class TestFreshInstall(unittest.TestCase):
-    """Test that fresh installs work correctly."""
-
-    def test_init_creates_database(self):
-        """Fresh init should create database and directories."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0, f"Init failed: {result.stderr}")
-
-            # Verify database was created
-            self.assertTrue((work_dir / 'index.sqlite3').exists(), "Database not created")
-            # Verify archive directory exists
-            self.assertTrue((work_dir / 'archive').is_dir(), "Archive dir not created")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_status_after_init(self):
-        """Status command should work after init."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            result = run_archivebox(work_dir, ['status'])
-            self.assertEqual(result.returncode, 0, f"Status failed: {result.stderr}")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_add_url_after_init(self):
-        """Should be able to add URLs after init with --index-only (fast)."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            # Add a URL with --index-only for speed
-            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com'])
-            self.assertIn(result.returncode, [0, 1],
-                          f"Add command crashed: {result.stderr}")
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-
-            # Verify a Crawl was created
-            cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
-            crawl_count = cursor.fetchone()[0]
-            self.assertGreaterEqual(crawl_count, 1, "No Crawl was created")
-
-            # Verify at least one snapshot was created
-            cursor.execute("SELECT COUNT(*) FROM core_snapshot")
-            snapshot_count = cursor.fetchone()[0]
-            self.assertGreaterEqual(snapshot_count, 1, "No Snapshot was created")
-
-            conn.close()
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_list_after_add(self):
-        """List command should show added snapshots."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com'])
-            self.assertIn(result.returncode, [0, 1])
-
-            result = run_archivebox(work_dir, ['list'])
-            self.assertEqual(result.returncode, 0, f"List failed: {result.stderr}")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_migrations_table_populated(self):
-        """Django migrations table should be populated after init."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-            cursor.execute("SELECT COUNT(*) FROM django_migrations")
-            count = cursor.fetchone()[0]
-            conn.close()
-
-            # Should have many migrations applied
-            self.assertGreater(count, 10, f"Expected >10 migrations, got {count}")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_core_migrations_applied(self):
-        """Core app migrations should be applied."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-            cursor.execute("SELECT name FROM django_migrations WHERE app='core' ORDER BY name")
-            migrations = [row[0] for row in cursor.fetchall()]
-            conn.close()
-
-            self.assertIn('0001_initial', migrations)
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-
-class TestSchemaIntegrity(unittest.TestCase):
-    """Test that the database schema is correct."""
-
-    def test_snapshot_table_has_required_columns(self):
-        """Snapshot table should have all required columns."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-            cursor.execute('PRAGMA table_info(core_snapshot)')
-            columns = {row[1] for row in cursor.fetchall()}
-            conn.close()
-
-            required = {'id', 'url', 'timestamp', 'title', 'status', 'created_at', 'modified_at'}
-            for col in required:
-                self.assertIn(col, columns, f"Missing column: {col}")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_archiveresult_table_has_required_columns(self):
-        """ArchiveResult table should have all required columns."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-            cursor.execute('PRAGMA table_info(core_archiveresult)')
-            columns = {row[1] for row in cursor.fetchall()}
-            conn.close()
-
-            required = {'id', 'snapshot_id', 'extractor', 'status', 'created_at', 'modified_at'}
-            for col in required:
-                self.assertIn(col, columns, f"Missing column: {col}")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-    def test_tag_table_has_required_columns(self):
-        """Tag table should have all required columns."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-            cursor.execute('PRAGMA table_info(core_tag)')
-            columns = {row[1] for row in cursor.fetchall()}
-            conn.close()
-
-            required = {'id', 'name', 'slug'}
-            for col in required:
-                self.assertIn(col, columns, f"Missing column: {col}")
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-
-class TestMultipleSnapshots(unittest.TestCase):
-    """Test handling multiple snapshots."""
-
-    def test_add_multiple_urls(self):
-        """Should be able to add multiple URLs with --index-only."""
-        work_dir = Path(tempfile.mkdtemp())
-
-
-        try:
-            result = run_archivebox(work_dir, ['init'])
-            self.assertEqual(result.returncode, 0)
-
-            # Add multiple URLs with --index-only for speed
-            result = run_archivebox(work_dir, ['add', '--index-only', 'https://example.com', 'https://example.org'])
-            self.assertIn(result.returncode, [0, 1])
-
-            conn = sqlite3.connect(str(work_dir / 'index.sqlite3'))
-            cursor = conn.cursor()
-
-            # Verify a Crawl was created
-            cursor.execute("SELECT COUNT(*) FROM crawls_crawl")
-            crawl_count = cursor.fetchone()[0]
-            self.assertGreaterEqual(crawl_count, 1, f"Expected >=1 Crawl, got {crawl_count}")
-
-            conn.close()
-
-        finally:
-            shutil.rmtree(work_dir, ignore_errors=True)
-
-
-class TestMigrationFrom07x(unittest.TestCase):
-    """Test migration from 0.7.x schema to latest."""
-
-    def setUp(self):
-        """Create a temporary directory with 0.7.x schema and data."""
-        self.work_dir = Path(tempfile.mkdtemp())
-        self.db_path = self.work_dir / 'index.sqlite3'
-
-        # Create directory structure
-        create_data_dir_structure(self.work_dir)
-
-        # Create database with 0.7.x schema
-        conn = sqlite3.connect(str(self.db_path))
-        conn.executescript(SCHEMA_0_7)
-        conn.close()
-
-        # Seed with test data
-        self.original_data = seed_0_7_data(self.db_path)
-
-        # Change ownership to testuser so archivebox can write to it
-
-
-    def tearDown(self):
-        """Clean up temporary directory."""
-        shutil.rmtree(self.work_dir, ignore_errors=True)
-
-    def test_migration_preserves_snapshot_count(self):
-        """Migration should preserve all snapshots."""
-        expected_count = len(self.original_data['snapshots'])
-
-        # Run init to trigger migrations
-        result = run_archivebox(self.work_dir, ['init'], timeout=45)
-
-        # Check return code - may be 1 if some migrations have issues, but data should be preserved
-        self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}")
-
-        # Verify snapshot count
-        ok, msg = verify_snapshot_count(self.db_path, expected_count)
-        self.assertTrue(ok, msg)
-
-    def test_migration_preserves_snapshot_urls(self):
-        """Migration should preserve all snapshot URLs."""
-
expected_urls = [s['url'] for s in self.original_data['snapshots']] - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_urls(self.db_path, expected_urls) - self.assertTrue(ok, msg) - - def test_migration_preserves_snapshot_titles(self): - """Migration should preserve all snapshot titles.""" - expected_titles = {s['url']: s['title'] for s in self.original_data['snapshots']} - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_titles(self.db_path, expected_titles) - self.assertTrue(ok, msg) - - def test_migration_preserves_tags(self): - """Migration should preserve all tags.""" - expected_count = len(self.original_data['tags']) - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_tag_count(self.db_path, expected_count) - self.assertTrue(ok, msg) - - def test_migration_preserves_archiveresults(self): - """Migration should preserve all archive results.""" - expected_count = len(self.original_data['archiveresults']) - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_archiveresult_count(self.db_path, expected_count) - self.assertTrue(ok, msg) - - def test_migration_preserves_foreign_keys(self): - """Migration should maintain foreign key relationships.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_foreign_keys(self.db_path) - self.assertTrue(ok, msg) - - def test_status_works_after_migration(self): - """Status command should work after migration.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['status']) - self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}") - - def test_search_works_after_migration(self): - """Search command should find migrated snapshots.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['search']) - self.assertEqual(result.returncode, 0, f"Search failed after migration: {result.stderr}") - - # Should find at least some of the migrated URLs - output = result.stdout + result.stderr - found_any = any(s['url'][:30] in output or s['title'] in output - for s in self.original_data['snapshots']) - self.assertTrue(found_any, f"No migrated snapshots found in search: {output[:500]}") - - def test_list_works_after_migration(self): - """List command should work and show migrated data.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['list']) - self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}") - - # Should find at least some of the migrated URLs - output = result.stdout + result.stderr - found_any = any(s['url'][:30] in output or (s['title'] and s['title'] in output) - for s in self.original_data['snapshots']) - self.assertTrue(found_any, f"No migrated snapshots found in list: {output[:500]}") - - def 
test_new_schema_elements_created_after_migration(self): - """Migration should create new 0.9.x schema elements (crawls_crawl, etc.).""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - # Check that new tables exist - cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") - tables = {row[0] for row in cursor.fetchall()} - conn.close() - - # 0.9.x should have crawls_crawl table - self.assertIn('crawls_crawl', tables, "crawls_crawl table not created during migration") - - def test_snapshots_have_new_fields_after_migration(self): - """Migrated snapshots should have new 0.9.x fields (status, depth, etc.).""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - # Check snapshot table has new columns - cursor.execute('PRAGMA table_info(core_snapshot)') - columns = {row[1] for row in cursor.fetchall()} - conn.close() - - # 0.9.x snapshots should have status, depth, created_at, modified_at - required_new_columns = {'status', 'depth', 'created_at', 'modified_at'} - for col in required_new_columns: - self.assertIn(col, columns, f"Snapshot missing new column: {col}") - - def test_add_works_after_migration(self): - """Adding new URLs should work after migration from 0.7.x.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - # Verify that init created the crawls_crawl table before proceeding - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='crawls_crawl'") - table_exists = cursor.fetchone() is not None - conn.close() - self.assertTrue(table_exists, f"Init failed to create crawls_crawl table. Init stderr: {result.stderr[-500:]}") - - # Try to add a new URL after migration (use --index-only for speed) - result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Add crashed after migration: {result.stderr}") - - # Verify a Crawl was created for the new URL - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*) FROM crawls_crawl") - crawl_count = cursor.fetchone()[0] - conn.close() - - self.assertGreaterEqual(crawl_count, 1, f"No Crawl created when adding URL. 
Add stderr: {result.stderr[-500:]}") - - def test_archiveresult_status_preserved_after_migration(self): - """Migration should preserve archive result status values.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - # Get status counts - cursor.execute("SELECT status, COUNT(*) FROM core_archiveresult GROUP BY status") - status_counts = dict(cursor.fetchall()) - conn.close() - - # Original data has known status distribution: succeeded, failed, skipped - self.assertIn('succeeded', status_counts, "Should have succeeded results") - self.assertIn('failed', status_counts, "Should have failed results") - self.assertIn('skipped', status_counts, "Should have skipped results") - - def test_version_works_after_migration(self): - """Version command should work after migration.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['version']) - # Exit code might be 1 if some binaries are missing, but should not crash - self.assertIn(result.returncode, [0, 1], f"Version crashed after migration: {result.stderr}") - - # Should show version info - output = result.stdout + result.stderr - self.assertTrue('ArchiveBox' in output or 'version' in output.lower(), - f"Version output missing expected content: {output[:500]}") - - def test_help_works_after_migration(self): - """Help command should work after migration.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['help']) - self.assertEqual(result.returncode, 0, f"Help crashed after migration: {result.stderr}") - - # Should show available commands - output = result.stdout + result.stderr - self.assertTrue('add' in output.lower() or 'status' in output.lower(), - f"Help output missing expected commands: {output[:500]}") - - -class TestMigrationFrom04x(unittest.TestCase): - """Test migration from 0.4.x schema to latest. 
- - 0.4.x was the first Django-powered version with a simpler schema: - - No Tag model (tags stored as comma-separated string in Snapshot) - - No ArchiveResult model (results stored in JSON files) - """ - - def setUp(self): - """Create a temporary directory with 0.4.x schema and data.""" - self.work_dir = Path(tempfile.mkdtemp()) - self.db_path = self.work_dir / 'index.sqlite3' - - # Create directory structure - create_data_dir_structure(self.work_dir) - - # Create database with 0.4.x schema - conn = sqlite3.connect(str(self.db_path)) - conn.executescript(SCHEMA_0_4) - conn.close() - - # Seed with test data - self.original_data = seed_0_4_data(self.db_path) - - # Change ownership to testuser so archivebox can write to it - - - def tearDown(self): - """Clean up temporary directory.""" - shutil.rmtree(self.work_dir, ignore_errors=True) - - def test_migration_preserves_snapshot_count(self): - """Migration should preserve all snapshots from 0.4.x.""" - expected_count = len(self.original_data['snapshots']) - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_count(self.db_path, expected_count) - self.assertTrue(ok, msg) - - def test_migration_preserves_snapshot_urls(self): - """Migration should preserve all snapshot URLs from 0.4.x.""" - expected_urls = [s['url'] for s in self.original_data['snapshots']] - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_urls(self.db_path, expected_urls) - self.assertTrue(ok, msg) - - def test_migration_converts_string_tags_to_model(self): - """Migration should convert comma-separated tags to Tag model instances.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - # Collect unique tags from original data - original_tags = set() - for tags_str in self.original_data['tags_str']: - if tags_str: - for tag in tags_str.split(','): - original_tags.add(tag.strip()) - - # Tags should have been created - ok, msg = verify_tag_count(self.db_path, len(original_tags)) - self.assertTrue(ok, msg) - - -class TestMigrationFrom08x(unittest.TestCase): - """Test migration from 0.8.x schema to latest. - - 0.8.x introduced: - - Crawl model for grouping URLs - - UUID primary keys for Snapshot - - Status fields for state machine - - New fields like depth, retry_at, etc. 
- """ - - def setUp(self): - """Create a temporary directory with 0.8.x schema and data.""" - self.work_dir = Path(tempfile.mkdtemp()) - self.db_path = self.work_dir / 'index.sqlite3' - - # Create directory structure - create_data_dir_structure(self.work_dir) - - # Create database with 0.8.x schema - conn = sqlite3.connect(str(self.db_path)) - conn.executescript(SCHEMA_0_8) - conn.close() - - # Seed with test data - self.original_data = seed_0_8_data(self.db_path) - - # Change ownership to testuser so archivebox can write to it - - - def tearDown(self): - """Clean up temporary directory.""" - shutil.rmtree(self.work_dir, ignore_errors=True) - - def test_migration_preserves_snapshot_count(self): - """Migration should preserve all snapshots from 0.8.x.""" - expected_count = len(self.original_data['snapshots']) - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_count(self.db_path, expected_count) - self.assertTrue(ok, msg) - - def test_migration_preserves_snapshot_urls(self): - """Migration should preserve all snapshot URLs from 0.8.x.""" - expected_urls = [s['url'] for s in self.original_data['snapshots']] - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_urls(self.db_path, expected_urls) - self.assertTrue(ok, msg) - - def test_migration_preserves_crawls(self): - """Migration should preserve all Crawl records.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*) FROM crawls_crawl") - count = cursor.fetchone()[0] - conn.close() - - expected_count = len(self.original_data['crawls']) - self.assertEqual(count, expected_count, f"Crawl count mismatch: expected {expected_count}, got {count}") - - def test_migration_preserves_snapshot_crawl_links(self): - """Migration should preserve snapshot-to-crawl relationships.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - # Check each snapshot still has its crawl_id - for snapshot in self.original_data['snapshots']: - if snapshot['crawl_id']: - cursor.execute("SELECT crawl_id FROM core_snapshot WHERE url = ?", (snapshot['url'],)) - row = cursor.fetchone() - self.assertIsNotNone(row, f"Snapshot {snapshot['url']} not found after migration") - self.assertEqual(row[0], snapshot['crawl_id'], - f"Crawl ID mismatch for {snapshot['url']}: expected {snapshot['crawl_id']}, got {row[0]}") - - conn.close() - - def test_migration_preserves_tags(self): - """Migration should preserve all tags.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_tag_count(self.db_path, len(self.original_data['tags'])) - self.assertTrue(ok, msg) - - def test_migration_preserves_archiveresults(self): - """Migration should preserve all archive results.""" - expected_count = len(self.original_data['archiveresults']) - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = 
verify_archiveresult_count(self.db_path, expected_count) - self.assertTrue(ok, msg) - - def test_migration_preserves_archiveresult_status(self): - """Migration should preserve archive result status values.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - - # Get status counts - cursor.execute("SELECT status, COUNT(*) FROM core_archiveresult GROUP BY status") - status_counts = dict(cursor.fetchall()) - conn.close() - - # Original data has known status distribution: succeeded, failed, skipped - self.assertIn('succeeded', status_counts, "Should have succeeded results") - self.assertIn('failed', status_counts, "Should have failed results") - self.assertIn('skipped', status_counts, "Should have skipped results") - - def test_status_works_after_migration(self): - """Status command should work after migration.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['status']) - self.assertEqual(result.returncode, 0, f"Status failed after migration: {result.stderr}") - - def test_list_works_after_migration(self): - """List command should work and show migrated data.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['list']) - self.assertEqual(result.returncode, 0, f"List failed after migration: {result.stderr}") - - # Should find at least some of the migrated URLs - output = result.stdout + result.stderr - found_any = any(s['url'][:30] in output or (s['title'] and s['title'] in output) - for s in self.original_data['snapshots']) - self.assertTrue(found_any, f"No migrated snapshots found in list: {output[:500]}") - - def test_search_works_after_migration(self): - """Search command should find migrated snapshots.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['search']) - self.assertEqual(result.returncode, 0, f"Search failed after migration: {result.stderr}") - - # Should find at least some of the migrated URLs - output = result.stdout + result.stderr - found_any = any(s['url'][:30] in output or (s['title'] and s['title'] in output) - for s in self.original_data['snapshots']) - self.assertTrue(found_any, f"No migrated snapshots found in search: {output[:500]}") - - def test_migration_preserves_snapshot_titles(self): - """Migration should preserve all snapshot titles.""" - expected_titles = {s['url']: s['title'] for s in self.original_data['snapshots']} - - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_snapshot_titles(self.db_path, expected_titles) - self.assertTrue(ok, msg) - - def test_migration_preserves_foreign_keys(self): - """Migration should maintain foreign key relationships.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - ok, msg = verify_foreign_keys(self.db_path) - self.assertTrue(ok, msg) - - def test_add_works_after_migration(self): - """Adding new URLs should work after migration from 0.8.x.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - # Check that init actually ran and applied migrations - 
self.assertIn('Applying', result.stdout + result.stderr, - f"Init did not apply migrations. stdout: {result.stdout[:500]}, stderr: {result.stderr[:500]}") - self.assertIn(result.returncode, [0, 1], f"Init crashed: {result.stderr}") - - # Check that seed_id column was removed by migration - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - cursor.execute("PRAGMA table_info(crawls_crawl)") - columns = [row[1] for row in cursor.fetchall()] - self.assertNotIn('seed_id', columns, - f"seed_id column should have been removed by migration. Columns: {columns}") - - # Count existing crawls - cursor.execute("SELECT COUNT(*) FROM crawls_crawl") - initial_crawl_count = cursor.fetchone()[0] - conn.close() - - # Try to add a new URL after migration (use --index-only for speed) - result = run_archivebox(self.work_dir, ['add', '--index-only', 'https://example.com/new-page'], timeout=45) - self.assertIn(result.returncode, [0, 1], f"Add crashed after migration: {result.stderr}") - - # Verify a new Crawl was created - conn = sqlite3.connect(str(self.db_path)) - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*) FROM crawls_crawl") - new_crawl_count = cursor.fetchone()[0] - conn.close() - - self.assertGreater(new_crawl_count, initial_crawl_count, - f"No new Crawl created when adding URL. Add stderr: {result.stderr[-500:]}") - - def test_version_works_after_migration(self): - """Version command should work after migration.""" - result = run_archivebox(self.work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - result = run_archivebox(self.work_dir, ['version']) - # Exit code might be 1 if some binaries are missing, but should not crash - self.assertIn(result.returncode, [0, 1], f"Version crashed after migration: {result.stderr}") - - # Should show version info - output = result.stdout + result.stderr - self.assertTrue('ArchiveBox' in output or 'version' in output.lower(), - f"Version output missing expected content: {output[:500]}") - - -class TestMigrationDataIntegrity(unittest.TestCase): - """Comprehensive data integrity tests for migrations.""" - - def test_no_duplicate_snapshots_after_migration(self): - """Migration should not create duplicate snapshots.""" - work_dir = Path(tempfile.mkdtemp()) - db_path = work_dir / 'index.sqlite3' - - try: - create_data_dir_structure(work_dir) - conn = sqlite3.connect(str(db_path)) - conn.executescript(SCHEMA_0_7) - conn.close() - seed_0_7_data(db_path) - - - result = run_archivebox(work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - # Check for duplicate URLs - conn = sqlite3.connect(str(db_path)) - cursor = conn.cursor() - cursor.execute(""" - SELECT url, COUNT(*) as cnt FROM core_snapshot - GROUP BY url HAVING cnt > 1 - """) - duplicates = cursor.fetchall() - conn.close() - - self.assertEqual(len(duplicates), 0, f"Found duplicate URLs: {duplicates}") - - finally: - shutil.rmtree(work_dir, ignore_errors=True) - - def test_no_orphaned_archiveresults_after_migration(self): - """Migration should not leave orphaned ArchiveResults.""" - work_dir = Path(tempfile.mkdtemp()) - db_path = work_dir / 'index.sqlite3' - - try: - create_data_dir_structure(work_dir) - conn = sqlite3.connect(str(db_path)) - conn.executescript(SCHEMA_0_7) - conn.close() - seed_0_7_data(db_path) - - - result = run_archivebox(work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - ok, msg = verify_foreign_keys(db_path) - self.assertTrue(ok, msg) - - finally: - shutil.rmtree(work_dir, ignore_errors=True) - - 
def test_timestamps_preserved_after_migration(self): - """Migration should preserve original timestamps.""" - work_dir = Path(tempfile.mkdtemp()) - db_path = work_dir / 'index.sqlite3' - - try: - create_data_dir_structure(work_dir) - conn = sqlite3.connect(str(db_path)) - conn.executescript(SCHEMA_0_7) - conn.close() - original_data = seed_0_7_data(db_path) - - - original_timestamps = {s['url']: s['timestamp'] for s in original_data['snapshots']} - - result = run_archivebox(work_dir, ['init'], timeout=45) - self.assertIn(result.returncode, [0, 1]) - - conn = sqlite3.connect(str(db_path)) - cursor = conn.cursor() - cursor.execute("SELECT url, timestamp FROM core_snapshot") - migrated_timestamps = {row[0]: row[1] for row in cursor.fetchall()} - conn.close() - - for url, original_ts in original_timestamps.items(): - self.assertEqual( - migrated_timestamps.get(url), original_ts, - f"Timestamp changed for {url}: {original_ts} -> {migrated_timestamps.get(url)}" - ) - - finally: - shutil.rmtree(work_dir, ignore_errors=True) - - -if __name__ == '__main__': - unittest.main() +def verify_all_snapshots_in_output(output: str, snapshots: List[Dict]) -> Tuple[bool, str]: + """Verify ALL snapshots appear in command output (not just one).""" + missing = [] + for snapshot in snapshots: + url_fragment = snapshot['url'][:30] + title = snapshot.get('title', '') + if url_fragment not in output and (not title or title not in output): + missing.append(snapshot['url']) + + if not missing: + return True, "All snapshots found in output" + return False, f"Missing snapshots in output: {missing}" + + +def verify_crawl_count(db_path: Path, expected: int) -> Tuple[bool, str]: + """Verify the number of crawls in the database.""" + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM crawls_crawl") + count = cursor.fetchone()[0] + conn.close() + + if count == expected: + return True, f"Crawl count OK: {count}" + return False, f"Crawl count mismatch: expected {expected}, got {count}"
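Reviewer note: the two helpers added above are meant to replace the weaker any()-based output checks deleted by this patch, keeping call sites to a single assertion each. A usage sketch (illustrative only; the test class below is not part of this diff, and `self.work_dir`, `self.db_path`, and `self.original_data` are assumed to come from a setUp() like the ones in the new test files):

    import unittest

    from .test_migrations_helpers import (
        run_archivebox,
        verify_all_snapshots_in_output,
        verify_crawl_count,
    )

    class TestOutputCompleteness(unittest.TestCase):
        """Hypothetical example, not part of this patch."""

        def test_list_shows_all_migrated_snapshots(self):
            result = run_archivebox(self.work_dir, ['list'])
            self.assertEqual(result.returncode, 0, f"List failed: {result.stderr}")

            # Unlike the old found_any pattern, a failure here names every
            # snapshot missing from the output, not just the first miss.
            ok, msg = verify_all_snapshots_in_output(
                result.stdout + result.stderr,
                self.original_data['snapshots'],
            )
            self.assertTrue(ok, msg)

        def test_crawl_count_matches_seed_data(self):
            ok, msg = verify_crawl_count(self.db_path, len(self.original_data['crawls']))
            self.assertTrue(ok, msg)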
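Relatedly, `verify_tag_count` now demands an exact count instead of `>=`, so callers must deduplicate the expectation themselves. For 0.4.x data that means splitting and de-duplicating the comma-separated tag strings, exactly as `test_migration_converts_string_tags_to_model` does in the new test file. A standalone sketch of that derivation (the fixture values here are illustrative, not the actual seed data):

    def expected_tag_count(tags_strs):
        """Count unique stripped tags across 0.4.x comma-separated strings."""
        unique_tags = set()
        for tags_str in tags_strs:
            if tags_str:
                for tag in tags_str.split(','):
                    unique_tags.add(tag.strip())
        return len(unique_tags)

    # 'tech' appears twice and '' contributes nothing -> 3 unique tags
    assert expected_tag_count(['news,tech', 'tech, politics', '']) == 3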
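For readers without the helpers module at hand: judging from its call sites, `run_archivebox` wraps `subprocess.run` and returns a CompletedProcess-like object exposing `returncode`, `stdout`, and `stderr`. A plausible minimal implementation, assuming the CLI is invoked as an `archivebox` binary on PATH (the real helper in test_migrations_helpers.py may differ, e.g. in how it locates the CLI or prepares the environment):

    import subprocess
    from pathlib import Path

    def run_archivebox(work_dir: Path, args: list, timeout: int = 60) -> subprocess.CompletedProcess:
        """Run the archivebox CLI inside work_dir, capturing text output (sketch only)."""
        return subprocess.run(
            ['archivebox', *args],   # assumption: binary is on PATH
            cwd=str(work_dir),       # commands operate on the data dir they run in
            capture_output=True,
            text=True,
            timeout=timeout,         # default of 60s is an assumption
        )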