Mirror of https://github.com/ArchiveBox/ArchiveBox.git (synced 2026-01-05 10:26:03 +10:00)
more migration fixes
@@ -38,6 +38,14 @@ def update(filter_patterns: Iterable[str] = (),
     from archivebox.core.models import Snapshot
     from django.utils import timezone
+    from django.core.management import call_command
 
+    # Run migrations first to ensure DB schema is up-to-date
+    print('[*] Checking for pending migrations...')
+    try:
+        call_command('migrate', '--no-input', verbosity=0)
+    except Exception as e:
+        print(f'[!] Warning: Migration check failed: {e}')
+
     while True:
         if filter_patterns or before or after:
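Note: the defensive `migrate` call added above can be exercised on its own. A minimal sketch, assuming `DJANGO_SETTINGS_MODULE` already points at a configured project (the setup lines are an assumption, not part of this commit):

    import django
    django.setup()  # requires DJANGO_SETTINGS_MODULE in the environment

    from django.core.management import call_command

    # Apply any pending migrations non-interactively; failures are downgraded
    # to a warning so the surrounding command can still proceed.
    try:
        call_command('migrate', '--no-input', verbosity=0)
    except Exception as e:
        print(f'[!] Warning: Migration check failed: {e}')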
@@ -136,9 +144,17 @@ def drain_old_archive_dirs(resume_from: str = None, batch_size: int = 100) -> dict:
 
         # Check if needs migration (0.8.x → 0.9.x)
         if snapshot.fs_migration_needed:
-            snapshot.save()  # Triggers migration + creates symlink
-            stats['migrated'] += 1
-            print(f"  [{stats['processed']}] Migrated: {entry_path.name}")
+            try:
+                snapshot.save()  # Triggers migration + creates symlink
+                stats['migrated'] += 1
+                print(f"  [{stats['processed']}] Migrated: {entry_path.name}")
+            except Exception as e:
+                # Snapshot already exists in DB with different crawl - skip it
+                if 'UNIQUE constraint failed' in str(e):
+                    stats['skipped'] += 1
+                    print(f"  [{stats['processed']}] Skipped (already in DB): {entry_path.name}")
+                else:
+                    raise
         else:
             stats['skipped'] += 1
 
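Note: matching `'UNIQUE constraint failed'` against the exception text is SQLite-specific wording. A small self-contained sketch of the same skip-on-duplicate idea using the stdlib `sqlite3` module (table and values are illustrative, not from ArchiveBox):

    import sqlite3

    con = sqlite3.connect(':memory:')
    con.execute('CREATE TABLE snapshot (url TEXT PRIMARY KEY)')
    con.execute("INSERT INTO snapshot VALUES ('https://example.com')")

    try:
        con.execute("INSERT INTO snapshot VALUES ('https://example.com')")
    except sqlite3.IntegrityError as e:
        # SQLite phrases duplicate-key errors as "UNIQUE constraint failed: <table>.<column>"
        if 'UNIQUE constraint failed' in str(e):
            print('skipped duplicate')
        else:
            raise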
@@ -170,19 +186,33 @@ def process_all_db_snapshots(batch_size: int = 100) -> dict:
     print(f'[*] Processing {total} snapshots from database (most recent first)...')
 
-    # Process from most recent to least recent
-    for snapshot in Snapshot.objects.order_by('-bookmarked_at').iterator(chunk_size=batch_size):
-        # Reconcile index.json with DB
-        snapshot.reconcile_with_index_json()
-
-        # Queue for archiving (state machine will handle it)
-        snapshot.status = Snapshot.StatusChoices.QUEUED
-        snapshot.retry_at = timezone.now()
-        snapshot.save()
-
-        stats['reconciled'] += 1
-        stats['queued'] += 1
+    for snapshot in Snapshot.objects.select_related('crawl').order_by('-bookmarked_at').iterator(chunk_size=batch_size):
+        stats['processed'] += 1
+
+        # Skip snapshots with missing crawl references (orphaned by migration errors)
+        if not snapshot.crawl_id:
+            continue
+
+        try:
+            # Reconcile index.json with DB
+            snapshot.reconcile_with_index_json()
+
+            # Clean up invalid field values from old migrations
+            if not isinstance(snapshot.current_step, int):
+                snapshot.current_step = 0
+
+            # Queue for archiving (state machine will handle it)
+            snapshot.status = Snapshot.StatusChoices.QUEUED
+            snapshot.retry_at = timezone.now()
+            snapshot.save()
+
+            stats['reconciled'] += 1
+            stats['queued'] += 1
+        except Exception as e:
+            # Skip snapshots that can't be processed (e.g., missing crawl)
+            print(f"  [!] Skipping snapshot {snapshot.id}: {e}")
+            continue
 
         if stats['processed'] % batch_size == 0:
             transaction.commit()
             print(f"  [{stats['processed']}/{total}] Processed...")
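Note: the switch to `.select_related('crawl')` plus `.iterator(chunk_size=...)` is the standard Django recipe for scanning a large table without either N+1 queries or loading every row into memory at once. A hedged sketch of the access pattern (model and field names assumed from the diff above):

    # select_related('crawl') fetches the related crawl row in the same JOIN,
    # so touching snapshot.crawl later costs no extra query per row.
    qs = Snapshot.objects.select_related('crawl').order_by('-bookmarked_at')

    for snapshot in qs.iterator(chunk_size=100):
        # iterator() streams rows in chunks and skips the queryset cache,
        # keeping memory flat even on very large tables.
        if not snapshot.crawl_id:  # checking the raw FK id avoids loading the relation
            continue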
@@ -219,19 +249,33 @@ def process_filtered_snapshots(
     total = snapshots.count()
     print(f'[*] Found {total} matching snapshots')
 
-    for snapshot in snapshots.iterator(chunk_size=batch_size):
-        # Reconcile index.json with DB
-        snapshot.reconcile_with_index_json()
-
-        # Queue for archiving
-        snapshot.status = Snapshot.StatusChoices.QUEUED
-        snapshot.retry_at = timezone.now()
-        snapshot.save()
-
-        stats['reconciled'] += 1
-        stats['queued'] += 1
+    for snapshot in snapshots.select_related('crawl').iterator(chunk_size=batch_size):
+        stats['processed'] += 1
+
+        # Skip snapshots with missing crawl references
+        if not snapshot.crawl_id:
+            continue
+
+        try:
+            # Reconcile index.json with DB
+            snapshot.reconcile_with_index_json()
+
+            # Clean up invalid field values from old migrations
+            if not isinstance(snapshot.current_step, int):
+                snapshot.current_step = 0
+
+            # Queue for archiving
+            snapshot.status = Snapshot.StatusChoices.QUEUED
+            snapshot.retry_at = timezone.now()
+            snapshot.save()
+
+            stats['reconciled'] += 1
+            stats['queued'] += 1
+        except Exception as e:
+            # Skip snapshots that can't be processed
+            print(f"  [!] Skipping snapshot {snapshot.id}: {e}")
+            continue
 
         if stats['processed'] % batch_size == 0:
             transaction.commit()
             print(f"  [{stats['processed']}/{total}] Processed...")
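Note: the bare `transaction.commit()` inside both loops only works when autocommit is disabled. A minimal sketch of the batched-commit idea using Django's transaction API (the chunk size of 100 is an arbitrary choice here, not taken from the commit):

    from django.db import transaction

    transaction.set_autocommit(False)  # take manual control of transactions
    try:
        for i, snapshot in enumerate(qs.iterator(chunk_size=100), start=1):
            snapshot.save()
            if i % 100 == 0:
                transaction.commit()  # flush one batch of writes at once
        transaction.commit()          # commit the final partial batch
    finally:
        transaction.set_autocommit(True)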
@@ -143,50 +143,50 @@ def upgrade_from_v072_or_v086(apps, schema_editor):
             url TEXT NOT NULL,
             timestamp TEXT NOT NULL,
             tags TEXT,
             title TEXT,
 
-            crawl_id TEXT NOT NULL,
+            crawl_id TEXT,
             depth INTEGER NOT NULL DEFAULT 0,
             parent_snapshot_id TEXT,
 
             status VARCHAR(15) NOT NULL DEFAULT 'queued',
             retry_at DATETIME,
-            current_step VARCHAR(50) NOT NULL DEFAULT '',
+            current_step INTEGER NOT NULL DEFAULT 0,
 
             fs_version VARCHAR(10) NOT NULL DEFAULT '0.9.0',
             config TEXT,
             notes TEXT NOT NULL DEFAULT '',
             num_uses_succeeded INTEGER NOT NULL DEFAULT 0,
-            num_uses_failed INTEGER NOT NULL DEFAULT 0,
-
-            FOREIGN KEY (crawl_id) REFERENCES crawls_crawl(id) ON DELETE CASCADE,
-            FOREIGN KEY (parent_snapshot_id) REFERENCES core_snapshot(id) ON DELETE SET NULL
+            num_uses_failed INTEGER NOT NULL DEFAULT 0
+
+            -- Note: crawl_id foreign key will be added in 0024 after assigning crawl_ids
+            -- FOREIGN KEY (crawl_id) REFERENCES crawls_crawl(id) ON DELETE CASCADE,
+            -- FOREIGN KEY (parent_snapshot_id) REFERENCES core_snapshot(id) ON DELETE SET NULL
         )
     """)
 
     # Copy snapshot data
     if has_crawl_id:
-        # v0.8.6rc0 schema
+        # v0.8.6rc0 schema - already has created_at, modified_at, bookmarked_at
         cursor.execute("""
             INSERT OR IGNORE INTO core_snapshot_new (
-                id, created_at, modified_at, bookmarked_at, url, timestamp,
-                crawl_id, depth, status, retry_at, config
+                id, created_at, modified_at, bookmarked_at, downloaded_at, url, timestamp,
+                crawl_id, status, retry_at
             )
             SELECT
                 id,
-                COALESCE(added, CURRENT_TIMESTAMP),
-                COALESCE(updated, added, CURRENT_TIMESTAMP),
-                COALESCE(added, CURRENT_TIMESTAMP),
+                created_at,
+                modified_at,
+                bookmarked_at,
+                downloaded_at,
                 url, timestamp,
-                crawl_id, COALESCE(depth, 0),
+                NULLIF(crawl_id, ''),
                 COALESCE(status, 'queued'),
-                retry_at,
-                config
+                retry_at
             FROM core_snapshot
         """)
     else:
-        # v0.7.2 schema - will get crawl_id assigned by later migration
+        # v0.7.2 schema - will get crawl_id assigned by later migration (0024)
        cursor.execute("""
            INSERT OR IGNORE INTO core_snapshot_new (
                id, created_at, modified_at, bookmarked_at, url, timestamp, crawl_id
@@ -197,7 +197,7 @@ def upgrade_from_v072_or_v086(apps, schema_editor):
             COALESCE(updated, added, CURRENT_TIMESTAMP),
             COALESCE(added, CURRENT_TIMESTAMP),
             url, timestamp,
-            '' as crawl_id
+            NULL as crawl_id
             FROM core_snapshot
         """)
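Note: the `NULLIF(crawl_id, '')` and `NULL as crawl_id` changes matter because the old schema stored unassigned crawls as empty strings, which would become dangling references once the real foreign key is added in 0024; `NULLIF` converts them into genuine NULLs. A quick illustration in stdlib sqlite3:

    import sqlite3

    con = sqlite3.connect(':memory:')
    # NULLIF(x, y) returns NULL when x equals y, otherwise x, so legacy rows
    # that stored crawl_id = '' become real NULLs instead of bogus FK values.
    print(con.execute("SELECT NULLIF('', '')").fetchone())        # (None,)
    print(con.execute("SELECT NULLIF('abc123', '')").fetchone())  # ('abc123',)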
@@ -217,6 +217,13 @@ def upgrade_from_v072_or_v086(apps, schema_editor):
     # PART 3: Upgrade core_tag table
     # ============================================================================
 
+    # Check if tag id is INTEGER (v0.7.2) or TEXT (v0.8.6rc0)
+    cursor.execute("""
+        SELECT type FROM pragma_table_info('core_tag') WHERE name='id'
+    """)
+    tag_id_type = cursor.fetchone()[0] if cursor.rowcount else 'INTEGER'
+    tag_id_is_int = 'INT' in tag_id_type.upper()
+
     cursor.execute("""
         CREATE TABLE IF NOT EXISTS core_tag_new (
             id INTEGER PRIMARY KEY AUTOINCREMENT,
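Note: `pragma_table_info(...)` as a table-valued function requires SQLite 3.16+. Also, with the stdlib driver a cursor reports `rowcount == -1` for SELECT statements, so the `if cursor.rowcount` guard above may not behave the way it reads; testing the fetched row directly is the more reliable variant. A standalone sketch with a toy table:

    import sqlite3

    con = sqlite3.connect(':memory:')
    con.execute('CREATE TABLE core_tag (id TEXT PRIMARY KEY, name TEXT, slug TEXT)')

    cur = con.execute("SELECT type FROM pragma_table_info('core_tag') WHERE name='id'")
    row = cur.fetchone()
    tag_id_type = row[0] if row else 'INTEGER'  # row is None if no such column
    tag_id_is_int = 'INT' in tag_id_type.upper()
    print(tag_id_type, tag_id_is_int)  # TEXT False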
@@ -231,10 +238,26 @@ def upgrade_from_v072_or_v086(apps, schema_editor):
         )
     """)
 
-    cursor.execute("""
-        INSERT OR IGNORE INTO core_tag_new (id, name, slug)
-        SELECT id, name, slug FROM core_tag
-    """)
+    if tag_id_is_int:
+        # v0.7.2: Direct copy (INTEGER to INTEGER)
+        cursor.execute("""
+            INSERT OR IGNORE INTO core_tag_new (id, name, slug)
+            SELECT id, name, slug FROM core_tag
+        """)
+    else:
+        # v0.8.6rc0: Need to remap TEXT ids to new INTEGER ids
+        cursor.execute("SELECT id, name, slug FROM core_tag")
+        old_tags = cursor.fetchall()
+        tag_id_mapping = {}  # old_text_id -> new_int_id
+
+        for old_id, name, slug in old_tags:
+            cursor.execute("""
+                INSERT OR IGNORE INTO core_tag_new (name, slug)
+                VALUES (?, ?)
+            """, [name, slug])
+            cursor.execute("SELECT id FROM core_tag_new WHERE slug = ?", [slug])
+            new_id = cursor.fetchone()[0]
+            tag_id_mapping[old_id] = new_id
 
     cursor.execute("DROP TABLE IF EXISTS core_tag")
     cursor.execute("ALTER TABLE core_tag_new RENAME TO core_tag")
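Note: the remap branch re-SELECTs by `slug` instead of reading `cursor.lastrowid` because `lastrowid` is unreliable after an `INSERT OR IGNORE` that skipped a duplicate; looking the id up via the unique slug returns the surviving row either way. Condensed, assuming a stdlib sqlite3 cursor for illustration:

    tag_id_mapping = {}
    for old_id, name, slug in old_tags:  # rows read from the old TEXT-id table
        cursor.execute(
            "INSERT OR IGNORE INTO core_tag_new (name, slug) VALUES (?, ?)",
            [name, slug],
        )
        # Re-read by slug: correct whether the INSERT ran or was ignored.
        cursor.execute("SELECT id FROM core_tag_new WHERE slug = ?", [slug])
        tag_id_mapping[old_id] = cursor.fetchone()[0]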
@@ -251,10 +274,23 @@ def upgrade_from_v072_or_v086(apps, schema_editor):
         )
     """)
 
-    cursor.execute("""
-        INSERT OR IGNORE INTO core_snapshot_tags_new (snapshot_id, tag_id)
-        SELECT snapshot_id, tag_id FROM core_snapshot_tags
-    """)
+    if tag_id_is_int:
+        # Direct copy for v0.7.2
+        cursor.execute("""
+            INSERT OR IGNORE INTO core_snapshot_tags_new (snapshot_id, tag_id)
+            SELECT snapshot_id, tag_id FROM core_snapshot_tags
+        """)
+    else:
+        # v0.8.6rc0: Use mapping to convert old TEXT ids to new INTEGER ids
+        cursor.execute("SELECT snapshot_id, tag_id FROM core_snapshot_tags")
+        m2m_entries = cursor.fetchall()
+        for snapshot_id, old_tag_id in m2m_entries:
+            new_tag_id = tag_id_mapping.get(old_tag_id)
+            if new_tag_id:
+                cursor.execute("""
+                    INSERT OR IGNORE INTO core_snapshot_tags_new (snapshot_id, tag_id)
+                    VALUES (?, ?)
+                """, [snapshot_id, new_tag_id])
 
     cursor.execute("DROP TABLE IF EXISTS core_snapshot_tags")
     cursor.execute("ALTER TABLE core_snapshot_tags_new RENAME TO core_snapshot_tags")
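Note: any many-to-many row whose tag id is missing from `tag_id_mapping` is dropped rather than inserted with a dangling id; `dict.get` returning `None` makes that filter implicit. A tiny pure-Python illustration (ids are made up):

    tag_id_mapping = {'abc123': 1, 'def456': 2}
    m2m_entries = [('snap1', 'abc123'), ('snap2', 'zzz999')]  # second tag id is stale

    kept = [(sid, tag_id_mapping[tid]) for sid, tid in m2m_entries
            if tag_id_mapping.get(tid)]
    print(kept)  # [('snap1', 1)]  (the stale pair is silently skipped)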
@@ -56,7 +56,8 @@ class Migration(migrations.Migration):
 
     dependencies = [
         ('core', '0023_upgrade_to_0_9_0'),
         ('crawls', '0001_initial'),
+        ('crawls', '0002_upgrade_to_0_9_0'),
         ('machine', '0001_initial'),
         ('auth', '0012_alter_user_first_name_max_length'),
     ]
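Note: listing `('crawls', '0002_upgrade_to_0_9_0')` in `dependencies` is what guarantees the crawls table exists in its upgraded shape before this migration runs; Django topologically sorts migrations along these edges. Skeleton of the declaration (operations elided, shown only as a shape):

    from django.db import migrations

    class Migration(migrations.Migration):
        dependencies = [
            ('core', '0023_upgrade_to_0_9_0'),
            ('crawls', '0002_upgrade_to_0_9_0'),  # guaranteed to be applied first
        ]
        operations = []  # the real migration's operations go here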
@@ -99,7 +100,18 @@ class Migration(migrations.Migration):
             FOREIGN KEY (parent_snapshot_id) REFERENCES core_snapshot(id) ON DELETE SET NULL
         );
 
-        INSERT INTO core_snapshot_final SELECT * FROM core_snapshot;
+        INSERT INTO core_snapshot_final (
+            id, created_at, modified_at, url, timestamp, bookmarked_at,
+            crawl_id, parent_snapshot_id, title, downloaded_at, depth, fs_version,
+            config, notes, num_uses_succeeded, num_uses_failed,
+            status, retry_at, current_step
+        )
+        SELECT
+            id, created_at, modified_at, url, timestamp, bookmarked_at,
+            crawl_id, parent_snapshot_id, title, downloaded_at, depth, fs_version,
+            COALESCE(config, '{}'), COALESCE(notes, ''), num_uses_succeeded, num_uses_failed,
+            status, retry_at, current_step
+        FROM core_snapshot;
 
         DROP TABLE core_snapshot;
         ALTER TABLE core_snapshot_final RENAME TO core_snapshot;
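Note: replacing `INSERT INTO ... SELECT *` with an explicit column list is the key fix here: `SELECT *` copies by position, so it breaks (or silently shuffles data) whenever the rebuilt table's column order or count differs, and it cannot supply `COALESCE` defaults for NOT NULL columns. A toy sqlite3 demonstration (schema invented for illustration):

    import sqlite3

    con = sqlite3.connect(':memory:')
    con.execute('CREATE TABLE old_t (id INTEGER, config TEXT, notes TEXT)')
    con.execute('INSERT INTO old_t VALUES (1, NULL, NULL)')
    # Rebuilt table: different column order, NOT NULL constraints added.
    con.execute('CREATE TABLE new_t (id INTEGER, notes TEXT NOT NULL, config TEXT NOT NULL)')

    # Naming columns on both sides decouples the copy from table layout
    # and lets COALESCE backfill values for the NOT NULL columns.
    con.execute("""
        INSERT INTO new_t (id, config, notes)
        SELECT id, COALESCE(config, '{}'), COALESCE(notes, '') FROM old_t
    """)
    print(con.execute('SELECT * FROM new_t').fetchone())  # (1, '', '{}')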