Lazy Filesystem Migration System - Implementation TODO

Architecture Decision: DB as Single Source of Truth

Key Principle: Only archivebox update scans the filesystem (for migration/import). All other commands query the database exclusively.

  • archivebox status - Query DB only (count by status field)
  • archivebox search - Query DB only (filter by URL/tags/etc)
  • archivebox remove - Query DB + delete directories
  • ⚠️ archivebox update - ONLY command that scans filesystem (for orphan import + migration)
  • archivebox init - Simplified: just apply migrations, no folder scanning
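
To make the DB-only rule concrete, a minimal sketch (not actual ArchiveBox CLI code) of how a command like archivebox status can answer purely from the database, using the status field mentioned above:

# Hedged sketch: count snapshots per status straight from the DB, no filesystem access
from django.db.models import Count
from archivebox.core.models import Snapshot

def status_counts() -> dict[str, int]:
    # Single GROUP BY query - everything `archivebox status` needs under the DB-only rule
    rows = Snapshot.objects.values('status').annotate(n=Count('id'))
    return {row['status']: row['n'] for row in rows}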

Status: What Already Exists

Core Migration Infrastructure (in archivebox/core/models.py)

Lines 348-367: Migration on save() with transaction wrapper

  • Automatically detects if fs_migration_needed
  • Walks migration chain: 0.7.0 → 0.8.0 → 0.9.0
  • Calls _fs_migrate_from_X_to_Y() methods
  • Updates fs_version field within transaction
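
For reference, the flow described above amounts to roughly the following (a paraphrased sketch of a Snapshot method, not the real lines 348-367; method/field names are taken from this document):

from django.db import transaction

# Paraphrased sketch of the save()-time migration wrapper
def save(self, *args, **kwargs):
    with transaction.atomic():
        super().save(*args, **kwargs)
        # Walk the chain 0.7.0 -> 0.8.0 -> 0.9.0 until fs_version matches the installed version
        while self.fs_migration_needed:
            next_version = self._fs_next_version()
            method = f"_fs_migrate_from_{self.fs_version.replace('.', '_')}_to_{next_version.replace('.', '_')}"
            getattr(self, method)()
            self.fs_version = next_version
            super().save(update_fields=['fs_version'])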

Lines 393-419: Migration helper methods

  • _fs_current_version() - Gets current ArchiveBox version (normalizes to x.x.0)
  • fs_migration_needed property - Checks if migration needed
  • _fs_next_version() - Returns next version in chain
  • _fs_migrate_from_0_7_0_to_0_8_0() - No-op (same layout)
  • _fs_migrate_from_0_8_0_to_0_9_0() - Placeholder (currently no-op at line 427) ← NEEDS IMPLEMENTATION
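
A plausible shape for the chain helpers (hedged - only the names and behavior come from this document, the bodies below are assumptions):

# Hedged sketch of the version-chain helpers described above
FS_VERSION_CHAIN = ['0.7.0', '0.8.0', '0.9.0']

@property
def fs_migration_needed(self) -> bool:
    return self.fs_version != self._fs_current_version()

def _fs_next_version(self) -> str:
    # Next hop in the chain; stays put once at the latest layout
    idx = FS_VERSION_CHAIN.index(self.fs_version)
    return FS_VERSION_CHAIN[min(idx + 1, len(FS_VERSION_CHAIN) - 1)]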

Lines 540-542: output_dir property

  • Currently: return str(CONSTANTS.ARCHIVE_DIR / self.timestamp)
  • Needs: Check fs_version, handle symlinks for backwards compat

Line 311: fs_version field

  • CharField tracking filesystem version per snapshot
  • Default is current ArchiveBox version
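
i.e. roughly the following shape (field options are assumptions, not confirmed by this document):

# Assumed shape of the fs_version field
fs_version = models.CharField(
    max_length=16,
    help_text='Filesystem layout version this snapshot is stored under',
    # default resolves to the current ArchiveBox version at runtime in the real code
)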

Lines 266-267: Timestamp uniqueness logic EXISTS

while self.filter(timestamp=timestamp).exists():
    timestamp = str(float(timestamp) + 1.0)

Already implemented in create_or_update_from_dict() at line 241!

Lines 120-133: SnapshotQuerySet with filter_by_patterns()

  • Already supports filtering by exact/substring/regex/domain/tag/timestamp
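
Example usage (argument order is taken from the calls later in this document):

from datetime import datetime, timedelta
from archivebox.core.models import Snapshot

# DB-only filtering: all example.com snapshots bookmarked in the last 30 days
cutoff = datetime.now() - timedelta(days=30)
recent = Snapshot.objects.filter_by_patterns(['example.com'], 'domain').filter(bookmarked_at__gte=cutoff)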

archivebox/misc/jsonl.py:

  • Line 252: get_or_create_snapshot() - Creates snapshot from JSONL record
  • Line 281: Uses Snapshot.objects.create_or_update_from_dict() internally

Current archivebox update Implementation (archivebox/cli/archivebox_update.py)

Lines 36-102:

  • Filters snapshots from DB using filter_by_patterns()
  • Applies before/after timestamp filters
  • Queues snapshots via status update
  • Starts Orchestrator to process queued snapshots

Current behavior:

  • Only queries DB, never scans filesystem ← NEEDS TO BE FIXED
  • No orphan detection ← NEEDS TO BE ADDED
  • No reconciliation ← NEEDS TO BE ADDED
  • No migration triggering ← save() does this automatically

What Needs Implementation

Phase 1: Add Methods to Snapshot Model

File: archivebox/core/models.py

Add these methods after the existing migration methods (around line 457):

# =========================================================================
# Path Calculation and Migration Helpers
# =========================================================================

@staticmethod
def extract_domain_from_url(url: str) -> str:
    """
    Extract domain from URL for 0.9.x path structure.
    Uses full hostname with sanitized special chars.

    Examples:
        https://example.com:8080 → example.com_8080
        https://sub.example.com → sub.example.com
        file:///path → localhost
        data:text/html → data
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)

        if parsed.scheme in ('http', 'https'):
            if parsed.port:
                return f"{parsed.hostname}_{parsed.port}".replace(':', '_')
            return parsed.hostname or 'unknown'
        elif parsed.scheme == 'file':
            return 'localhost'
        elif parsed.scheme:
            return parsed.scheme
        else:
            return 'unknown'
    except Exception:
        return 'unknown'

def get_storage_path_for_version(self, version: str) -> Path:
    """
    Calculate storage path for specific filesystem version.
    Centralizes path logic so it's reusable.

    0.7.x/0.8.x: archive/{timestamp}
    0.9.x: users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
    """
    from datetime import datetime

    if version in ('0.7.0', '0.8.0'):
        return CONSTANTS.ARCHIVE_DIR / self.timestamp

    elif version in ('0.9.0', '1.0.0'):
        username = self.created_by.username if self.created_by else 'unknown'

        # Use created_at for date grouping (fallback to timestamp)
        if self.created_at:
            date_str = self.created_at.strftime('%Y%m%d')
        else:
            date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')

        domain = self.extract_domain_from_url(self.url)

        return (
            CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
            date_str / domain / str(self.id)
        )
    else:
        # Unknown version - use current
        return self.get_storage_path_for_version(self._fs_current_version())

# =========================================================================
# Loading and Creation from Filesystem (Used by archivebox update ONLY)
# =========================================================================

@classmethod
def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
    """
    Load existing Snapshot from DB by reading index.json.

    Reads index.json, extracts url+timestamp, queries DB.
    Returns existing Snapshot or None if not found/invalid.
    Does NOT create new snapshots.

    ONLY used by: archivebox update (for orphan detection)
    """
    import json

    index_path = snapshot_dir / 'index.json'
    if not index_path.exists():
        return None

    try:
        with open(index_path) as f:
            data = json.load(f)
    except Exception:
        return None

    url = data.get('url')
    if not url:
        return None

    # Get timestamp - prefer index.json, fallback to folder name
    timestamp = cls._select_best_timestamp(
        index_timestamp=data.get('timestamp'),
        folder_name=snapshot_dir.name
    )

    if not timestamp:
        return None

    # Look up existing
    try:
        return cls.objects.get(url=url, timestamp=timestamp)
    except cls.DoesNotExist:
        return None
    except cls.MultipleObjectsReturned:
        # Should not happen with unique constraint
        return cls.objects.filter(url=url, timestamp=timestamp).first()

@classmethod
def create_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
    """
    Create new Snapshot from orphaned directory.

    Validates timestamp, ensures uniqueness.
    Returns new UNSAVED Snapshot or None if invalid.

    ONLY used by: archivebox update (for orphan import)
    """
    import json
    from archivebox.base_models.models import get_or_create_system_user_pk

    index_path = snapshot_dir / 'index.json'
    if not index_path.exists():
        return None

    try:
        with open(index_path) as f:
            data = json.load(f)
    except Exception:
        return None

    url = data.get('url')
    if not url:
        return None

    # Get and validate timestamp
    timestamp = cls._select_best_timestamp(
        index_timestamp=data.get('timestamp'),
        folder_name=snapshot_dir.name
    )

    if not timestamp:
        return None

    # Ensure uniqueness (reuses existing logic from create_or_update_from_dict)
    timestamp = cls._ensure_unique_timestamp(url, timestamp)

    # Detect version
    fs_version = cls._detect_fs_version_from_index(data)

    return cls(
        url=url,
        timestamp=timestamp,
        title=data.get('title', ''),
        fs_version=fs_version,
        created_by_id=get_or_create_system_user_pk(),
    )

@staticmethod
def _select_best_timestamp(index_timestamp: Optional[str], folder_name: str) -> Optional[str]:
    """
    Select best timestamp from index.json vs folder name.

    Validates range (1995-2035).
    Prefers index.json if valid.
    """
    def is_valid_timestamp(ts):
        try:
            ts_int = int(float(ts))
            # 1995-01-01 to 2035-12-31
            return 788918400 <= ts_int <= 2082758400
        except (ValueError, TypeError):
            return False

    index_valid = is_valid_timestamp(index_timestamp) if index_timestamp else False
    folder_valid = is_valid_timestamp(folder_name)

    if index_valid:
        return str(int(float(index_timestamp)))
    elif folder_valid:
        return str(int(float(folder_name)))
    else:
        return None

@classmethod
def _ensure_unique_timestamp(cls, url: str, timestamp: str) -> str:
    """
    Ensure timestamp is globally unique.
    If collision with different URL, increment by 1 until unique.

    NOTE: Similar logic already exists in create_or_update_from_dict (lines 266-267);
    this extracted version additionally normalizes timestamps to whole seconds.
    """
    while cls.objects.filter(timestamp=timestamp).exclude(url=url).exists():
        timestamp = str(int(float(timestamp)) + 1)
    return timestamp

@staticmethod
def _detect_fs_version_from_index(data: dict) -> str:
    """
    Detect fs_version from index.json structure.

    - Has fs_version field: use it
    - Has history dict: 0.7.0
    - Has archive_results list: 0.8.0
    - Default: 0.7.0
    """
    if 'fs_version' in data:
        return data['fs_version']
    if 'history' in data and 'archive_results' not in data:
        return '0.7.0'
    if 'archive_results' in data:
        return '0.8.0'
    return '0.7.0'

# =========================================================================
# Index.json Reconciliation
# =========================================================================

def reconcile_with_index_json(self):
    """
    Merge index.json with DB. DB is source of truth.

    - Title: longest non-URL
    - Tags: union
    - ArchiveResults: keep both (by extractor+start_ts)

    Writes back in 0.9.x format.

    Used by: archivebox update (to sync index.json with DB)
    """
    import json

    index_path = Path(self.output_dir) / 'index.json'

    index_data = {}
    if index_path.exists():
        try:
            with open(index_path) as f:
                index_data = json.load(f)
        except Exception:
            pass

    # Merge title
    self._merge_title_from_index(index_data)

    # Merge tags
    self._merge_tags_from_index(index_data)

    # Merge ArchiveResults
    self._merge_archive_results_from_index(index_data)

    # Write back
    self.write_index_json()

def _merge_title_from_index(self, index_data: dict):
    """Merge title - prefer longest non-URL title."""
    index_title = index_data.get('title', '').strip()
    db_title = self.title or ''

    candidates = [t for t in [index_title, db_title] if t and t != self.url]
    if candidates:
        best_title = max(candidates, key=len)
        if self.title != best_title:
            self.title = best_title

def _merge_tags_from_index(self, index_data: dict):
    """Merge tags - union of both sources."""
    from django.db import transaction

    index_tags = set(index_data.get('tags', '').split(',')) if index_data.get('tags') else set()
    index_tags = {t.strip() for t in index_tags if t.strip()}

    db_tags = set(self.tags.values_list('name', flat=True))

    new_tags = index_tags - db_tags
    if new_tags:
        with transaction.atomic():
            for tag_name in new_tags:
                tag, _ = Tag.objects.get_or_create(name=tag_name)
                self.tags.add(tag)

def _merge_archive_results_from_index(self, index_data: dict):
    """Merge ArchiveResults - keep both (by extractor+start_ts)."""
    existing = {
        (ar.extractor, ar.start_ts): ar
        for ar in ArchiveResult.objects.filter(snapshot=self)
    }

    # Handle 0.8.x format (archive_results list)
    for result_data in index_data.get('archive_results', []):
        self._create_archive_result_if_missing(result_data, existing)

    # Handle 0.7.x format (history dict)
    if 'history' in index_data and isinstance(index_data['history'], dict):
        for extractor, result_list in index_data['history'].items():
            if isinstance(result_list, list):
                for result_data in result_list:
                    result_data['extractor'] = extractor
                    self._create_archive_result_if_missing(result_data, existing)

def _create_archive_result_if_missing(self, result_data: dict, existing: dict):
    """Create ArchiveResult if not already in DB."""
    from dateutil import parser
    import json

    extractor = result_data.get('extractor', '')
    if not extractor:
        return

    start_ts = None
    if result_data.get('start_ts'):
        try:
            start_ts = parser.parse(result_data['start_ts'])
        except Exception:
            pass

    if (extractor, start_ts) in existing:
        return

    try:
        end_ts = None
        if result_data.get('end_ts'):
            try:
                end_ts = parser.parse(result_data['end_ts'])
            except Exception:
                pass

        ArchiveResult.objects.create(
            snapshot=self,
            extractor=extractor,
            status=result_data.get('status', 'failed'),
            output_str=result_data.get('output', ''),
            cmd=result_data.get('cmd', []),
            pwd=result_data.get('pwd', str(self.output_dir)),
            start_ts=start_ts,
            end_ts=end_ts,
            created_by=self.created_by,
        )
    except Exception:
        pass

def write_index_json(self):
    """Write index.json in 0.9.x format."""
    import json

    index_path = Path(self.output_dir) / 'index.json'

    data = {
        'url': self.url,
        'timestamp': self.timestamp,
        'title': self.title or '',
        'tags': ','.join(sorted(self.tags.values_list('name', flat=True))),
        'fs_version': self.fs_version,
        'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
        'created_at': self.created_at.isoformat() if self.created_at else None,
        'archive_results': [
            {
                'extractor': ar.extractor,
                'status': ar.status,
                'start_ts': ar.start_ts.isoformat() if ar.start_ts else None,
                'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
                'output': ar.output_str or '',
                'cmd': ar.cmd if isinstance(ar.cmd, list) else [],
                'pwd': ar.pwd,
            }
            for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts')
        ],
    }

    index_path.parent.mkdir(parents=True, exist_ok=True)
    with open(index_path, 'w') as f:
        json.dump(data, f, indent=2, sort_keys=True)

# =========================================================================
# Snapshot Utilities
# =========================================================================

@staticmethod
def move_directory_to_invalid(snapshot_dir: Path):
    """
    Move invalid directory to data/invalid/YYYYMMDD/.

    Used by: archivebox update (when encountering invalid directories)
    """
    from datetime import datetime
    import shutil

    invalid_dir = CONSTANTS.DATA_DIR / 'invalid' / datetime.now().strftime('%Y%m%d')
    invalid_dir.mkdir(parents=True, exist_ok=True)

    dest = invalid_dir / snapshot_dir.name
    counter = 1
    while dest.exists():
        dest = invalid_dir / f"{snapshot_dir.name}_{counter}"
        counter += 1

    try:
        shutil.move(str(snapshot_dir), str(dest))
    except Exception:
        pass

@classmethod
def find_and_merge_duplicates(cls) -> int:
    """
    Find and merge snapshots with same url:timestamp.
    Returns count of duplicate sets merged.

    Used by: archivebox update (Phase 3: deduplication)
    """
    from django.db.models import Count

    duplicates = (
        cls.objects
        .values('url', 'timestamp')
        .annotate(count=Count('id'))
        .filter(count__gt=1)
    )

    merged = 0
    for dup in duplicates.iterator():
        snapshots = list(
            cls.objects
            .filter(url=dup['url'], timestamp=dup['timestamp'])
            .order_by('created_at')  # Keep oldest
        )

        if len(snapshots) > 1:
            try:
                cls._merge_snapshots(snapshots)
                merged += 1
            except Exception:
                pass

    return merged

@classmethod
def _merge_snapshots(cls, snapshots: list['Snapshot']):
    """
    Merge exact duplicates.
    Keep oldest, union files + ArchiveResults.
    """
    import shutil

    keeper = snapshots[0]
    duplicates = snapshots[1:]

    keeper_dir = Path(keeper.output_dir)

    for dup in duplicates:
        dup_dir = Path(dup.output_dir)

        # Merge files
        if dup_dir.exists() and dup_dir != keeper_dir:
            for dup_file in dup_dir.rglob('*'):
                if not dup_file.is_file():
                    continue

                rel = dup_file.relative_to(dup_dir)
                keeper_file = keeper_dir / rel

                if not keeper_file.exists():
                    keeper_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(dup_file, keeper_file)

            try:
                shutil.rmtree(dup_dir)
            except Exception:
                pass

        # Merge tags
        for tag in dup.tags.all():
            keeper.tags.add(tag)

        # Move ArchiveResults
        ArchiveResult.objects.filter(snapshot=dup).update(snapshot=keeper)

        # Delete
        dup.delete()

Phase 2: Update output_dir Property

File: archivebox/core/models.py line 540

Replace current implementation:

@cached_property
def output_dir(self):
    """The filesystem path to the snapshot's output directory."""

    current_path = self.get_storage_path_for_version(self.fs_version)

    if current_path.exists():
        return str(current_path)

    # Check for backwards-compat symlink
    old_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
    if old_path.is_symlink():
        # resolve() follows the symlink even if the link target is a relative path
        return str(old_path.resolve())
    elif old_path.exists():
        return str(old_path)

    return str(current_path)

Phase 3: Implement Real Migration

File: archivebox/core/models.py line 427

Replace the placeholder _fs_migrate_from_0_8_0_to_0_9_0():

def _fs_migrate_from_0_8_0_to_0_9_0(self):
    """
    Migrate from flat to nested structure.

    0.8.x: archive/{timestamp}/
    0.9.x: users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/

    Transaction handling:
    1. Copy files INSIDE transaction
    2. Update fs_version INSIDE transaction (done by save())
    3. Exit transaction (DB commit)
    4. Delete old dir + create backwards-compat symlink OUTSIDE transaction
       (the symlink reuses the old archive/{timestamp} path, so it can only be
       created once the old directory has been removed)
    """
    import shutil
    from django.db import transaction

    old_dir = self.get_storage_path_for_version('0.8.0')
    new_dir = self.get_storage_path_for_version('0.9.0')

    # new_dir may already exist from an interrupted run - the copy below is
    # idempotent, so resume and finish it rather than bailing out early
    if not old_dir.exists() or old_dir == new_dir:
        return

    new_dir.mkdir(parents=True, exist_ok=True)

    # Copy all files (idempotent)
    for old_file in old_dir.rglob('*'):
        if not old_file.is_file():
            continue

        rel_path = old_file.relative_to(old_dir)
        new_file = new_dir / rel_path

        # Skip if already copied
        if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
            continue

        new_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(old_file, new_file)

    # Verify all copied
    old_files = {f.relative_to(old_dir): f.stat().st_size
                 for f in old_dir.rglob('*') if f.is_file()}
    new_files = {f.relative_to(new_dir): f.stat().st_size
                 for f in new_dir.rglob('*') if f.is_file()}

    if old_files.keys() != new_files.keys():
        missing = old_files.keys() - new_files.keys()
        raise Exception(f"Migration incomplete: missing {missing}")

    # The backwards-compat symlink lives at archive/{timestamp}, the same path as
    # old_dir, so it can only be created once the old directory has been removed.
    # Schedule removal + symlink creation to run AFTER the transaction commits.
    transaction.on_commit(lambda: self._cleanup_old_migration_dir(old_dir, new_dir))

def _cleanup_old_migration_dir(self, old_dir: Path, new_dir: Path):
    """
    Delete the old directory and replace it with a backwards-compat symlink.
    Called via transaction.on_commit() after the DB commit succeeds.
    """
    import shutil
    import logging

    try:
        if old_dir.exists() and not old_dir.is_symlink():
            shutil.rmtree(old_dir)
        if not old_dir.exists() and not old_dir.is_symlink():
            # archive/{timestamp} -> new nested location, for old links/paths
            old_dir.symlink_to(new_dir, target_is_directory=True)
    except Exception as e:
        # Log but don't raise - migration succeeded, this is just cleanup
        logging.getLogger('archivebox.migration').warning(
            f"Could not clean up old migration directory {old_dir}: {e}"
        )

Phase 4: Add Timestamp Uniqueness Constraint

File: archivebox/core/models.py - Add to Snapshot.Meta class (around line 330):

class Meta(TypedModelMeta):
    verbose_name = "Snapshot"
    verbose_name_plural = "Snapshots"
    constraints = [
        # Allow same URL in different crawls, but not duplicates within same crawl
        models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
        # Global timestamp uniqueness for 1:1 symlink mapping
        models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
    ]

Then create migration:

python -m archivebox manage makemigrations core
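
The auto-generated file should boil down to two AddConstraint operations, roughly like this (file name and dependency below are illustrative only; per the checklist, the real migration ended up as 0032_alter_archiveresult_binary_and_more.py):

# Illustrative only - the real auto-generated migration differs in name/dependencies
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('core', '0031_previous'),  # hypothetical predecessor migration
    ]

    operations = [
        migrations.AddConstraint(
            model_name='snapshot',
            constraint=models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
        ),
        migrations.AddConstraint(
            model_name='snapshot',
            constraint=models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
        ),
    ]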

Phase 5: Rewrite archivebox update

File: archivebox/cli/archivebox_update.py

Replace entire file:

#!/usr/bin/env python3

__package__ = 'archivebox.cli'

import os
import time
import rich_click as click

from typing import Iterable
from pathlib import Path

from archivebox.misc.util import enforce_types, docstring


@enforce_types
def update(filter_patterns: Iterable[str] = (),
          filter_type: str = 'exact',
          before: float | None = None,
          after: float | None = None,
          resume: str | None = None,
          batch_size: int = 100,
          continuous: bool = False) -> None:
    """
    Update snapshots: import orphans, reconcile, and re-run failed extractors.

    Three-phase operation:
    - Phase 1: Scan archive/ for orphaned snapshots (skip symlinks)
    - Phase 2: Process all DB snapshots (reconcile + re-queue for archiving)
    - Phase 3: Deduplicate exact duplicates

    With filters: Only phase 2 (DB query), no filesystem scan.
    Without filters: All phases (full update).
    """

    from rich import print
    from archivebox.config.django import setup_django
    setup_django()

    from archivebox.core.models import Snapshot
    from django.utils import timezone

    while True:
        if filter_patterns or before or after:
            # Filtered mode: query DB only
            print('[*] Processing filtered snapshots from database...')
            stats = process_filtered_snapshots(
                filter_patterns=filter_patterns,
                filter_type=filter_type,
                before=before,
                after=after,
                batch_size=batch_size
            )
            print_stats(stats)
        else:
            # Full mode: import orphans + process DB + deduplicate
            stats_combined = {'phase1': {}, 'phase2': {}, 'deduplicated': 0}

            print('[*] Phase 1: Scanning archive/ for orphaned snapshots...')
            stats_combined['phase1'] = import_orphans_from_archive(
                resume_from=resume,
                batch_size=batch_size
            )

            print('[*] Phase 2: Processing all database snapshots...')
            stats_combined['phase2'] = process_all_db_snapshots(batch_size=batch_size)

            print('[*] Phase 3: Deduplicating...')
            stats_combined['deduplicated'] = Snapshot.find_and_merge_duplicates()

            print_combined_stats(stats_combined)

        if not continuous:
            break

        print('[yellow]Sleeping 60s before next pass...[/yellow]')
        time.sleep(60)
        resume = None


def import_orphans_from_archive(resume_from: str | None = None, batch_size: int = 100) -> dict:
    """
    Scan archive/ for orphaned snapshots.
    Skip symlinks (already migrated).
    Create DB records and trigger migration on save().
    """
    from archivebox.core.models import Snapshot
    from archivebox.config import CONSTANTS
    from django.db import transaction

    stats = {'processed': 0, 'imported': 0, 'migrated': 0, 'invalid': 0}

    archive_dir = CONSTANTS.ARCHIVE_DIR
    if not archive_dir.exists():
        return stats

    print('[*] Scanning and sorting by modification time...')

    # Scan and sort by mtime (newest first)
    # Loading (mtime, path) tuples is fine even for millions (~100MB for 1M entries)
    entries = [
        (e.stat().st_mtime, e.path)
        for e in os.scandir(archive_dir)
        if e.is_dir(follow_symlinks=False)  # Skip symlinks
    ]
    entries.sort(reverse=True)  # Newest first
    print(f'[*] Found {len(entries)} directories to check')

    for mtime, entry_path in entries:
        entry_path = Path(entry_path)

        # Resume from timestamp if specified
        if resume_from and entry_path.name < resume_from:
            continue

        stats['processed'] += 1

        # Check if already in DB
        snapshot = Snapshot.load_from_directory(entry_path)
        if snapshot:
            continue  # Already in DB, skip

        # Not in DB - create orphaned snapshot
        snapshot = Snapshot.create_from_directory(entry_path)
        if not snapshot:
            # Invalid directory
            Snapshot.move_directory_to_invalid(entry_path)
            stats['invalid'] += 1
            print(f"    [{stats['processed']}] Invalid: {entry_path.name}")
            continue

        needs_migration = snapshot.fs_migration_needed

        snapshot.save()  # Creates DB record + triggers migration

        stats['imported'] += 1
        if needs_migration:
            stats['migrated'] += 1
            print(f"    [{stats['processed']}] Imported + migrated: {entry_path.name}")
        else:
            print(f"    [{stats['processed']}] Imported: {entry_path.name}")

        if stats['processed'] % batch_size == 0:
            transaction.commit()

    transaction.commit()
    return stats


def process_all_db_snapshots(batch_size: int = 100) -> dict:
    """
    Process all snapshots in DB.
    Reconcile index.json and queue for archiving.
    """
    from archivebox.core.models import Snapshot
    from django.db import transaction
    from django.utils import timezone

    stats = {'processed': 0, 'reconciled': 0, 'queued': 0}

    total = Snapshot.objects.count()
    print(f'[*] Processing {total} snapshots from database...')

    for snapshot in Snapshot.objects.iterator():
        # Reconcile index.json with DB
        snapshot.reconcile_with_index_json()

        # Queue for archiving (state machine will handle it)
        snapshot.status = Snapshot.StatusChoices.QUEUED
        snapshot.retry_at = timezone.now()
        snapshot.save()

        stats['reconciled'] += 1
        stats['queued'] += 1
        stats['processed'] += 1

        if stats['processed'] % batch_size == 0:
            transaction.commit()
            print(f"    [{stats['processed']}/{total}] Processed...")

    transaction.commit()
    return stats


def process_filtered_snapshots(
    filter_patterns: Iterable[str],
    filter_type: str,
    before: float | None,
    after: float | None,
    batch_size: int
) -> dict:
    """Process snapshots matching filters (DB query only)."""
    from archivebox.core.models import Snapshot
    from django.db import transaction
    from django.utils import timezone
    from datetime import datetime

    stats = {'processed': 0, 'reconciled': 0, 'queued': 0}

    snapshots = Snapshot.objects.all()

    if filter_patterns:
        snapshots = Snapshot.objects.filter_by_patterns(list(filter_patterns), filter_type)

    if before:
        snapshots = snapshots.filter(bookmarked_at__lt=datetime.fromtimestamp(before))
    if after:
        snapshots = snapshots.filter(bookmarked_at__gt=datetime.fromtimestamp(after))

    total = snapshots.count()
    print(f'[*] Found {total} matching snapshots')

    for snapshot in snapshots.iterator():
        # Reconcile index.json with DB
        snapshot.reconcile_with_index_json()

        # Queue for archiving
        snapshot.status = Snapshot.StatusChoices.QUEUED
        snapshot.retry_at = timezone.now()
        snapshot.save()

        stats['reconciled'] += 1
        stats['queued'] += 1
        stats['processed'] += 1

        if stats['processed'] % batch_size == 0:
            transaction.commit()
            print(f"    [{stats['processed']}/{total}] Processed...")

    transaction.commit()
    return stats


def print_stats(stats: dict):
    """Print statistics for filtered mode."""
    from rich import print

    print(f"""
[green]Update Complete[/green]
  Processed:   {stats['processed']}
  Reconciled:  {stats['reconciled']}
  Queued:      {stats['queued']}
""")


def print_combined_stats(stats_combined: dict):
    """Print statistics for full mode."""
    from rich import print

    s1 = stats_combined['phase1']
    s2 = stats_combined['phase2']

    print(f"""
[green]Archive Update Complete[/green]

Phase 1 (Import Orphans):
  Checked:     {s1.get('processed', 0)}
  Imported:    {s1.get('imported', 0)}
  Migrated:    {s1.get('migrated', 0)}
  Invalid:     {s1.get('invalid', 0)}

Phase 2 (Process DB):
  Processed:   {s2.get('processed', 0)}
  Reconciled:  {s2.get('reconciled', 0)}
  Queued:      {s2.get('queued', 0)}

Phase 3 (Deduplicate):
  Merged:      {stats_combined['deduplicated']}
""")


@click.command()
@click.option('--resume', type=str, help='Resume from timestamp')
@click.option('--before', type=float, help='Only snapshots before timestamp')
@click.option('--after', type=float, help='Only snapshots after timestamp')
@click.option('--filter-type', '-t', type=click.Choice(['exact', 'substring', 'regex', 'domain', 'tag', 'timestamp']), default='exact')
@click.option('--batch-size', type=int, default=100, help='Commit every N snapshots')
@click.option('--continuous', is_flag=True, help='Run continuously as background worker')
@click.argument('filter_patterns', nargs=-1)
@docstring(update.__doc__)
def main(**kwargs):
    update(**kwargs)


if __name__ == '__main__':
    main()

Phase 6: Simplify archivebox init

File: archivebox/cli/archivebox_init.py

Remove lines 24, 113-150 (folder status function usage):

# DELETE line 24:
from archivebox.misc.folders import fix_invalid_folder_locations, get_invalid_folders

# DELETE lines 113-150 (folder scanning logic):
# Replace with simple message:
print('    > Run "archivebox update" to import any orphaned snapshot directories')

Simplified logic:

  • Create directory structure
  • Apply migrations
  • Don't scan for orphans (let archivebox update handle it)
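
A rough sketch of what the simplified init flow reduces to (the function and call sequence here are illustrative, not the real archivebox_init.py internals):

# Illustrative sketch of the simplified `archivebox init` flow
from django.core.management import call_command
from archivebox.config import CONSTANTS

def init_collection() -> None:
    # 1. Create the bare directory structure (archive/ plus the 0.9.x users/ tree)
    CONSTANTS.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
    (CONSTANTS.DATA_DIR / 'users').mkdir(parents=True, exist_ok=True)

    # 2. Apply Django DB migrations - no folder scanning
    call_command('migrate', interactive=False)

    # 3. Defer all orphan handling to `archivebox update`
    print('    > Run "archivebox update" to import any orphaned snapshot directories')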

Phase 7: Simplify archivebox search

File: archivebox/cli/archivebox_search.py

Remove lines 65-96 (all folder status imports and list_folders() function):

# DELETE lines 65-96
# DELETE STATUS_CHOICES with 'valid', 'invalid', 'orphaned', 'corrupted', 'unrecognized'

# Keep only: 'indexed', 'archived', 'unarchived'
STATUS_CHOICES = ['indexed', 'archived', 'unarchived']

Update search() function to query DB directly:

@enforce_types
def search(filter_patterns: list[str] | None=None,
           filter_type: str='substring',
           status: str='indexed',
           before: float | None=None,
           after: float | None=None,
           sort: str | None=None,
           json: bool=False,
           html: bool=False,
           csv: str | None=None,
           with_headers: bool=False):
    """List, filter, and export information about archive entries"""

    from archivebox.core.models import Snapshot

    if with_headers and not (json or html or csv):
        stderr('[X] --with-headers requires --json, --html or --csv\n', color='red')
        raise SystemExit(2)

    # Query DB directly
    snapshots = Snapshot.objects.all()

    if filter_patterns:
        snapshots = Snapshot.objects.filter_by_patterns(list(filter_patterns), filter_type)

    if status == 'archived':
        snapshots = snapshots.filter(downloaded_at__isnull=False)
    elif status == 'unarchived':
        snapshots = snapshots.filter(downloaded_at__isnull=True)
    # 'indexed' = all snapshots (no filter)

    if before:
        from datetime import datetime
        snapshots = snapshots.filter(bookmarked_at__lt=datetime.fromtimestamp(before))
    if after:
        from datetime import datetime
        snapshots = snapshots.filter(bookmarked_at__gt=datetime.fromtimestamp(after))

    if sort:
        snapshots = snapshots.order_by(sort)

    # Export to requested format
    if json:
        output = snapshots.to_json(with_headers=with_headers)
    elif html:
        output = snapshots.to_html(with_headers=with_headers)
    elif csv:
        output = snapshots.to_csv(cols=csv.split(','), header=with_headers)
    else:
        from archivebox.misc.logging_util import printable_folders
        # Convert to dict for printable_folders
        folders = {s.output_dir: s for s in snapshots}
        output = printable_folders(folders, with_headers)

    print(output)
    return output

Phase 8: Delete Folder Status Functions

File: archivebox/misc/folders.py

Delete lines 23-186 (all status checking functions):

# DELETE these functions entirely:
# - _is_valid_snapshot()
# - _is_corrupt_snapshot()
# - get_indexed_folders()
# - get_archived_folders()
# - get_unarchived_folders()
# - get_present_folders()
# - get_valid_folders()
# - get_invalid_folders()
# - get_duplicate_folders()
# - get_orphaned_folders()
# - get_corrupted_folders()
# - get_unrecognized_folders()

Keep only fix_invalid_folder_locations() (used by archivebox init for one-time cleanup):

"""
Folder utilities for ArchiveBox.

Note: This file only contains legacy cleanup utilities.
The DB is the single source of truth - use Snapshot.objects queries for all status checks.
"""

__package__ = 'archivebox.misc'

import os
import json
import shutil
from pathlib import Path
from typing import Tuple, List

from archivebox.config import DATA_DIR, CONSTANTS
from archivebox.misc.util import enforce_types


@enforce_types
def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], List[str]]:
    """
    Legacy cleanup: Move folders to their correct timestamp-named locations based on index.json.

    This is only used during 'archivebox init' for one-time cleanup of misnamed directories.
    After this runs once, 'archivebox update' handles all filesystem operations.
    """
    fixed = []
    cant_fix = []
    for entry in os.scandir(out_dir / CONSTANTS.ARCHIVE_DIR_NAME):
        if entry.is_dir(follow_symlinks=True):
            index_path = Path(entry.path) / 'index.json'
            if index_path.exists():
                try:
                    with open(index_path, 'r') as f:
                        data = json.load(f)
                    timestamp = data.get('timestamp')
                    url = data.get('url')
                except Exception:
                    continue

                if not timestamp:
                    continue

                if not entry.path.endswith(f'/{timestamp}'):
                    dest = out_dir / CONSTANTS.ARCHIVE_DIR_NAME / timestamp
                    if dest.exists():
                        cant_fix.append(entry.path)
                    else:
                        shutil.move(entry.path, str(dest))
                        fixed.append(str(dest))
    return fixed, cant_fix

Testing Plan

  1. Test migration idempotency:

    # Interrupt migration mid-way
    # Re-run - should resume seamlessly
    
  2. Test orphan import:

    # Create orphaned directory manually
    # Run archivebox update
    # Verify imported and migrated
    
  3. Test deduplication:

    # Create two snapshots with same url:timestamp
    # Run archivebox update
    # Verify merged
    
  4. Test timestamp uniqueness:

    # Try to create snapshots with colliding timestamps
    # Verify auto-increment
    
  5. Test filtered update:

    archivebox update --after 1234567890
    # Should only process DB, no filesystem scan
    
  6. Test continuous mode:

    archivebox update --continuous
    # Should run in loop, prioritize newest entries
    
  7. Test DB-only commands:

    archivebox search --status archived
    archivebox search example.com --filter-type substring
    archivebox remove example.com
    # All should query DB only, no filesystem scanning
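
To pin down items 3 and 4 above, a short pytest sketch against the methods proposed in Phase 1 (the required Snapshot fields for create() are assumed here; adjust to the real model):

# Hedged pytest sketch for timestamp selection + uniqueness (model fields assumed)
import pytest
from archivebox.core.models import Snapshot

def test_select_best_timestamp_prefers_valid_index_value():
    assert Snapshot._select_best_timestamp('1600000000.5', 'garbage') == '1600000000'
    assert Snapshot._select_best_timestamp(None, '1600000000') == '1600000000'
    assert Snapshot._select_best_timestamp('not-a-ts', 'also-not') is None

@pytest.mark.django_db
def test_ensure_unique_timestamp_increments_on_collision():
    from archivebox.base_models.models import get_or_create_system_user_pk
    Snapshot.objects.create(
        url='https://example.com/a', timestamp='1600000000',
        created_by_id=get_or_create_system_user_pk(),
    )
    # A different URL colliding on the same timestamp should be bumped forward by 1s
    assert Snapshot._ensure_unique_timestamp('https://example.com/b', '1600000000') == '1600000001'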
    

Implementation Checklist

  • Add all new methods to Snapshot model (Phase 1)
  • Update output_dir property (Phase 2)
  • Implement real _fs_migrate_from_0_8_0_to_0_9_0() (Phase 3)
  • Add _cleanup_old_migration_dir() helper (Phase 3)
  • Add timestamp uniqueness constraint (Phase 4)
  • Create database migration for constraint (Phase 4) - Created: 0032_alter_archiveresult_binary_and_more.py
  • Rewrite archivebox/cli/archivebox_update.py (Phase 5)
  • Simplify archivebox/cli/archivebox_init.py (Phase 6)
  • Simplify archivebox/cli/archivebox_search.py (Phase 7)
  • Delete folder status functions from archivebox/misc/folders.py (Phase 8)
  • Update migration tests (test_migrations_08_to_09.py)
  • Update update command tests (tests/test_update.py)
  • Run tests to verify implementation
  • Test migration on real 0.8.x collection
  • Test orphan import in production
  • Test deduplication in production
  • Test filtered vs full mode in production
  • Test continuous mode in production