# Lazy Filesystem Migration System - Implementation TODO

## Architecture Decision: DB as Single Source of Truth

**Key Principle**: Only `archivebox update` scans the filesystem (for migration/import). All other commands query the database exclusively (see the sketch after this list).

- ✅ `archivebox status` - Query DB only (count by status field)
- ✅ `archivebox search` - Query DB only (filter by URL/tags/etc)
- ✅ `archivebox remove` - Query DB + delete directories
- ⚠️ `archivebox update` - **ONLY command that scans filesystem** (for orphan import + migration)
- ✅ `archivebox init` - Simplified: just apply migrations, no folder scanning
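
For example, a status report can be answered entirely from the database. The snippet below is an illustrative sketch only (it assumes Django is already set up and that `Snapshot` has a `status` field; it is not the actual CLI implementation):

```python
from django.db.models import Count

from archivebox.core.models import Snapshot


def count_snapshots_by_status() -> dict:
    """Summarize snapshots per status with a single DB query (no filesystem access)."""
    return {
        row['status']: row['n']
        for row in Snapshot.objects.values('status').annotate(n=Count('pk'))
    }
```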
---

## Status: What Already Exists

### ✅ Core Migration Infrastructure (in `archivebox/core/models.py`)

**Lines 348-367: Migration on `save()` with transaction wrapper** (sketched below)

- Automatically detects if `fs_migration_needed`
- Walks migration chain: 0.7.0 → 0.8.0 → 0.9.0
- Calls `_fs_migrate_from_X_to_Y()` methods
- Updates `fs_version` field within transaction
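
The general shape of that hook, as a rough sketch of the pattern described above (method names follow the ones listed in this document; the real code at lines 348-367 may differ in detail):

```python
from django.db import transaction

def save(self, *args, **kwargs):
    # Sketch only: walk the fs migration chain inside one transaction,
    # calling _fs_migrate_from_X_to_Y() for each hop and bumping fs_version,
    # then persist the row. (Method on the Snapshot model.)
    with transaction.atomic():
        while self.fs_migration_needed:
            next_version = self._fs_next_version()
            migrate = getattr(
                self,
                f"_fs_migrate_from_{self.fs_version.replace('.', '_')}"
                f"_to_{next_version.replace('.', '_')}",
            )
            migrate()
            self.fs_version = next_version
        super().save(*args, **kwargs)
```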

**Lines 393-419: Migration helper methods**

- `_fs_current_version()` - Gets current ArchiveBox version (normalizes to x.x.0)
- `fs_migration_needed` property - Checks if migration is needed
- `_fs_next_version()` - Returns next version in chain
- `_fs_migrate_from_0_7_0_to_0_8_0()` - No-op (same layout)
- `_fs_migrate_from_0_8_0_to_0_9_0()` - **Placeholder (currently no-op at line 427)** ← NEEDS IMPLEMENTATION

**Lines 540-542: `output_dir` property**

- Currently: `return str(CONSTANTS.ARCHIVE_DIR / self.timestamp)`
- Needs: Check `fs_version`, handle symlinks for backwards compat

**Line 311: `fs_version` field**

- CharField tracking filesystem version per snapshot
- Default is the current ArchiveBox version

**Lines 266-267: Timestamp uniqueness logic EXISTS**

```python
while self.filter(timestamp=timestamp).exists():
    timestamp = str(float(timestamp) + 1.0)
```

Already implemented in `create_or_update_from_dict()` at line 241!

**Lines 120-133: SnapshotQuerySet with `filter_by_patterns()`**

- Already supports filtering by exact/substring/regex/domain/tag/timestamp

**archivebox/misc/jsonl.py:**

- Line 252: `get_or_create_snapshot()` - Creates snapshot from JSONL record
- Line 281: Uses `Snapshot.objects.create_or_update_from_dict()` internally

### ✅ Current `archivebox update` Implementation (archivebox/cli/archivebox_update.py)

**Lines 36-102:**

- Filters snapshots from DB using `filter_by_patterns()`
- Applies before/after timestamp filters
- Queues snapshots via status update
- Starts Orchestrator to process queued snapshots

**Current behavior:**

- Only queries DB, never scans filesystem ← NEEDS TO BE FIXED
- No orphan detection ← NEEDS TO BE ADDED
- No reconciliation ← NEEDS TO BE ADDED
- No migration triggering ← save() does this automatically

---
## What Needs Implementation

### Phase 1: Add Methods to Snapshot Model

File: `archivebox/core/models.py`

Add these methods after the existing migration methods (around line 457):

```python
# =========================================================================
# Path Calculation and Migration Helpers
# =========================================================================

@staticmethod
def extract_domain_from_url(url: str) -> str:
    """
    Extract domain from URL for 0.9.x path structure.
    Uses full hostname with sanitized special chars.

    Examples:
        https://example.com:8080 → example.com_8080
        https://sub.example.com → sub.example.com
        file:///path → localhost
        data:text/html → data
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)

        if parsed.scheme in ('http', 'https'):
            if parsed.port:
                return f"{parsed.hostname}_{parsed.port}".replace(':', '_')
            return parsed.hostname or 'unknown'
        elif parsed.scheme == 'file':
            return 'localhost'
        elif parsed.scheme:
            return parsed.scheme
        else:
            return 'unknown'
    except Exception:
        return 'unknown'

def get_storage_path_for_version(self, version: str) -> Path:
    """
    Calculate storage path for specific filesystem version.
    Centralizes path logic so it's reusable.

    0.7.x/0.8.x: archive/{timestamp}
    0.9.x:       users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
    """
    from datetime import datetime

    if version in ('0.7.0', '0.8.0'):
        return CONSTANTS.ARCHIVE_DIR / self.timestamp

    elif version in ('0.9.0', '1.0.0'):
        username = self.created_by.username if self.created_by else 'unknown'

        # Use created_at for date grouping (fallback to timestamp)
        if self.created_at:
            date_str = self.created_at.strftime('%Y%m%d')
        else:
            date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')

        domain = self.extract_domain_from_url(self.url)

        return (
            CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
            date_str / domain / str(self.id)
        )
    else:
        # Unknown version - use current
        return self.get_storage_path_for_version(self._fs_current_version())

# =========================================================================
# Loading and Creation from Filesystem (Used by archivebox update ONLY)
# =========================================================================

@classmethod
def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
    """
    Load existing Snapshot from DB by reading index.json.

    Reads index.json, extracts url+timestamp, queries DB.
    Returns existing Snapshot or None if not found/invalid.
    Does NOT create new snapshots.

    ONLY used by: archivebox update (for orphan detection)
    """
    import json

    index_path = snapshot_dir / 'index.json'
    if not index_path.exists():
        return None

    try:
        with open(index_path) as f:
            data = json.load(f)
    except Exception:
        return None

    url = data.get('url')
    if not url:
        return None

    # Get timestamp - prefer index.json, fallback to folder name
    timestamp = cls._select_best_timestamp(
        index_timestamp=data.get('timestamp'),
        folder_name=snapshot_dir.name
    )

    if not timestamp:
        return None

    # Look up existing
    try:
        return cls.objects.get(url=url, timestamp=timestamp)
    except cls.DoesNotExist:
        return None
    except cls.MultipleObjectsReturned:
        # Should not happen with unique constraint
        return cls.objects.filter(url=url, timestamp=timestamp).first()

@classmethod
def create_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
    """
    Create new Snapshot from orphaned directory.

    Validates timestamp, ensures uniqueness.
    Returns new UNSAVED Snapshot or None if invalid.

    ONLY used by: archivebox update (for orphan import)
    """
    import json
    from archivebox.base_models.models import get_or_create_system_user_pk

    index_path = snapshot_dir / 'index.json'
    if not index_path.exists():
        return None

    try:
        with open(index_path) as f:
            data = json.load(f)
    except Exception:
        return None

    url = data.get('url')
    if not url:
        return None

    # Get and validate timestamp
    timestamp = cls._select_best_timestamp(
        index_timestamp=data.get('timestamp'),
        folder_name=snapshot_dir.name
    )

    if not timestamp:
        return None

    # Ensure uniqueness (reuses existing logic from create_or_update_from_dict)
    timestamp = cls._ensure_unique_timestamp(url, timestamp)

    # Detect version
    fs_version = cls._detect_fs_version_from_index(data)

    return cls(
        url=url,
        timestamp=timestamp,
        title=data.get('title', ''),
        fs_version=fs_version,
        created_by_id=get_or_create_system_user_pk(),
    )

@staticmethod
def _select_best_timestamp(index_timestamp: str, folder_name: str) -> Optional[str]:
    """
    Select best timestamp from index.json vs folder name.

    Validates range (1995-2035).
    Prefers index.json if valid.
    """
    def is_valid_timestamp(ts):
        try:
            ts_int = int(float(ts))
            # 1995-01-01 to 2035-12-31
            return 788918400 <= ts_int <= 2082758400
        except Exception:
            return False

    index_valid = is_valid_timestamp(index_timestamp) if index_timestamp else False
    folder_valid = is_valid_timestamp(folder_name)

    if index_valid:
        return str(int(float(index_timestamp)))
    elif folder_valid:
        return str(int(float(folder_name)))
    else:
        return None

@classmethod
def _ensure_unique_timestamp(cls, url: str, timestamp: str) -> str:
    """
    Ensure timestamp is globally unique.
    If collision with different URL, increment by 1 until unique.

    NOTE: Logic already exists in create_or_update_from_dict (line 266-267)
    This is just an extracted, reusable version.
    """
    while cls.objects.filter(timestamp=timestamp).exclude(url=url).exists():
        timestamp = str(int(float(timestamp)) + 1)
    return timestamp

@staticmethod
def _detect_fs_version_from_index(data: dict) -> str:
    """
    Detect fs_version from index.json structure.

    - Has fs_version field: use it
    - Has history dict: 0.7.0
    - Has archive_results list: 0.8.0
    - Default: 0.7.0
    """
    if 'fs_version' in data:
        return data['fs_version']
    if 'history' in data and 'archive_results' not in data:
        return '0.7.0'
    if 'archive_results' in data:
        return '0.8.0'
    return '0.7.0'

# =========================================================================
# Index.json Reconciliation
# =========================================================================

def reconcile_with_index_json(self):
    """
    Merge index.json with DB. DB is source of truth.

    - Title: longest non-URL
    - Tags: union
    - ArchiveResults: keep both (by extractor+start_ts)

    Writes back in 0.9.x format.

    Used by: archivebox update (to sync index.json with DB)
    """
    import json

    index_path = Path(self.output_dir) / 'index.json'

    index_data = {}
    if index_path.exists():
        try:
            with open(index_path) as f:
                index_data = json.load(f)
        except Exception:
            pass

    # Merge title
    self._merge_title_from_index(index_data)

    # Merge tags
    self._merge_tags_from_index(index_data)

    # Merge ArchiveResults
    self._merge_archive_results_from_index(index_data)

    # Write back
    self.write_index_json()

def _merge_title_from_index(self, index_data: dict):
    """Merge title - prefer longest non-URL title."""
    index_title = index_data.get('title', '').strip()
    db_title = self.title or ''

    candidates = [t for t in [index_title, db_title] if t and t != self.url]
    if candidates:
        best_title = max(candidates, key=len)
        if self.title != best_title:
            self.title = best_title

def _merge_tags_from_index(self, index_data: dict):
    """Merge tags - union of both sources."""
    from django.db import transaction

    index_tags = set(index_data.get('tags', '').split(',')) if index_data.get('tags') else set()
    index_tags = {t.strip() for t in index_tags if t.strip()}

    db_tags = set(self.tags.values_list('name', flat=True))

    new_tags = index_tags - db_tags
    if new_tags:
        with transaction.atomic():
            for tag_name in new_tags:
                tag, _ = Tag.objects.get_or_create(name=tag_name)
                self.tags.add(tag)

def _merge_archive_results_from_index(self, index_data: dict):
    """Merge ArchiveResults - keep both (by extractor+start_ts)."""
    existing = {
        (ar.extractor, ar.start_ts): ar
        for ar in ArchiveResult.objects.filter(snapshot=self)
    }

    # Handle 0.8.x format (archive_results list)
    for result_data in index_data.get('archive_results', []):
        self._create_archive_result_if_missing(result_data, existing)

    # Handle 0.7.x format (history dict)
    if 'history' in index_data and isinstance(index_data['history'], dict):
        for extractor, result_list in index_data['history'].items():
            if isinstance(result_list, list):
                for result_data in result_list:
                    result_data['extractor'] = extractor
                    self._create_archive_result_if_missing(result_data, existing)

def _create_archive_result_if_missing(self, result_data: dict, existing: dict):
    """Create ArchiveResult if not already in DB."""
    from dateutil import parser

    extractor = result_data.get('extractor', '')
    if not extractor:
        return

    start_ts = None
    if result_data.get('start_ts'):
        try:
            start_ts = parser.parse(result_data['start_ts'])
        except Exception:
            pass

    if (extractor, start_ts) in existing:
        return

    try:
        end_ts = None
        if result_data.get('end_ts'):
            try:
                end_ts = parser.parse(result_data['end_ts'])
            except Exception:
                pass

        ArchiveResult.objects.create(
            snapshot=self,
            extractor=extractor,
            status=result_data.get('status', 'failed'),
            output_str=result_data.get('output', ''),
            cmd=result_data.get('cmd', []),
            pwd=result_data.get('pwd', str(self.output_dir)),
            start_ts=start_ts,
            end_ts=end_ts,
            created_by=self.created_by,
        )
    except Exception:
        pass

def write_index_json(self):
    """Write index.json in 0.9.x format."""
    import json

    index_path = Path(self.output_dir) / 'index.json'

    data = {
        'url': self.url,
        'timestamp': self.timestamp,
        'title': self.title or '',
        'tags': ','.join(sorted(self.tags.values_list('name', flat=True))),
        'fs_version': self.fs_version,
        'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
        'created_at': self.created_at.isoformat() if self.created_at else None,
        'archive_results': [
            {
                'extractor': ar.extractor,
                'status': ar.status,
                'start_ts': ar.start_ts.isoformat() if ar.start_ts else None,
                'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
                'output': ar.output_str or '',
                'cmd': ar.cmd if isinstance(ar.cmd, list) else [],
                'pwd': ar.pwd,
            }
            for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts')
        ],
    }

    index_path.parent.mkdir(parents=True, exist_ok=True)
    with open(index_path, 'w') as f:
        json.dump(data, f, indent=2, sort_keys=True)

# =========================================================================
# Snapshot Utilities
# =========================================================================

@staticmethod
def move_directory_to_invalid(snapshot_dir: Path):
    """
    Move invalid directory to data/invalid/YYYYMMDD/.

    Used by: archivebox update (when encountering invalid directories)
    """
    from datetime import datetime
    import shutil

    invalid_dir = CONSTANTS.DATA_DIR / 'invalid' / datetime.now().strftime('%Y%m%d')
    invalid_dir.mkdir(parents=True, exist_ok=True)

    dest = invalid_dir / snapshot_dir.name
    counter = 1
    while dest.exists():
        dest = invalid_dir / f"{snapshot_dir.name}_{counter}"
        counter += 1

    try:
        shutil.move(str(snapshot_dir), str(dest))
    except Exception:
        pass

@classmethod
def find_and_merge_duplicates(cls) -> int:
    """
    Find and merge snapshots with same url:timestamp.
    Returns count of duplicate sets merged.

    Used by: archivebox update (Phase 3: deduplication)
    """
    from django.db.models import Count

    duplicates = (
        cls.objects
        .values('url', 'timestamp')
        .annotate(count=Count('id'))
        .filter(count__gt=1)
    )

    merged = 0
    for dup in duplicates.iterator():
        snapshots = list(
            cls.objects
            .filter(url=dup['url'], timestamp=dup['timestamp'])
            .order_by('created_at')  # Keep oldest
        )

        if len(snapshots) > 1:
            try:
                cls._merge_snapshots(snapshots)
                merged += 1
            except Exception:
                pass

    return merged

@classmethod
def _merge_snapshots(cls, snapshots: list['Snapshot']):
    """
    Merge exact duplicates.
    Keep oldest, union files + ArchiveResults.
    """
    import shutil

    keeper = snapshots[0]
    duplicates = snapshots[1:]

    keeper_dir = Path(keeper.output_dir)

    for dup in duplicates:
        dup_dir = Path(dup.output_dir)

        # Merge files
        if dup_dir.exists() and dup_dir != keeper_dir:
            for dup_file in dup_dir.rglob('*'):
                if not dup_file.is_file():
                    continue

                rel = dup_file.relative_to(dup_dir)
                keeper_file = keeper_dir / rel

                if not keeper_file.exists():
                    keeper_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(dup_file, keeper_file)

            try:
                shutil.rmtree(dup_dir)
            except Exception:
                pass

        # Merge tags
        for tag in dup.tags.all():
            keeper.tags.add(tag)

        # Move ArchiveResults
        ArchiveResult.objects.filter(snapshot=dup).update(snapshot=keeper)

        # Delete
        dup.delete()
```
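
For orientation, the path helpers above are expected to behave like this (return values follow the docstrings; `snapshot` stands for any saved Snapshot instance, and the exact date/uuid depend on that snapshot):

```python
# Hypothetical usage of the helpers defined above:
domain = Snapshot.extract_domain_from_url('https://example.com:8080/some/page')
assert domain == 'example.com_8080'   # per the docstring examples

old_path = snapshot.get_storage_path_for_version('0.8.0')
# -> CONSTANTS.ARCHIVE_DIR / snapshot.timestamp              (archive/{timestamp})

new_path = snapshot.get_storage_path_for_version('0.9.0')
# -> DATA_DIR / 'users' / <username> / 'snapshots' / <YYYYMMDD> / <domain> / <uuid>
```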

### Phase 2: Update `output_dir` Property

File: `archivebox/core/models.py` line 540

Replace current implementation:

```python
@cached_property
def output_dir(self):
    """The filesystem path to the snapshot's output directory."""
    import os

    current_path = self.get_storage_path_for_version(self.fs_version)

    if current_path.exists():
        return str(current_path)

    # Check for backwards-compat symlink
    old_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
    if old_path.is_symlink():
        return str(Path(os.readlink(old_path)).resolve())
    elif old_path.exists():
        return str(old_path)

    return str(current_path)
```

### Phase 3: Implement Real Migration

File: `archivebox/core/models.py` line 427

Replace the placeholder `_fs_migrate_from_0_8_0_to_0_9_0()`:

```python
def _fs_migrate_from_0_8_0_to_0_9_0(self):
    """
    Migrate from flat to nested structure.

    0.8.x: archive/{timestamp}/
    0.9.x: users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/

    Transaction handling:
    1. Copy files INSIDE transaction
    2. Create symlink INSIDE transaction (only if archive/{timestamp} is free)
    3. Update fs_version INSIDE transaction (done by save())
    4. Exit transaction (DB commit)
    5. Delete old files OUTSIDE transaction (after commit) and swap in the symlink
    """
    import shutil
    from django.db import transaction

    old_dir = self.get_storage_path_for_version('0.8.0')
    new_dir = self.get_storage_path_for_version('0.9.0')

    if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
        return

    new_dir.mkdir(parents=True, exist_ok=True)

    # Copy all files (idempotent)
    for old_file in old_dir.rglob('*'):
        if not old_file.is_file():
            continue

        rel_path = old_file.relative_to(old_dir)
        new_file = new_dir / rel_path

        # Skip if already copied
        if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
            continue

        new_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(old_file, new_file)

    # Verify all copied
    old_files = {f.relative_to(old_dir): f.stat().st_size
                 for f in old_dir.rglob('*') if f.is_file()}
    new_files = {f.relative_to(new_dir): f.stat().st_size
                 for f in new_dir.rglob('*') if f.is_file()}

    if old_files.keys() != new_files.keys():
        missing = old_files.keys() - new_files.keys()
        raise Exception(f"Migration incomplete: missing {missing}")

    # Create backwards-compat symlink (INSIDE transaction)
    # NOTE: when migrating from the flat 0.8.x layout, old_dir still occupies
    # archive/{timestamp}, so the symlink is created by _cleanup_old_migration_dir()
    # after the old directory has been removed post-commit.
    symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
    if symlink_path.is_symlink():
        symlink_path.unlink()

    if not symlink_path.exists():
        symlink_path.symlink_to(new_dir, target_is_directory=True)

    # Schedule old directory deletion AFTER transaction commits
    transaction.on_commit(lambda: self._cleanup_old_migration_dir(old_dir))

def _cleanup_old_migration_dir(self, old_dir: Path):
    """
    Delete old directory after successful migration.
    Called via transaction.on_commit() after DB commit succeeds.
    """
    import shutil
    import logging

    if old_dir.exists() and not old_dir.is_symlink():
        try:
            shutil.rmtree(old_dir)
        except Exception as e:
            # Log but don't raise - migration succeeded, this is just cleanup
            logging.getLogger('archivebox.migration').warning(
                f"Could not remove old migration directory {old_dir}: {e}"
            )
            return

    # If the legacy archive/{timestamp} path is now free, leave a backwards-compat
    # symlink pointing at the new 0.9.x location.
    symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
    if not symlink_path.exists() and not symlink_path.is_symlink():
        try:
            symlink_path.symlink_to(self.get_storage_path_for_version('0.9.0'), target_is_directory=True)
        except OSError:
            pass
```
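
Design note: the copy loop, verification, and `fs_version` bump all happen inside the surrounding `save()` transaction, while deletion of the old directory (and the swap-in of the legacy `archive/{timestamp}` symlink) is deferred with `transaction.on_commit()`. If the commit fails or rolls back, the original directory is untouched and the migration can simply be re-run; the size-checked copy loop makes that retry idempotent.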

### Phase 4: Add Timestamp Uniqueness Constraint

File: `archivebox/core/models.py` - Add to `Snapshot.Meta` class (around line 330):

```python
class Meta(TypedModelMeta):
    verbose_name = "Snapshot"
    verbose_name_plural = "Snapshots"
    constraints = [
        # Allow same URL in different crawls, but not duplicates within same crawl
        models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
        # Global timestamp uniqueness for 1:1 symlink mapping
        models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
    ]
```

Then create the migration:

```bash
python -m archivebox manage makemigrations core
```
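
The generated migration should contain an `AddConstraint` operation along these lines (illustrative sketch only; the actual file name, number, and dependency produced by `makemigrations` will differ):

```python
# core/migrations/00XX_unique_timestamp.py  (hypothetical name)
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('core', '00XX_previous_migration'),  # placeholder for the real predecessor
    ]

    operations = [
        migrations.AddConstraint(
            model_name='snapshot',
            constraint=models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
        ),
    ]
```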

### Phase 5: Rewrite `archivebox update`

File: `archivebox/cli/archivebox_update.py`

Replace entire file:

```python
#!/usr/bin/env python3

__package__ = 'archivebox.cli'

import os
import time
import rich_click as click

from typing import Iterable
from pathlib import Path

from archivebox.misc.util import enforce_types, docstring


@enforce_types
def update(filter_patterns: Iterable[str] = (),
           filter_type: str = 'exact',
           before: float | None = None,
           after: float | None = None,
           resume: str | None = None,
           batch_size: int = 100,
           continuous: bool = False) -> None:
    """
    Update snapshots: import orphans, reconcile, and re-run failed extractors.

    Three-phase operation:
    - Phase 1: Scan archive/ for orphaned snapshots (skip symlinks)
    - Phase 2: Process all DB snapshots (reconcile + re-queue for archiving)
    - Phase 3: Deduplicate exact duplicates

    With filters: Only phase 2 (DB query), no filesystem scan.
    Without filters: All phases (full update).
    """

    from rich import print
    from archivebox.config.django import setup_django
    setup_django()

    from archivebox.core.models import Snapshot
    from django.utils import timezone

    while True:
        if filter_patterns or before or after:
            # Filtered mode: query DB only
            print('[*] Processing filtered snapshots from database...')
            stats = process_filtered_snapshots(
                filter_patterns=filter_patterns,
                filter_type=filter_type,
                before=before,
                after=after,
                batch_size=batch_size
            )
            print_stats(stats)
        else:
            # Full mode: import orphans + process DB + deduplicate
            stats_combined = {'phase1': {}, 'phase2': {}, 'deduplicated': 0}

            print('[*] Phase 1: Scanning archive/ for orphaned snapshots...')
            stats_combined['phase1'] = import_orphans_from_archive(
                resume_from=resume,
                batch_size=batch_size
            )

            print('[*] Phase 2: Processing all database snapshots...')
            stats_combined['phase2'] = process_all_db_snapshots(batch_size=batch_size)

            print('[*] Phase 3: Deduplicating...')
            stats_combined['deduplicated'] = Snapshot.find_and_merge_duplicates()

            print_combined_stats(stats_combined)

        if not continuous:
            break

        print('[yellow]Sleeping 60s before next pass...[/yellow]')
        time.sleep(60)
        resume = None


def import_orphans_from_archive(resume_from: str | None = None, batch_size: int = 100) -> dict:
    """
    Scan archive/ for orphaned snapshots.
    Skip symlinks (already migrated).
    Create DB records and trigger migration on save().
    """
    from archivebox.core.models import Snapshot
    from archivebox.config import CONSTANTS
    from django.db import transaction

    stats = {'processed': 0, 'imported': 0, 'migrated': 0, 'invalid': 0}

    archive_dir = CONSTANTS.ARCHIVE_DIR
    if not archive_dir.exists():
        return stats

    print('[*] Scanning and sorting by modification time...')

    # Scan and sort by mtime (newest first)
    # Loading (mtime, path) tuples is fine even for millions (~100MB for 1M entries)
    entries = [
        (e.stat().st_mtime, e.path)
        for e in os.scandir(archive_dir)
        if e.is_dir(follow_symlinks=False)  # Skip symlinks
    ]
    entries.sort(reverse=True)  # Newest first
    print(f'[*] Found {len(entries)} directories to check')

    for mtime, entry_path in entries:
        entry_path = Path(entry_path)

        # Resume from timestamp if specified
        if resume_from and entry_path.name < resume_from:
            continue

        stats['processed'] += 1

        # Check if already in DB
        snapshot = Snapshot.load_from_directory(entry_path)
        if snapshot:
            continue  # Already in DB, skip

        # Not in DB - create orphaned snapshot
        snapshot = Snapshot.create_from_directory(entry_path)
        if not snapshot:
            # Invalid directory
            Snapshot.move_directory_to_invalid(entry_path)
            stats['invalid'] += 1
            print(f" [{stats['processed']}] Invalid: {entry_path.name}")
            continue

        needs_migration = snapshot.fs_migration_needed

        snapshot.save()  # Creates DB record + triggers migration

        stats['imported'] += 1
        if needs_migration:
            stats['migrated'] += 1
            print(f" [{stats['processed']}] Imported + migrated: {entry_path.name}")
        else:
            print(f" [{stats['processed']}] Imported: {entry_path.name}")

        if stats['processed'] % batch_size == 0:
            transaction.commit()

    transaction.commit()
    return stats


def process_all_db_snapshots(batch_size: int = 100) -> dict:
    """
    Process all snapshots in DB.
    Reconcile index.json and queue for archiving.
    """
    from archivebox.core.models import Snapshot
    from django.db import transaction
    from django.utils import timezone

    stats = {'processed': 0, 'reconciled': 0, 'queued': 0}

    total = Snapshot.objects.count()
    print(f'[*] Processing {total} snapshots from database...')

    for snapshot in Snapshot.objects.iterator():
        # Reconcile index.json with DB
        snapshot.reconcile_with_index_json()

        # Queue for archiving (state machine will handle it)
        snapshot.status = Snapshot.StatusChoices.QUEUED
        snapshot.retry_at = timezone.now()
        snapshot.save()

        stats['reconciled'] += 1
        stats['queued'] += 1
        stats['processed'] += 1

        if stats['processed'] % batch_size == 0:
            transaction.commit()
            print(f" [{stats['processed']}/{total}] Processed...")

    transaction.commit()
    return stats


def process_filtered_snapshots(
    filter_patterns: Iterable[str],
    filter_type: str,
    before: float | None,
    after: float | None,
    batch_size: int
) -> dict:
    """Process snapshots matching filters (DB query only)."""
    from archivebox.core.models import Snapshot
    from django.db import transaction
    from django.utils import timezone
    from datetime import datetime

    stats = {'processed': 0, 'reconciled': 0, 'queued': 0}

    snapshots = Snapshot.objects.all()

    if filter_patterns:
        snapshots = Snapshot.objects.filter_by_patterns(list(filter_patterns), filter_type)

    if before:
        snapshots = snapshots.filter(bookmarked_at__lt=datetime.fromtimestamp(before))
    if after:
        snapshots = snapshots.filter(bookmarked_at__gt=datetime.fromtimestamp(after))

    total = snapshots.count()
    print(f'[*] Found {total} matching snapshots')

    for snapshot in snapshots.iterator():
        # Reconcile index.json with DB
        snapshot.reconcile_with_index_json()

        # Queue for archiving
        snapshot.status = Snapshot.StatusChoices.QUEUED
        snapshot.retry_at = timezone.now()
        snapshot.save()

        stats['reconciled'] += 1
        stats['queued'] += 1
        stats['processed'] += 1

        if stats['processed'] % batch_size == 0:
            transaction.commit()
            print(f" [{stats['processed']}/{total}] Processed...")

    transaction.commit()
    return stats


def print_stats(stats: dict):
    """Print statistics for filtered mode."""
    from rich import print

    print(f"""
    [green]Update Complete[/green]
        Processed: {stats['processed']}
        Reconciled: {stats['reconciled']}
        Queued: {stats['queued']}
    """)


def print_combined_stats(stats_combined: dict):
    """Print statistics for full mode."""
    from rich import print

    s1 = stats_combined['phase1']
    s2 = stats_combined['phase2']

    print(f"""
    [green]Archive Update Complete[/green]

    Phase 1 (Import Orphans):
        Checked: {s1.get('processed', 0)}
        Imported: {s1.get('imported', 0)}
        Migrated: {s1.get('migrated', 0)}
        Invalid: {s1.get('invalid', 0)}

    Phase 2 (Process DB):
        Processed: {s2.get('processed', 0)}
        Reconciled: {s2.get('reconciled', 0)}
        Queued: {s2.get('queued', 0)}

    Phase 3 (Deduplicate):
        Merged: {stats_combined['deduplicated']}
    """)


@click.command()
@click.option('--resume', type=str, help='Resume from timestamp')
@click.option('--before', type=float, help='Only snapshots before timestamp')
@click.option('--after', type=float, help='Only snapshots after timestamp')
@click.option('--filter-type', '-t', type=click.Choice(['exact', 'substring', 'regex', 'domain', 'tag', 'timestamp']), default='exact')
@click.option('--batch-size', type=int, default=100, help='Commit every N snapshots')
@click.option('--continuous', is_flag=True, help='Run continuously as background worker')
@click.argument('filter_patterns', nargs=-1)
@docstring(update.__doc__)
def main(**kwargs):
    update(**kwargs)


if __name__ == '__main__':
    main()
```
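
Expected usage once this lands (flags as defined by the `click` options above):

```bash
# Full update: scan archive/ for orphans, reconcile every DB snapshot, then deduplicate
archivebox update

# Filtered update: DB query only, no filesystem scan
archivebox update --after 1234567890
archivebox update example.com --filter-type substring

# Run indefinitely as a background worker, one pass per minute
archivebox update --continuous
```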

### Phase 6: Simplify `archivebox init`

File: `archivebox/cli/archivebox_init.py`

Remove line 24 and lines 113-150 (folder status function usage):

```python
# DELETE line 24:
from archivebox.misc.folders import fix_invalid_folder_locations, get_invalid_folders

# DELETE lines 113-150 (folder scanning logic):
# Replace with simple message:
print(' > Run "archivebox update" to import any orphaned snapshot directories')
```

Simplified logic (see the sketch after this list):

- Create directory structure
- Apply migrations
- **Don't scan for orphans** (let `archivebox update` handle it)
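
A minimal sketch of what the simplified core of `init` could look like (illustrative only; the subdirectory names and exact management-command call are assumptions, not the real `archivebox_init.py`):

```python
from pathlib import Path

from django.core.management import call_command


def init_collection(data_dir: Path) -> None:
    # Create the bare directory structure (subdir names here are assumptions).
    for subdir in ('archive', 'sources', 'logs'):
        (data_dir / subdir).mkdir(parents=True, exist_ok=True)

    # Apply any pending DB migrations.
    call_command('migrate', interactive=False)

    # No filesystem scanning here -- orphan import is archivebox update's job.
    print(' > Run "archivebox update" to import any orphaned snapshot directories')
```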

### Phase 7: Simplify `archivebox search`

File: `archivebox/cli/archivebox_search.py`

Remove lines 65-96 (all folder status imports and the `list_folders()` function):

```python
# DELETE lines 65-96
# DELETE STATUS_CHOICES with 'valid', 'invalid', 'orphaned', 'corrupted', 'unrecognized'

# Keep only: 'indexed', 'archived', 'unarchived'
STATUS_CHOICES = ['indexed', 'archived', 'unarchived']
```

Update the `search()` function to query the DB directly:

```python
@enforce_types
def search(filter_patterns: list[str] | None=None,
           filter_type: str='substring',
           status: str='indexed',
           before: float | None=None,
           after: float | None=None,
           sort: str | None=None,
           json: bool=False,
           html: bool=False,
           csv: str | None=None,
           with_headers: bool=False):
    """List, filter, and export information about archive entries"""

    from archivebox.core.models import Snapshot

    if with_headers and not (json or html or csv):
        stderr('[X] --with-headers requires --json, --html or --csv\n', color='red')
        raise SystemExit(2)

    # Query DB directly
    snapshots = Snapshot.objects.all()

    if filter_patterns:
        snapshots = Snapshot.objects.filter_by_patterns(list(filter_patterns), filter_type)

    if status == 'archived':
        snapshots = snapshots.filter(downloaded_at__isnull=False)
    elif status == 'unarchived':
        snapshots = snapshots.filter(downloaded_at__isnull=True)
    # 'indexed' = all snapshots (no filter)

    if before:
        from datetime import datetime
        snapshots = snapshots.filter(bookmarked_at__lt=datetime.fromtimestamp(before))
    if after:
        from datetime import datetime
        snapshots = snapshots.filter(bookmarked_at__gt=datetime.fromtimestamp(after))

    if sort:
        snapshots = snapshots.order_by(sort)

    # Export to requested format
    if json:
        output = snapshots.to_json(with_headers=with_headers)
    elif html:
        output = snapshots.to_html(with_headers=with_headers)
    elif csv:
        output = snapshots.to_csv(cols=csv.split(','), header=with_headers)
    else:
        from archivebox.misc.logging_util import printable_folders
        # Convert to dict for printable_folders
        folders = {s.output_dir: s for s in snapshots}
        output = printable_folders(folders, with_headers)

    print(output)
    return output
```

### Phase 8: Delete Folder Status Functions

File: `archivebox/misc/folders.py`

Delete lines 23-186 (all status checking functions):

```python
# DELETE these functions entirely:
# - _is_valid_snapshot()
# - _is_corrupt_snapshot()
# - get_indexed_folders()
# - get_archived_folders()
# - get_unarchived_folders()
# - get_present_folders()
# - get_valid_folders()
# - get_invalid_folders()
# - get_duplicate_folders()
# - get_orphaned_folders()
# - get_corrupted_folders()
# - get_unrecognized_folders()
```

Keep only `fix_invalid_folder_locations()` (used by `archivebox init` for one-time cleanup):

```python
"""
Folder utilities for ArchiveBox.

Note: This file only contains legacy cleanup utilities.
The DB is the single source of truth - use Snapshot.objects queries for all status checks.
"""

__package__ = 'archivebox.misc'

import os
import json
import shutil
from pathlib import Path
from typing import Tuple, List

from archivebox.config import DATA_DIR, CONSTANTS
from archivebox.misc.util import enforce_types


@enforce_types
def fix_invalid_folder_locations(out_dir: Path = DATA_DIR) -> Tuple[List[str], List[str]]:
    """
    Legacy cleanup: Move folders to their correct timestamp-named locations based on index.json.

    This is only used during 'archivebox init' for one-time cleanup of misnamed directories.
    After this runs once, 'archivebox update' handles all filesystem operations.
    """
    fixed = []
    cant_fix = []
    for entry in os.scandir(out_dir / CONSTANTS.ARCHIVE_DIR_NAME):
        if entry.is_dir(follow_symlinks=True):
            index_path = Path(entry.path) / 'index.json'
            if index_path.exists():
                try:
                    with open(index_path, 'r') as f:
                        data = json.load(f)
                    timestamp = data.get('timestamp')
                    url = data.get('url')
                except Exception:
                    continue

                if not timestamp:
                    continue

                if not entry.path.endswith(f'/{timestamp}'):
                    dest = out_dir / CONSTANTS.ARCHIVE_DIR_NAME / timestamp
                    if dest.exists():
                        cant_fix.append(entry.path)
                    else:
                        shutil.move(entry.path, str(dest))
                        fixed.append(str(dest))
    return fixed, cant_fix
```

---

## Testing Plan

1. **Test migration idempotency:**
   ```bash
   # Interrupt migration mid-way
   # Re-run - should resume seamlessly
   ```

2. **Test orphan import:** (a concrete sketch follows this list)
   ```bash
   # Create orphaned directory manually
   # Run archivebox update
   # Verify imported and migrated
   ```

3. **Test deduplication:**
   ```bash
   # Create two snapshots with same url:timestamp
   # Run archivebox update
   # Verify merged
   ```

4. **Test timestamp uniqueness:**
   ```bash
   # Try to create snapshots with colliding timestamps
   # Verify auto-increment
   ```

5. **Test filtered update:**
   ```bash
   archivebox update --after 1234567890
   # Should only process DB, no filesystem scan
   ```

6. **Test continuous mode:**
   ```bash
   archivebox update --continuous
   # Should run in loop, prioritize newest entries
   ```

7. **Test DB-only commands:**
   ```bash
   archivebox search --status archived
   archivebox search example.com --filter-type substring
   archivebox remove example.com
   # All should query DB only, no filesystem scanning
   ```
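
A concrete version of test 2 above, for reference (the path and sample URL are made up; the `index.json` fields match what `create_from_directory()` reads):

```bash
# Hand-craft an orphaned flat-layout snapshot directory inside the collection
mkdir -p ./archive/1700000000
cat > ./archive/1700000000/index.json <<'EOF'
{"url": "https://example.com/some/page", "timestamp": "1700000000", "title": "Example page"}
EOF

# A full update scans archive/ and should import + migrate it
archivebox update

# The imported snapshot should now be visible via DB-only queries
archivebox search example.com --filter-type substring
```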

---

## Implementation Checklist

- [x] Add all new methods to `Snapshot` model (Phase 1)
- [x] Update `output_dir` property (Phase 2)
- [x] Implement real `_fs_migrate_from_0_8_0_to_0_9_0()` (Phase 3)
- [x] Add `_cleanup_old_migration_dir()` helper (Phase 3)
- [x] Add timestamp uniqueness constraint (Phase 4)
- [x] Create database migration for constraint (Phase 4) - Created: `0032_alter_archiveresult_binary_and_more.py`
- [x] Rewrite `archivebox/cli/archivebox_update.py` (Phase 5)
- [x] Simplify `archivebox/cli/archivebox_init.py` (Phase 6)
- [x] Simplify `archivebox/cli/archivebox_search.py` (Phase 7)
- [x] Delete folder status functions from `archivebox/misc/folders.py` (Phase 8)
- [x] Update migration tests (test_migrations_08_to_09.py)
- [x] Update update command tests (tests/test_update.py)
- [ ] Run tests to verify implementation
- [ ] Test migration on real 0.8.x collection
- [ ] Test orphan import in production
- [ ] Test deduplication in production
- [ ] Test filtered vs full mode in production
- [ ] Test continuous mode in production