feat: replace index.json with index.jsonl flat JSONL format

Switch from hierarchical index.json to flat index.jsonl format for
snapshot metadata storage. Each line is a self-contained JSON record
with a 'type' field (Snapshot, ArchiveResult, Binary, Process).

Changes:
- Add JSONL_INDEX_FILENAME constant to constants.py
- Add TYPE_PROCESS and TYPE_MACHINE to jsonl.py type constants
- Add binary_to_jsonl(), process_to_jsonl(), machine_to_jsonl() converters
- Add Snapshot.write_index_jsonl() to write new format
- Add Snapshot.read_index_jsonl() to read new format
- Add Snapshot.convert_index_json_to_jsonl() for migration
- Update Snapshot.reconcile_with_index() to handle both formats
- Update fs_migrate to convert during filesystem migration
- Update load_from_directory/create_from_directory for both formats
- Update legacy.py parse_json_links_details for JSONL support

The new format is easier to parse and extend, and allows mixing record types in a single file.
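
For illustration, a snapshot's index.jsonl might look like this (hypothetical, abridged records; the exact fields come from the *_to_jsonl() converters below):

    {"type": "Snapshot", "id": "01936e...", "url": "https://example.com", "timestamp": "1703980800.0", "title": "Example Domain", "tags": "docs,demo", "fs_version": "0.9.0"}
    {"type": "Binary", "id": "01936f...", "name": "wget", "abspath": "/usr/bin/wget", "version": "1.21.4"}
    {"type": "Process", "id": "019370...", "binary_id": "01936f...", "cmd": ["wget", "-p", "https://example.com"], "exit_code": 0}
    {"type": "ArchiveResult", "snapshot_id": "01936e...", "plugin": "wget", "status": "succeeded", "output_str": "example.com/index.html", "process_id": "019370..."}
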
Author: Claude
Date: 2025-12-30 18:21:06 +00:00
parent 96ee1bf686
commit d36079829b
4 changed files with 359 additions and 44 deletions

constants.py

@@ -100,6 +100,7 @@ class ConstantsDict(Mapping):
DATABASE_FILE: Path = DATA_DIR / SQL_INDEX_FILENAME
JSON_INDEX_FILENAME: str = 'index.json'
+JSONL_INDEX_FILENAME: str = 'index.jsonl'
HTML_INDEX_FILENAME: str = 'index.html'
ROBOTS_TXT_FILENAME: str = 'robots.txt'
FAVICON_FILENAME: str = 'favicon.ico'

models.py (Snapshot model)

@@ -415,10 +415,11 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Transaction handling:
1. Copy files INSIDE transaction
-2. Create symlink INSIDE transaction
-3. Update fs_version INSIDE transaction (done by save())
-4. Exit transaction (DB commit)
-5. Delete old files OUTSIDE transaction (after commit)
+2. Convert index.json to index.jsonl INSIDE transaction
+3. Create symlink INSIDE transaction
+4. Update fs_version INSIDE transaction (done by save())
+5. Exit transaction (DB commit)
+6. Delete old files OUTSIDE transaction (after commit)
"""
import shutil
from django.db import transaction
@@ -427,11 +428,13 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
new_dir = self.get_storage_path_for_version('0.9.0')
if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
+# Even if no directory migration needed, still convert index format
+self.convert_index_json_to_jsonl()
return
new_dir.mkdir(parents=True, exist_ok=True)
-# Copy all files (idempotent)
+# Copy all files (idempotent), skipping index.json (will be converted to jsonl)
for old_file in old_dir.rglob('*'):
if not old_file.is_file():
continue
@@ -456,6 +459,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
missing = old_files.keys() - new_files.keys()
raise Exception(f"Migration incomplete: missing {missing}")
+# Convert index.json to index.jsonl in the new directory
+self.convert_index_json_to_jsonl()
# Create backwards-compat symlink (INSIDE transaction)
symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
if symlink_path.is_symlink():
@@ -557,9 +563,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
@classmethod
def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
"""
-Load existing Snapshot from DB by reading index.json.
+Load existing Snapshot from DB by reading index.jsonl or index.json.
-Reads index.json, extracts url+timestamp, queries DB.
+Reads index file, extracts url+timestamp, queries DB.
Returns existing Snapshot or None if not found/invalid.
Does NOT create new snapshots.
@@ -567,21 +573,38 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
"""
import json
-index_path = snapshot_dir / 'index.json'
-if not index_path.exists():
-return None
+# Try index.jsonl first (new format), then index.json (legacy)
+jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
+json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
-try:
-with open(index_path) as f:
-data = json.load(f)
-except:
data = None
+if jsonl_path.exists():
+try:
+with open(jsonl_path) as f:
+for line in f:
+line = line.strip()
+if line.startswith('{'):
+record = json.loads(line)
+if record.get('type') == 'Snapshot':
+data = record
+break
+except:
+pass
+elif json_path.exists():
+try:
+with open(json_path) as f:
+data = json.load(f)
+except:
+pass
if not data:
return None
url = data.get('url')
if not url:
return None
-# Get timestamp - prefer index.json, fallback to folder name
+# Get timestamp - prefer index file, fallback to folder name
timestamp = cls._select_best_timestamp(
index_timestamp=data.get('timestamp'),
folder_name=snapshot_dir.name
@@ -611,14 +634,31 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
"""
import json
-index_path = snapshot_dir / 'index.json'
-if not index_path.exists():
-return None
+# Try index.jsonl first (new format), then index.json (legacy)
+jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
+json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
-try:
-with open(index_path) as f:
-data = json.load(f)
-except:
data = None
+if jsonl_path.exists():
+try:
+with open(jsonl_path) as f:
+for line in f:
+line = line.strip()
+if line.startswith('{'):
+record = json.loads(line)
+if record.get('type') == 'Snapshot':
+data = record
+break
+except:
+pass
+elif json_path.exists():
+try:
+with open(json_path) as f:
+data = json.load(f)
+except:
+pass
if not data:
return None
url = data.get('url')
@@ -721,26 +761,40 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
# Index.json Reconciliation
# =========================================================================
-def reconcile_with_index_json(self):
+def reconcile_with_index(self):
"""
-Merge index.json with DB. DB is source of truth.
+Merge index.json/index.jsonl with DB. DB is source of truth.
- Title: longest non-URL
- Tags: union
- ArchiveResults: keep both (by plugin+start_ts)
-Writes back in 0.9.x format.
+Converts index.json to index.jsonl if needed, then writes back in JSONL format.
-Used by: archivebox update (to sync index.json with DB)
+Used by: archivebox update (to sync index with DB)
"""
import json
-index_path = Path(self.output_dir) / 'index.json'
+# Try to convert index.json to index.jsonl first
+self.convert_index_json_to_jsonl()
+# Check for index.jsonl (preferred) or index.json (legacy)
+jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
+json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
index_data = {}
-if index_path.exists():
+if jsonl_path.exists():
+# Read from JSONL format
+jsonl_data = self.read_index_jsonl()
+if jsonl_data['snapshot']:
+index_data = jsonl_data['snapshot']
+# Convert archive_results list to expected format
+index_data['archive_results'] = jsonl_data['archive_results']
+elif json_path.exists():
+# Fallback to legacy JSON format
try:
-with open(index_path) as f:
+with open(json_path) as f:
index_data = json.load(f)
except:
pass
@@ -754,8 +808,12 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
# Merge ArchiveResults
self._merge_archive_results_from_index(index_data)
-# Write back
-self.write_index_json()
+# Write back in JSONL format
+self.write_index_jsonl()
+def reconcile_with_index_json(self):
+"""Deprecated: use reconcile_with_index() instead."""
+return self.reconcile_with_index()
def _merge_title_from_index(self, index_data: dict):
"""Merge title - prefer longest non-URL title."""
@@ -831,12 +889,15 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
except:
pass
+# Support both 'output' (legacy) and 'output_str' (new JSONL) field names
+output_str = result_data.get('output_str') or result_data.get('output', '')
ArchiveResult.objects.create(
snapshot=self,
plugin=plugin,
hook_name=result_data.get('hook_name', ''),
status=result_data.get('status', 'failed'),
-output_str=result_data.get('output', ''),
+output_str=output_str,
cmd=result_data.get('cmd', []),
pwd=result_data.get('pwd', str(self.output_dir)),
start_ts=start_ts,
@@ -846,7 +907,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
pass
def write_index_json(self):
"""Write index.json in 0.9.x format."""
"""Write index.json in 0.9.x format (deprecated, use write_index_jsonl)."""
import json
index_path = Path(self.output_dir) / 'index.json'
@@ -877,6 +938,181 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
with open(index_path, 'w') as f:
json.dump(data, f, indent=2, sort_keys=True)
def write_index_jsonl(self):
"""
Write index.jsonl in flat JSONL format.
Each line is a JSON record with a 'type' field:
- Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
- ArchiveResult: extractor results (plugin, status, output, etc.)
- Binary: binary info used for the extraction
- Process: process execution details (cmd, exit_code, timing, etc.)
"""
import json
from archivebox.misc.jsonl import (
snapshot_to_jsonl, archiveresult_to_jsonl,
binary_to_jsonl, process_to_jsonl,
)
index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
index_path.parent.mkdir(parents=True, exist_ok=True)
# Collect unique binaries and processes from archive results
binaries_seen = set()
processes_seen = set()
with open(index_path, 'w') as f:
# Write Snapshot record first
snapshot_record = snapshot_to_jsonl(self)
snapshot_record['crawl_id'] = str(self.crawl_id) if self.crawl_id else None
snapshot_record['fs_version'] = self.fs_version
f.write(json.dumps(snapshot_record) + '\n')
# Write ArchiveResult records with their associated Binary and Process
for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts'):
# Write Binary record if not already written
if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
binaries_seen.add(ar.process.binary_id)
f.write(json.dumps(binary_to_jsonl(ar.process.binary)) + '\n')
# Write Process record if not already written
if ar.process and ar.process_id not in processes_seen:
processes_seen.add(ar.process_id)
f.write(json.dumps(process_to_jsonl(ar.process)) + '\n')
# Write ArchiveResult record
ar_record = archiveresult_to_jsonl(ar)
if ar.process_id:
ar_record['process_id'] = str(ar.process_id)
f.write(json.dumps(ar_record) + '\n')
def read_index_jsonl(self) -> dict:
"""
Read index.jsonl and return parsed records grouped by type.
Returns dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes'
"""
import json
from archivebox.misc.jsonl import (
TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS,
)
index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
result = {
'snapshot': None,
'archive_results': [],
'binaries': [],
'processes': [],
}
if not index_path.exists():
return result
with open(index_path, 'r') as f:
for line in f:
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
record = json.loads(line)
record_type = record.get('type')
if record_type == TYPE_SNAPSHOT:
result['snapshot'] = record
elif record_type == TYPE_ARCHIVERESULT:
result['archive_results'].append(record)
elif record_type == TYPE_BINARY:
result['binaries'].append(record)
elif record_type == TYPE_PROCESS:
result['processes'].append(record)
except json.JSONDecodeError:
continue
return result
def convert_index_json_to_jsonl(self) -> bool:
"""
Convert index.json to index.jsonl format.
Reads existing index.json, creates index.jsonl, and removes index.json.
Returns True if conversion was performed, False if no conversion needed.
"""
import json
json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
# Skip if already converted or no json file exists
if jsonl_path.exists() or not json_path.exists():
return False
try:
with open(json_path, 'r') as f:
data = json.load(f)
except (json.JSONDecodeError, OSError):
return False
# Detect format version and extract records
fs_version = data.get('fs_version', '0.7.0')
jsonl_path.parent.mkdir(parents=True, exist_ok=True)
with open(jsonl_path, 'w') as f:
# Write Snapshot record
snapshot_record = {
'type': 'Snapshot',
'id': str(self.id),
'crawl_id': str(self.crawl_id) if self.crawl_id else None,
'url': data.get('url', self.url),
'timestamp': data.get('timestamp', self.timestamp),
'title': data.get('title', self.title or ''),
'tags': data.get('tags', ''),
'fs_version': fs_version,
'bookmarked_at': data.get('bookmarked_at'),
'created_at': data.get('created_at'),
}
f.write(json.dumps(snapshot_record) + '\n')
# Handle 0.8.x/0.9.x format (archive_results list)
for result_data in data.get('archive_results', []):
ar_record = {
'type': 'ArchiveResult',
'snapshot_id': str(self.id),
'plugin': result_data.get('plugin', ''),
'status': result_data.get('status', ''),
'output_str': result_data.get('output', ''),
'start_ts': result_data.get('start_ts'),
'end_ts': result_data.get('end_ts'),
}
if result_data.get('cmd'):
ar_record['cmd'] = result_data['cmd']
f.write(json.dumps(ar_record) + '\n')
# Handle 0.7.x format (history dict)
if 'history' in data and isinstance(data['history'], dict):
for plugin, result_list in data['history'].items():
if not isinstance(result_list, list):
continue
for result_data in result_list:
ar_record = {
'type': 'ArchiveResult',
'snapshot_id': str(self.id),
'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin,
'status': result_data.get('status', ''),
'output_str': result_data.get('output', ''),
'start_ts': result_data.get('start_ts'),
'end_ts': result_data.get('end_ts'),
}
if result_data.get('cmd'):
ar_record['cmd'] = result_data['cmd']
f.write(json.dumps(ar_record) + '\n')
# Remove old index.json after successful conversion
try:
json_path.unlink()
except OSError:
pass
return True
# =========================================================================
# Snapshot Utilities
# =========================================================================

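Aside (not part of this commit's diff): a minimal sketch of reading the new format without the ORM, using only the record layout shown above. The helper name read_snapshot_record is hypothetical:

    import json
    from pathlib import Path

    def read_snapshot_record(snapshot_dir: Path):
        """Return the first Snapshot record from index.jsonl, or None."""
        index_path = snapshot_dir / 'index.jsonl'
        if not index_path.exists():
            return None
        with open(index_path) as f:
            for line in f:
                line = line.strip()
                if not line.startswith('{'):
                    continue  # skip blank/non-record lines, as read_index_jsonl() does
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue  # tolerate a corrupt line rather than failing the whole read
                if record.get('type') == 'Snapshot':
                    return record
        return None
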
archivebox/misc/jsonl.py

@@ -28,8 +28,10 @@ TYPE_ARCHIVERESULT = 'ArchiveResult'
TYPE_TAG = 'Tag'
TYPE_CRAWL = 'Crawl'
TYPE_BINARY = 'Binary'
+TYPE_PROCESS = 'Process'
+TYPE_MACHINE = 'Machine'
-VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY}
+VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY, TYPE_PROCESS, TYPE_MACHINE}
def parse_line(line: str) -> Optional[Dict[str, Any]]:
@@ -227,6 +229,64 @@ def crawl_to_jsonl(crawl) -> Dict[str, Any]:
}
def binary_to_jsonl(binary) -> Dict[str, Any]:
"""
Convert a Binary model instance to a JSONL record.
"""
return {
'type': TYPE_BINARY,
'id': str(binary.id),
'machine_id': str(binary.machine_id),
'name': binary.name,
'binprovider': binary.binprovider,
'abspath': binary.abspath,
'version': binary.version,
'sha256': binary.sha256,
'status': binary.status,
}
def process_to_jsonl(process) -> Dict[str, Any]:
"""
Convert a Process model instance to a JSONL record.
"""
record = {
'type': TYPE_PROCESS,
'id': str(process.id),
'machine_id': str(process.machine_id),
'cmd': process.cmd,
'pwd': process.pwd,
'status': process.status,
'exit_code': process.exit_code,
'started_at': process.started_at.isoformat() if process.started_at else None,
'ended_at': process.ended_at.isoformat() if process.ended_at else None,
}
# Include optional fields if set
if process.binary_id:
record['binary_id'] = str(process.binary_id)
if process.pid:
record['pid'] = process.pid
if process.timeout:
record['timeout'] = process.timeout
return record
def machine_to_jsonl(machine) -> Dict[str, Any]:
"""
Convert a Machine model instance to a JSONL record.
"""
return {
'type': TYPE_MACHINE,
'id': str(machine.id),
'guid': machine.guid,
'hostname': machine.hostname,
'os_arch': machine.os_arch,
'os_family': machine.os_family,
'os_platform': machine.os_platform,
'os_release': machine.os_release,
}
def process_records(
records: Iterator[Dict[str, Any]],
handlers: Dict[str, Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]]

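Aside (not part of this commit's diff): these converters pair naturally with JSONL's append-friendly layout. A sketch under that assumption — dump_processes is a hypothetical helper, not part of this commit:

    import json

    def dump_processes(processes, out_path):
        # One JSON object per line: appendable, greppable, stream-parseable.
        with open(out_path, 'a') as f:
            for proc in processes:
                f.write(json.dumps(process_to_jsonl(proc)) + '\n')
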
legacy.py

@@ -58,9 +58,10 @@ def parse_json_main_index(out_dir: Path) -> Iterator[SnapshotDict]:
def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]:
"""
-Parse links from individual snapshot index.json files in archive directories.
+Parse links from individual snapshot index.jsonl/index.json files in archive directories.
-Walks through archive/*/index.json files to discover orphaned snapshots.
+Walks through archive/*/index.jsonl and archive/*/index.json files to discover orphaned snapshots.
+Prefers index.jsonl (new format) over index.json (legacy format).
"""
from archivebox.config import CONSTANTS
@@ -72,19 +73,36 @@ def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]:
if not entry.is_dir():
continue
-index_file = Path(entry.path) / 'index.json'
-if not index_file.exists():
-continue
+# Try index.jsonl first (new format)
+jsonl_file = Path(entry.path) / CONSTANTS.JSONL_INDEX_FILENAME
+json_file = Path(entry.path) / CONSTANTS.JSON_INDEX_FILENAME
-try:
-with open(index_file, 'r', encoding='utf-8') as f:
-link = json.load(f)
+link = None
+if jsonl_file.exists():
+try:
+with open(jsonl_file, 'r', encoding='utf-8') as f:
+for line in f:
+line = line.strip()
+if line.startswith('{'):
+record = json.loads(line)
+if record.get('type') == 'Snapshot':
+link = record
+break
+except (json.JSONDecodeError, KeyError, TypeError):
+pass
+elif json_file.exists():
+try:
+with open(json_file, 'r', encoding='utf-8') as f:
+link = json.load(f)
+except (json.JSONDecodeError, KeyError, TypeError):
+pass
+if link:
yield {
'url': link.get('url', ''),
'timestamp': link.get('timestamp', entry.name),
'title': link.get('title'),
'tags': link.get('tags', ''),
}
-except (json.JSONDecodeError, KeyError, TypeError):
-continue
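
Aside (not part of this commit's diff): a caller might consume this generator as follows, assuming out_dir points at the directory containing the per-snapshot folders:

    from pathlib import Path

    for snapshot in parse_json_links_details(Path('data/archive')):
        print(snapshot['timestamp'], snapshot['url'])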