From d36079829bed32d71b2a1a5e8e6019457d6a7ae7 Mon Sep 17 00:00:00 2001
From: Claude
Date: Tue, 30 Dec 2025 18:21:06 +0000
Subject: [PATCH] feat: replace index.json with index.jsonl flat JSONL format

Switch from hierarchical index.json to flat index.jsonl format for
snapshot metadata storage. Each line is a self-contained JSON record
with a 'type' field (Snapshot, ArchiveResult, Binary, Process).

Changes:
- Add JSONL_INDEX_FILENAME constant to constants.py
- Add TYPE_PROCESS and TYPE_MACHINE to jsonl.py type constants
- Add binary_to_jsonl(), process_to_jsonl(), machine_to_jsonl() converters
- Add Snapshot.write_index_jsonl() to write new format
- Add Snapshot.read_index_jsonl() to read new format
- Add Snapshot.convert_index_json_to_jsonl() for migration
- Update Snapshot.reconcile_with_index() to handle both formats
- Update fs_migrate to convert during filesystem migration
- Update load_from_directory/create_from_directory for both formats
- Update legacy.py parse_json_links_details for JSONL support

The new format is easier to parse and extend, and allows mixing multiple
record types in a single file.
---
 archivebox/config/constants.py |   1 +
 archivebox/core/models.py      | 302 +++++++++++++++++++++++++++++----
 archivebox/misc/jsonl.py       |  62 ++++++-
 archivebox/misc/legacy.py      |  38 +++--
 4 files changed, 359 insertions(+), 44 deletions(-)

diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py
index a5c29ff4..30f0246b 100644
--- a/archivebox/config/constants.py
+++ b/archivebox/config/constants.py
@@ -100,6 +100,7 @@ class ConstantsDict(Mapping):
     DATABASE_FILE: Path = DATA_DIR / SQL_INDEX_FILENAME
 
     JSON_INDEX_FILENAME: str = 'index.json'
+    JSONL_INDEX_FILENAME: str = 'index.jsonl'
     HTML_INDEX_FILENAME: str = 'index.html'
     ROBOTS_TXT_FILENAME: str = 'robots.txt'
     FAVICON_FILENAME: str = 'favicon.ico'
diff --git a/archivebox/core/models.py b/archivebox/core/models.py
index 0a94df61..b5992c22 100755
--- a/archivebox/core/models.py
+++ b/archivebox/core/models.py
@@ -415,10 +415,11 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
 
         Transaction handling:
         1. Copy files INSIDE transaction
-        2. Create symlink INSIDE transaction
-        3. Update fs_version INSIDE transaction (done by save())
-        4. Exit transaction (DB commit)
-        5. Delete old files OUTSIDE transaction (after commit)
+        2. Convert index.json to index.jsonl INSIDE transaction
+        3. Create symlink INSIDE transaction
+        4. Update fs_version INSIDE transaction (done by save())
+        5. Exit transaction (DB commit)
+        6. Delete old files OUTSIDE transaction (after commit)
         """
         import shutil
         from django.db import transaction
@@ -427,11 +428,13 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
         new_dir = self.get_storage_path_for_version('0.9.0')
 
         if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
+            # Even if no directory migration is needed, still convert the index format
+            self.convert_index_json_to_jsonl()
             return
 
         new_dir.mkdir(parents=True, exist_ok=True)
 
-        # Copy all files (idempotent)
+        # Copy all files (idempotent); index.json is converted to index.jsonl below
         for old_file in old_dir.rglob('*'):
             if not old_file.is_file():
                 continue
@@ -456,6 +459,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
             missing = old_files.keys() - new_files.keys()
             raise Exception(f"Migration incomplete: missing {missing}")
 
+        # Convert index.json to index.jsonl in the new directory
+        self.convert_index_json_to_jsonl()
+
         # Create backwards-compat symlink (INSIDE transaction)
         symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
         if symlink_path.is_symlink():
@@ -557,9 +563,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
     @classmethod
     def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
         """
-        Load existing Snapshot from DB by reading index.json.
+        Load existing Snapshot from DB by reading index.jsonl or index.json.
 
-        Reads index.json, extracts url+timestamp, queries DB.
+        Reads the index file, extracts url+timestamp, queries DB.
         Returns existing Snapshot or None if not found/invalid.
 
         Does NOT create new snapshots.
@@ -567,21 +573,38 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
         """
         import json
 
-        index_path = snapshot_dir / 'index.json'
-        if not index_path.exists():
-            return None
+        # Try index.jsonl first (new format), then index.json (legacy)
+        jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
+        json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
 
-        try:
-            with open(index_path) as f:
-                data = json.load(f)
-        except:
+        data = None
+        if jsonl_path.exists():
+            try:
+                with open(jsonl_path) as f:
+                    for line in f:
+                        line = line.strip()
+                        if line.startswith('{'):
+                            record = json.loads(line)
+                            if record.get('type') == 'Snapshot':
+                                data = record
+                                break
+            except (json.JSONDecodeError, OSError):
+                pass
+        elif json_path.exists():
+            try:
+                with open(json_path) as f:
+                    data = json.load(f)
+            except (json.JSONDecodeError, OSError):
+                pass
+
+        if not data:
             return None
 
         url = data.get('url')
         if not url:
             return None
 
-        # Get timestamp - prefer index.json, fallback to folder name
+        # Get timestamp - prefer the index file, fall back to folder name
        timestamp = cls._select_best_timestamp(
             index_timestamp=data.get('timestamp'),
             folder_name=snapshot_dir.name
@@ -611,14 +634,31 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
         """
         import json
 
-        index_path = snapshot_dir / 'index.json'
-        if not index_path.exists():
-            return None
+        # Try index.jsonl first (new format), then index.json (legacy)
+        jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
+        json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
 
-        try:
-            with open(index_path) as f:
-                data = json.load(f)
-        except:
+        data = None
+        if jsonl_path.exists():
+            try:
+                with open(jsonl_path) as f:
+                    for line in f:
+                        line = line.strip()
+                        if line.startswith('{'):
+                            record = json.loads(line)
+                            if record.get('type') == 'Snapshot':
+                                data = record
+                                break
+            except (json.JSONDecodeError, OSError):
+                pass
+        elif json_path.exists():
+            try:
+                with open(json_path) as f:
+                    data = json.load(f)
+            except (json.JSONDecodeError, OSError):
+                pass
+
+        if not data:
             return None
 
         url = data.get('url')
@@ -721,26 +761,40 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
     # Index.json Reconciliation
     # =========================================================================
 
-    def reconcile_with_index_json(self):
+    def reconcile_with_index(self):
         """
-        Merge index.json with DB. DB is source of truth.
+        Merge index.json/index.jsonl with DB. DB is source of truth.
         - Title: longest non-URL
         - Tags: union
         - ArchiveResults: keep both (by plugin+start_ts)
-        Writes back in 0.9.x format.
+        Converts index.json to index.jsonl if needed, then writes back in JSONL format.
 
-        Used by: archivebox update (to sync index.json with DB)
+        Used by: archivebox update (to sync index with DB)
         """
         import json
 
-        index_path = Path(self.output_dir) / 'index.json'
+        # Try to convert index.json to index.jsonl first
+        self.convert_index_json_to_jsonl()
+
+        # Check for index.jsonl (preferred) or index.json (legacy)
+        jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
+        json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
 
         index_data = {}
-        if index_path.exists():
+
+        if jsonl_path.exists():
+            # Read from JSONL format
+            jsonl_data = self.read_index_jsonl()
+            if jsonl_data['snapshot']:
+                index_data = jsonl_data['snapshot']
+                # Convert archive_results list to the expected format
+                index_data['archive_results'] = jsonl_data['archive_results']
+        elif json_path.exists():
+            # Fall back to legacy JSON format
             try:
-                with open(index_path) as f:
+                with open(json_path) as f:
                     index_data = json.load(f)
             except:
                 pass
@@ -754,8 +808,12 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
         # Merge ArchiveResults
         self._merge_archive_results_from_index(index_data)
 
-        # Write back
-        self.write_index_json()
+        # Write back in JSONL format
+        self.write_index_jsonl()
+
+    def reconcile_with_index_json(self):
+        """Deprecated: use reconcile_with_index() instead."""
+        return self.reconcile_with_index()
 
     def _merge_title_from_index(self, index_data: dict):
         """Merge title - prefer longest non-URL title."""
@@ -831,12 +889,15 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
             except:
                 pass
 
+            # Support both 'output' (legacy) and 'output_str' (new JSONL) field names
+            output_str = result_data.get('output_str') or result_data.get('output', '')
+
             ArchiveResult.objects.create(
                 snapshot=self,
                 plugin=plugin,
                 hook_name=result_data.get('hook_name', ''),
                 status=result_data.get('status', 'failed'),
-                output_str=result_data.get('output', ''),
+                output_str=output_str,
                 cmd=result_data.get('cmd', []),
                 pwd=result_data.get('pwd', str(self.output_dir)),
                 start_ts=start_ts,
@@ -846,7 +907,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
         pass
 
     def write_index_json(self):
-        """Write index.json in 0.9.x format."""
+        """Write index.json in 0.9.x format (deprecated, use write_index_jsonl)."""
         import json
 
         index_path = Path(self.output_dir) / 'index.json'
@@ -877,6 +938,181 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
         with open(index_path, 'w') as f:
             json.dump(data, f, indent=2, sort_keys=True)
 
+    def write_index_jsonl(self):
+        """
+        Write index.jsonl in flat JSONL format.
+
+        Each line is a JSON record with a 'type' field:
+        - Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
+        - ArchiveResult: extractor results (plugin, status, output, etc.)
+        - Binary: info about the binary used for the extraction
+        - Process: process execution details (cmd, exit_code, timing, etc.)
+        """
+        import json
+        from archivebox.misc.jsonl import (
+            snapshot_to_jsonl, archiveresult_to_jsonl,
+            binary_to_jsonl, process_to_jsonl,
+        )
+
+        index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
+        index_path.parent.mkdir(parents=True, exist_ok=True)
+
+        # Track binary/process IDs already written so each record appears only once
+        binaries_seen = set()
+        processes_seen = set()
+
+        with open(index_path, 'w') as f:
+            # Write Snapshot record first
+            snapshot_record = snapshot_to_jsonl(self)
+            snapshot_record['crawl_id'] = str(self.crawl_id) if self.crawl_id else None
+            snapshot_record['fs_version'] = self.fs_version
+            f.write(json.dumps(snapshot_record) + '\n')
+
+            # Write ArchiveResult records with their associated Binary and Process
+            for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts'):
+                # Write Binary record if not already written
+                if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
+                    binaries_seen.add(ar.process.binary_id)
+                    f.write(json.dumps(binary_to_jsonl(ar.process.binary)) + '\n')
+
+                # Write Process record if not already written
+                if ar.process and ar.process_id not in processes_seen:
+                    processes_seen.add(ar.process_id)
+                    f.write(json.dumps(process_to_jsonl(ar.process)) + '\n')
+
+                # Write ArchiveResult record
+                ar_record = archiveresult_to_jsonl(ar)
+                if ar.process_id:
+                    ar_record['process_id'] = str(ar.process_id)
+                f.write(json.dumps(ar_record) + '\n')
+
+    def read_index_jsonl(self) -> dict:
+        """
+        Read index.jsonl and return parsed records grouped by type.
+
+        Returns dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes'
+        """
+        import json
+        from archivebox.misc.jsonl import (
+            TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS,
+        )
+
+        index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
+        result = {
+            'snapshot': None,
+            'archive_results': [],
+            'binaries': [],
+            'processes': [],
+        }
+
+        if not index_path.exists():
+            return result
+
+        with open(index_path, 'r') as f:
+            for line in f:
+                line = line.strip()
+                if not line or not line.startswith('{'):
+                    continue
+                try:
+                    record = json.loads(line)
+                    record_type = record.get('type')
+                    if record_type == TYPE_SNAPSHOT:
+                        result['snapshot'] = record
+                    elif record_type == TYPE_ARCHIVERESULT:
+                        result['archive_results'].append(record)
+                    elif record_type == TYPE_BINARY:
+                        result['binaries'].append(record)
+                    elif record_type == TYPE_PROCESS:
+                        result['processes'].append(record)
+                except json.JSONDecodeError:
+                    continue
+
+        return result
+
+    def convert_index_json_to_jsonl(self) -> bool:
+        """
+        Convert index.json to index.jsonl format.
+
+        Reads existing index.json, creates index.jsonl, and removes index.json.
+        Returns True if conversion was performed, False if no conversion was needed.
+ """ + import json + + json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME + jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME + + # Skip if already converted or no json file exists + if jsonl_path.exists() or not json_path.exists(): + return False + + try: + with open(json_path, 'r') as f: + data = json.load(f) + except (json.JSONDecodeError, OSError): + return False + + # Detect format version and extract records + fs_version = data.get('fs_version', '0.7.0') + + jsonl_path.parent.mkdir(parents=True, exist_ok=True) + with open(jsonl_path, 'w') as f: + # Write Snapshot record + snapshot_record = { + 'type': 'Snapshot', + 'id': str(self.id), + 'crawl_id': str(self.crawl_id) if self.crawl_id else None, + 'url': data.get('url', self.url), + 'timestamp': data.get('timestamp', self.timestamp), + 'title': data.get('title', self.title or ''), + 'tags': data.get('tags', ''), + 'fs_version': fs_version, + 'bookmarked_at': data.get('bookmarked_at'), + 'created_at': data.get('created_at'), + } + f.write(json.dumps(snapshot_record) + '\n') + + # Handle 0.8.x/0.9.x format (archive_results list) + for result_data in data.get('archive_results', []): + ar_record = { + 'type': 'ArchiveResult', + 'snapshot_id': str(self.id), + 'plugin': result_data.get('plugin', ''), + 'status': result_data.get('status', ''), + 'output_str': result_data.get('output', ''), + 'start_ts': result_data.get('start_ts'), + 'end_ts': result_data.get('end_ts'), + } + if result_data.get('cmd'): + ar_record['cmd'] = result_data['cmd'] + f.write(json.dumps(ar_record) + '\n') + + # Handle 0.7.x format (history dict) + if 'history' in data and isinstance(data['history'], dict): + for plugin, result_list in data['history'].items(): + if not isinstance(result_list, list): + continue + for result_data in result_list: + ar_record = { + 'type': 'ArchiveResult', + 'snapshot_id': str(self.id), + 'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin, + 'status': result_data.get('status', ''), + 'output_str': result_data.get('output', ''), + 'start_ts': result_data.get('start_ts'), + 'end_ts': result_data.get('end_ts'), + } + if result_data.get('cmd'): + ar_record['cmd'] = result_data['cmd'] + f.write(json.dumps(ar_record) + '\n') + + # Remove old index.json after successful conversion + try: + json_path.unlink() + except OSError: + pass + + return True + # ========================================================================= # Snapshot Utilities # ========================================================================= diff --git a/archivebox/misc/jsonl.py b/archivebox/misc/jsonl.py index 88081ea6..ad7c3557 100644 --- a/archivebox/misc/jsonl.py +++ b/archivebox/misc/jsonl.py @@ -28,8 +28,10 @@ TYPE_ARCHIVERESULT = 'ArchiveResult' TYPE_TAG = 'Tag' TYPE_CRAWL = 'Crawl' TYPE_BINARY = 'Binary' +TYPE_PROCESS = 'Process' +TYPE_MACHINE = 'Machine' -VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY} +VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY, TYPE_PROCESS, TYPE_MACHINE} def parse_line(line: str) -> Optional[Dict[str, Any]]: @@ -227,6 +229,64 @@ def crawl_to_jsonl(crawl) -> Dict[str, Any]: } +def binary_to_jsonl(binary) -> Dict[str, Any]: + """ + Convert a Binary model instance to a JSONL record. 
+ """ + return { + 'type': TYPE_BINARY, + 'id': str(binary.id), + 'machine_id': str(binary.machine_id), + 'name': binary.name, + 'binprovider': binary.binprovider, + 'abspath': binary.abspath, + 'version': binary.version, + 'sha256': binary.sha256, + 'status': binary.status, + } + + +def process_to_jsonl(process) -> Dict[str, Any]: + """ + Convert a Process model instance to a JSONL record. + """ + record = { + 'type': TYPE_PROCESS, + 'id': str(process.id), + 'machine_id': str(process.machine_id), + 'cmd': process.cmd, + 'pwd': process.pwd, + 'status': process.status, + 'exit_code': process.exit_code, + 'started_at': process.started_at.isoformat() if process.started_at else None, + 'ended_at': process.ended_at.isoformat() if process.ended_at else None, + } + # Include optional fields if set + if process.binary_id: + record['binary_id'] = str(process.binary_id) + if process.pid: + record['pid'] = process.pid + if process.timeout: + record['timeout'] = process.timeout + return record + + +def machine_to_jsonl(machine) -> Dict[str, Any]: + """ + Convert a Machine model instance to a JSONL record. + """ + return { + 'type': TYPE_MACHINE, + 'id': str(machine.id), + 'guid': machine.guid, + 'hostname': machine.hostname, + 'os_arch': machine.os_arch, + 'os_family': machine.os_family, + 'os_platform': machine.os_platform, + 'os_release': machine.os_release, + } + + def process_records( records: Iterator[Dict[str, Any]], handlers: Dict[str, Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]] diff --git a/archivebox/misc/legacy.py b/archivebox/misc/legacy.py index 7328670f..5dfb787d 100644 --- a/archivebox/misc/legacy.py +++ b/archivebox/misc/legacy.py @@ -58,9 +58,10 @@ def parse_json_main_index(out_dir: Path) -> Iterator[SnapshotDict]: def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]: """ - Parse links from individual snapshot index.json files in archive directories. + Parse links from individual snapshot index.jsonl/index.json files in archive directories. - Walks through archive/*/index.json files to discover orphaned snapshots. + Walks through archive/*/index.jsonl and archive/*/index.json files to discover orphaned snapshots. + Prefers index.jsonl (new format) over index.json (legacy format). """ from archivebox.config import CONSTANTS @@ -72,19 +73,36 @@ def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]: if not entry.is_dir(): continue - index_file = Path(entry.path) / 'index.json' - if not index_file.exists(): - continue + # Try index.jsonl first (new format) + jsonl_file = Path(entry.path) / CONSTANTS.JSONL_INDEX_FILENAME + json_file = Path(entry.path) / CONSTANTS.JSON_INDEX_FILENAME - try: - with open(index_file, 'r', encoding='utf-8') as f: - link = json.load(f) + link = None + if jsonl_file.exists(): + try: + with open(jsonl_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line.startswith('{'): + record = json.loads(line) + if record.get('type') == 'Snapshot': + link = record + break + except (json.JSONDecodeError, KeyError, TypeError): + pass + + elif json_file.exists(): + try: + with open(json_file, 'r', encoding='utf-8') as f: + link = json.load(f) + except (json.JSONDecodeError, KeyError, TypeError): + pass + + if link: yield { 'url': link.get('url', ''), 'timestamp': link.get('timestamp', entry.name), 'title': link.get('title'), 'tags': link.get('tags', ''), } - except (json.JSONDecodeError, KeyError, TypeError): - continue