refactor: move to_jsonl() methods to models

Move JSONL serialization from standalone functions to model methods
to mirror the from_jsonl() pattern:

- Add Binary.to_jsonl() method
- Add Process.to_jsonl() method
- Add ArchiveResult.to_jsonl() method
- Add Snapshot.to_jsonl() method
- Update write_index_jsonl() to use model methods
- Update jsonl.py functions to be thin wrappers
This commit is contained in:
Claude
2025-12-30 18:35:22 +00:00
parent d36079829b
commit a5206e7648
3 changed files with 102 additions and 78 deletions

View File

@@ -949,10 +949,6 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
- Process: process execution details (cmd, exit_code, timing, etc.)
"""
import json
from archivebox.misc.jsonl import (
snapshot_to_jsonl, archiveresult_to_jsonl,
binary_to_jsonl, process_to_jsonl,
)
index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
index_path.parent.mkdir(parents=True, exist_ok=True)
@@ -963,7 +959,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
with open(index_path, 'w') as f:
# Write Snapshot record first
snapshot_record = snapshot_to_jsonl(self)
snapshot_record = self.to_jsonl()
snapshot_record['crawl_id'] = str(self.crawl_id) if self.crawl_id else None
snapshot_record['fs_version'] = self.fs_version
f.write(json.dumps(snapshot_record) + '\n')
@@ -973,18 +969,15 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
# Write Binary record if not already written
if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
binaries_seen.add(ar.process.binary_id)
f.write(json.dumps(binary_to_jsonl(ar.process.binary)) + '\n')
f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n')
# Write Process record if not already written
if ar.process and ar.process_id not in processes_seen:
processes_seen.add(ar.process_id)
f.write(json.dumps(process_to_jsonl(ar.process)) + '\n')
f.write(json.dumps(ar.process.to_jsonl()) + '\n')
# Write ArchiveResult record
ar_record = archiveresult_to_jsonl(ar)
if ar.process_id:
ar_record['process_id'] = str(ar.process_id)
f.write(json.dumps(ar_record) + '\n')
f.write(json.dumps(ar.to_jsonl()) + '\n')
def read_index_jsonl(self) -> dict:
"""
@@ -1405,6 +1398,23 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
return False
def to_jsonl(self) -> dict:
    """
    Build the JSONL record (a plain dict) describing this Snapshot.

    Timestamps are emitted via isoformat() or as None when unset.
    'tags' falls back to '' when the instance has no tags_str(),
    'depth' falls back to 0 and 'status' to None when the attribute
    is missing.
    """
    # Resolve the values that need a fallback before assembling the record.
    if hasattr(self, 'tags_str'):
        tags = self.tags_str()
    else:
        tags = ''

    bookmarked = self.bookmarked_at
    created = self.created_at

    record = {}
    record['type'] = 'Snapshot'
    record['id'] = str(self.id)
    record['url'] = self.url
    record['title'] = self.title
    record['tags'] = tags
    record['bookmarked_at'] = bookmarked.isoformat() if bookmarked else None
    record['created_at'] = created.isoformat() if created else None
    record['timestamp'] = self.timestamp
    record['depth'] = getattr(self, 'depth', 0)
    record['status'] = getattr(self, 'status', None)
    return record
@staticmethod
def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None, queue_for_extraction: bool = True):
"""
@@ -2237,6 +2247,38 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
"""Convenience property to access the user who created this archive result via its snapshot's crawl."""
return self.snapshot.crawl.created_by
def to_jsonl(self) -> dict:
    """
    Build the JSONL record (a plain dict) describing this ArchiveResult.

    The core fields are always present; the optional fields below are
    only added when their value is truthy.
    """
    started = self.start_ts
    ended = self.end_ts

    record = {
        'type': 'ArchiveResult',
        'id': str(self.id),
        'snapshot_id': str(self.snapshot_id),
        'plugin': self.plugin,
        'hook_name': self.hook_name,
        'status': self.status,
        'output_str': self.output_str,
        'start_ts': started.isoformat() if started else None,
        'end_ts': ended.isoformat() if ended else None,
    }

    # Optional fields are copied verbatim, in this order, when set.
    optional_fields = (
        'output_json',
        'output_files',
        'output_size',
        'output_mimetypes',
        'cmd',
        'cmd_version',
    )
    for field_name in optional_fields:
        value = getattr(self, field_name)
        if value:
            record[field_name] = value

    # process_id is stringified rather than copied as-is.
    process_id = self.process_id
    if process_id:
        record['process_id'] = str(process_id)

    return record
def save(self, *args, **kwargs):
is_new = self._state.adding

View File

@@ -242,6 +242,22 @@ class Binary(ModelWithHealthStats):
'is_valid': self.is_valid,
}
def to_jsonl(self) -> dict:
    """
    Build the JSONL record (a plain dict) describing this Binary.

    'id' and 'machine_id' are stringified; the remaining fields are
    copied from the instance unchanged.
    """
    record = {
        'type': 'Binary',
        'id': str(self.id),
        'machine_id': str(self.machine_id),
    }
    # Remaining fields map one-to-one onto attributes of the same name.
    for field_name in ('name', 'binprovider', 'abspath', 'version', 'sha256', 'status'):
        record[field_name] = getattr(self, field_name)
    return record
@staticmethod
def from_jsonl(record: dict, overrides: dict = None):
"""
@@ -606,6 +622,30 @@ class Process(ModelWithHealthStats):
return self.archiveresult.hook_name
return ''
def to_jsonl(self) -> dict:
    """
    Build the JSONL record (a plain dict) describing this Process.

    The core fields are always present; 'binary_id', 'pid' and
    'timeout' are only added when their value is truthy.
    """
    started = self.started_at
    ended = self.ended_at

    record = {
        'type': 'Process',
        'id': str(self.id),
        'machine_id': str(self.machine_id),
        'cmd': self.cmd,
        'pwd': self.pwd,
        'status': self.status,
        'exit_code': self.exit_code,
        'started_at': started.isoformat() if started else None,
        'ended_at': ended.isoformat() if ended else None,
    }

    # binary_id is stringified; pid and timeout are copied as-is.
    binary_id = self.binary_id
    if binary_id:
        record['binary_id'] = str(binary_id)
    for field_name in ('pid', 'timeout'):
        value = getattr(self, field_name)
        if value:
            record[field_name] = value

    return record
def update_and_requeue(self, **kwargs):
"""
Update process fields and requeue for worker state machine.

View File

@@ -157,50 +157,17 @@ def filter_by_type(records: Iterator[Dict[str, Any]], record_type: str) -> Itera
def snapshot_to_jsonl(snapshot) -> Dict[str, Any]:
    """
    Convert a Snapshot model instance to a JSONL record.

    Thin wrapper kept for existing callers: the record is produced by
    snapshot.to_jsonl(), so the serialization format is defined in one
    place (the model method) rather than duplicated here.
    """
    return snapshot.to_jsonl()
def archiveresult_to_jsonl(result) -> Dict[str, Any]:
    """
    Convert an ArchiveResult model instance to a JSONL record.

    Thin wrapper kept for existing callers: the record is produced by
    result.to_jsonl(), so the serialization format is defined in one
    place (the model method) rather than duplicated here.
    """
    return result.to_jsonl()
def tag_to_jsonl(tag) -> Dict[str, Any]:
@@ -232,49 +199,24 @@ def crawl_to_jsonl(crawl) -> Dict[str, Any]:
def binary_to_jsonl(binary) -> Dict[str, Any]:
    """
    Convert a Binary model instance to a JSONL record.

    Thin wrapper kept for existing callers: the record is produced by
    binary.to_jsonl(), so the serialization format is defined in one
    place (the model method) rather than duplicated here.
    """
    return binary.to_jsonl()
def process_to_jsonl(process) -> Dict[str, Any]:
    """
    Convert a Process model instance to a JSONL record.

    Thin wrapper kept for existing callers: the record is produced by
    process.to_jsonl(), so the serialization format is defined in one
    place (the model method) rather than duplicated here.
    """
    return process.to_jsonl()
def machine_to_jsonl(machine) -> Dict[str, Any]:
"""
Convert a Machine model instance to a JSONL record.
"""
# Machine.to_jsonl() not implemented yet, use inline conversion
return {
'type': TYPE_MACHINE,
'id': str(machine.id),