diff --git a/archivebox/misc/folders.py b/archivebox/misc/folders.py
index dd134dc1..ae364f04 100644
--- a/archivebox/misc/folders.py
+++ b/archivebox/misc/folders.py
@@ -1,8 +1,5 @@
 """
-Folder utilities for ArchiveBox.
-
-Note: This file only contains legacy cleanup utilities.
-The DB is the single source of truth - use Snapshot.objects queries for all status checks.
+Folder status and integrity checking utilities for ArchiveBox.
 """
 
 __package__ = 'archivebox.misc'
@@ -11,11 +8,197 @@ import os
 import json
 import shutil
 from pathlib import Path
-from typing import Tuple, List
+from itertools import chain
+from typing import Dict, Optional, List, Tuple, TYPE_CHECKING
+
+from django.db.models import QuerySet
 
 from archivebox.config import DATA_DIR, CONSTANTS
 from archivebox.misc.util import enforce_types
 
+if TYPE_CHECKING:
+    from core.models import Snapshot
+
+
+def _is_valid_snapshot(snapshot: 'Snapshot') -> bool:
+    """Check if a snapshot's data directory is valid"""
+    dir_exists = Path(snapshot.output_dir).exists()
+    index_exists = (Path(snapshot.output_dir) / "index.json").exists()
+    if not dir_exists:
+        return False
+    if dir_exists and not index_exists:
+        return False
+    if dir_exists and index_exists:
+        try:
+            with open(Path(snapshot.output_dir) / "index.json", 'r') as f:
+                data = json.load(f)
+            return snapshot.url == data.get('url')
+        except Exception:
+            pass
+    return False
+
+
+def _is_corrupt_snapshot(snapshot: 'Snapshot') -> bool:
+    """Check if a snapshot's data directory is corrupted"""
+    if not Path(snapshot.output_dir).exists():
+        return False
+    return not _is_valid_snapshot(snapshot)
+
+
+def get_indexed_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
+    """indexed snapshots without checking archive status or data directory validity"""
+    return {
+        snapshot.output_dir: snapshot
+        for snapshot in snapshots.iterator(chunk_size=500)
+    }
+
+
+def get_archived_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
+    """indexed snapshots that are archived with a valid data directory"""
+    return {
+        snapshot.output_dir: snapshot
+        for snapshot in snapshots.iterator(chunk_size=500)
+        if snapshot.is_archived
+    }
+
+
+def get_unarchived_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
+    """indexed snapshots that are unarchived with no data directory or an empty data directory"""
+    return {
+        snapshot.output_dir: snapshot
+        for snapshot in snapshots.iterator(chunk_size=500)
+        if not snapshot.is_archived
+    }
+
+
+def get_present_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
+    """dirs that actually exist in the archive/ folder"""
+    from core.models import Snapshot
+
+    all_folders = {}
+    archive_dir = out_dir / CONSTANTS.ARCHIVE_DIR_NAME
+    if not archive_dir.exists():
+        return all_folders
+    for entry in archive_dir.iterdir():
+        if entry.is_dir():
+            snapshot = None
+            try:
+                snapshot = Snapshot.objects.get(timestamp=entry.name)
+            except Snapshot.DoesNotExist:
+                pass
+            all_folders[entry.name] = snapshot
+    return all_folders
+
+
+def get_valid_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
+    """dirs with a valid index matched to the main index and archived content"""
+    return {
+        snapshot.output_dir: snapshot
+        for snapshot in snapshots.iterator(chunk_size=500)
+        if _is_valid_snapshot(snapshot)
+    }
+
+
+def get_invalid_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
+    """dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized"""
+    duplicate = get_duplicate_folders(snapshots, out_dir=out_dir)
+    orphaned = get_orphaned_folders(snapshots, out_dir=out_dir)
+    corrupted = get_corrupted_folders(snapshots, out_dir=out_dir)
+    unrecognized = get_unrecognized_folders(snapshots, out_dir=out_dir)
+    return {**duplicate, **orphaned, **corrupted, **unrecognized}
+
+
+def get_duplicate_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
+    """dirs that conflict with other directories that have the same URL or timestamp"""
+    from core.models import Snapshot as SnapshotModel
+
+    by_url: Dict[str, int] = {}
+    by_timestamp: Dict[str, int] = {}
+    duplicate_folders: Dict[str, Optional['Snapshot']] = {}
+
+    archive_dir = CONSTANTS.ARCHIVE_DIR
+    if not archive_dir.exists():
+        return duplicate_folders
+
+    data_folders = (
+        str(entry)
+        for entry in archive_dir.iterdir()
+        if entry.is_dir() and not snapshots.filter(timestamp=entry.name).exists()
+    )
+
+    for item in chain(snapshots.iterator(chunk_size=500), data_folders):
+        snapshot = None
+        if isinstance(item, str):
+            path = item
+            timestamp = Path(path).name
+            try:
+                snapshot = SnapshotModel.objects.get(timestamp=timestamp)
+            except SnapshotModel.DoesNotExist:
+                pass
+        else:
+            snapshot = item
+            path = snapshot.output_dir
+
+        if snapshot:
+            by_timestamp[snapshot.timestamp] = by_timestamp.get(snapshot.timestamp, 0) + 1
+            if by_timestamp[snapshot.timestamp] > 1:
+                duplicate_folders[path] = snapshot
+
+            by_url[snapshot.url] = by_url.get(snapshot.url, 0) + 1
+            if by_url[snapshot.url] > 1:
+                duplicate_folders[path] = snapshot
+    return duplicate_folders
+
+
+def get_orphaned_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, Optional['Snapshot']]:
+    """dirs that contain a valid index but aren't listed in the main index"""
+    orphaned_folders: Dict[str, Optional['Snapshot']] = {}
+
+    archive_dir = CONSTANTS.ARCHIVE_DIR
+    if not archive_dir.exists():
+        return orphaned_folders
+
+    for entry in archive_dir.iterdir():
+        if entry.is_dir():
+            index_path = entry / "index.json"
+            if index_path.exists() and not snapshots.filter(timestamp=entry.name).exists():
+                orphaned_folders[str(entry)] = None
+    return orphaned_folders
+
+
+def get_corrupted_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, 'Snapshot']:
+    """dirs that exist but have corrupted/invalid index files"""
+    corrupted: Dict[str, 'Snapshot'] = {}
+    for snapshot in snapshots.iterator(chunk_size=500):
+        if _is_corrupt_snapshot(snapshot):
+            corrupted[snapshot.output_dir] = snapshot
+    return corrupted
+
+
+def get_unrecognized_folders(snapshots: QuerySet, out_dir: Path = DATA_DIR) -> Dict[str, None]:
+    """dirs that don't contain recognizable archive data and aren't listed in the main index"""
+    unrecognized_folders: Dict[str, None] = {}
+
+    archive_dir = Path(out_dir) / CONSTANTS.ARCHIVE_DIR_NAME
+    if not archive_dir.exists():
+        return unrecognized_folders
+
+    for entry in archive_dir.iterdir():
+        if entry.is_dir():
+            index_exists = (entry / "index.json").exists()
+
+            if index_exists:
+                try:
+                    with open(entry / "index.json", 'r') as f:
+                        json.load(f)
+                except Exception:
+                    unrecognized_folders[str(entry)] = None
+            else:
+                timestamp = entry.name
+                if not snapshots.filter(timestamp=timestamp).exists():
+                    unrecognized_folders[str(entry)] = None
+    return unrecognized_folders
+
 
 @enforce_types
 def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], List[str]]:
@@ -27,7 +210,11 @@ def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], L
     """
     fixed = []
     cant_fix = []
-    for entry in os.scandir(out_dir / CONSTANTS.ARCHIVE_DIR_NAME):
+    archive_dir = out_dir / CONSTANTS.ARCHIVE_DIR_NAME
+    if not archive_dir.exists():
+        return fixed, cant_fix
+
+    for entry in os.scandir(archive_dir):
         if entry.is_dir(follow_symlinks=True):
             index_path = Path(entry.path) / 'index.json'
             if index_path.exists():
@@ -43,7 +230,7 @@ def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], L
                     continue
 
                 if not entry.path.endswith(f'/{timestamp}'):
-                    dest = out_dir / CONSTANTS.ARCHIVE_DIR_NAME / timestamp
+                    dest = archive_dir / timestamp
                     if dest.exists():
                         cant_fix.append(entry.path)
                     else:
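
For reviewers, a rough usage sketch (not part of the patch): each new helper takes any Snapshot queryset plus an optional out_dir and returns a dict keyed by folder path, so a status-style report could drive them as below. This assumes the patch is applied and Django is already initialized (e.g. from inside `archivebox shell`); the `report_folder_status` helper name is hypothetical.

    # Hypothetical sketch, not part of the patch above.
    from core.models import Snapshot
    from archivebox.config import DATA_DIR
    from archivebox.misc.folders import (
        get_valid_folders,
        get_invalid_folders,
        get_orphaned_folders,
    )

    def report_folder_status() -> None:
        # Summarize folder health using the new utilities (keys are folder paths).
        snapshots = Snapshot.objects.all()
        valid = get_valid_folders(snapshots, out_dir=DATA_DIR)
        invalid = get_invalid_folders(snapshots, out_dir=DATA_DIR)
        orphaned = get_orphaned_folders(snapshots, out_dir=DATA_DIR)
        print(f'valid: {len(valid)}  invalid: {len(invalid)}  orphaned: {len(orphaned)}')
        for path in sorted(invalid):
            print('  needs attention:', path)

    report_folder_status()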