refactor: move remaining JSONL methods to models, clean up jsonl.py

- Add Tag.to_jsonl() method with schema_version
- Add Crawl.to_jsonl() method with schema_version
- Fix Tag.from_jsonl() to not depend on jsonl.py helper
- Update tests to use Snapshot.from_jsonl() instead of non-existent get_or_create_snapshot

Remove model-specific functions from misc/jsonl.py:
- tag_to_jsonl() - use Tag.to_jsonl() instead
- crawl_to_jsonl() - use Crawl.to_jsonl() instead
- get_or_create_tag() - use Tag.from_jsonl() instead
- process_jsonl_records() - use model from_jsonl() methods directly

jsonl.py now only contains generic I/O utilities:
- Type constants (TYPE_SNAPSHOT, etc.)
- parse_line(), read_stdin(), read_file(), read_args_or_stdin()
- write_record(), write_records()
- filter_by_type(), process_records()
This commit is contained in:
Claude
2025-12-30 19:30:18 +00:00
parent bc273c5a7f
commit ae648c9bc1
4 changed files with 45 additions and 102 deletions

View File

@@ -664,7 +664,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
"""
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
get_or_create_snapshot, read_args_or_stdin,
read_args_or_stdin,
TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
@@ -673,7 +673,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# === archivebox snapshot https://example.com ===
url = 'https://test-pipeline-1.example.com'
snapshot = get_or_create_snapshot({'url': url}, created_by_id=created_by_id)
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
snapshot_jsonl = json.dumps(snapshot.to_jsonl())
# === | archivebox extract ===
@@ -698,7 +698,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
"""
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
get_or_create_snapshot, read_args_or_stdin,
read_args_or_stdin,
TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
@@ -709,7 +709,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# === archivebox crawl https://example.com ===
# Step 1: Create snapshot for starting URL
start_url = 'https://test-crawl-pipeline.example.com'
start_snapshot = get_or_create_snapshot({'url': start_url}, created_by_id=created_by_id)
start_snapshot = Snapshot.from_jsonl({'url': start_url}, overrides={'created_by_id': created_by_id})
# Step 2: Simulate extractor output with discovered URLs
snapshot_dir = Path(self.test_dir) / 'archive' / str(start_snapshot.timestamp)
@@ -738,7 +738,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create snapshots for discovered URLs
created_snapshots = []
for record in records:
snap = get_or_create_snapshot(record, created_by_id=created_by_id)
snap = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id})
created_snapshots.append(snap)
self.assertEqual(len(created_snapshots), 2)
@@ -787,14 +787,13 @@ class TestDepthWorkflows(unittest.TestCase):
Depth 0: Only archive the specified URL, no crawling.
"""
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import get_or_create_snapshot
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Create snapshot
url = 'https://depth0-test.example.com'
snapshot = get_or_create_snapshot({'url': url}, created_by_id=created_by_id)
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
# Verify only one snapshot created
self.assertEqual(Snapshot.objects.filter(url=url).count(), 1)

View File

@@ -91,6 +91,19 @@ class Tag(ModelWithSerializers):
def api_url(self) -> str:
return reverse_lazy('api-1:get_tag', args=[self.id])
def to_jsonl(self) -> dict:
    """Serialize this Tag into a JSONL record dict, stamped with the schema version."""
    from archivebox.config import VERSION

    record = {
        'type': 'Tag',
        'schema_version': VERSION,
        'id': str(self.id),
    }
    record['name'] = self.name
    record['slug'] = self.slug
    return record
@staticmethod
def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None):
"""
@@ -103,19 +116,18 @@ class Tag(ModelWithSerializers):
Returns:
Tag instance or None
"""
from archivebox.misc.jsonl import get_or_create_tag
try:
tag = get_or_create_tag(record)
# Auto-attach to snapshot if in overrides
if overrides and 'snapshot' in overrides and tag:
overrides['snapshot'].tags.add(tag)
return tag
except ValueError:
name = record.get('name')
if not name:
return None
tag, _ = Tag.objects.get_or_create(name=name)
# Auto-attach to snapshot if in overrides
if overrides and 'snapshot' in overrides and tag:
overrides['snapshot'].tags.add(tag)
return tag
class SnapshotTag(models.Model):
id = models.AutoField(primary_key=True)

View File

@@ -134,6 +134,21 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
def api_url(self) -> str:
return reverse_lazy('api-1:get_crawl', args=[self.id])
def to_jsonl(self) -> dict:
    """Serialize this Crawl into a JSONL record dict, stamped with the schema version."""
    from archivebox.config import VERSION

    created = self.created_at
    record = {
        'type': 'Crawl',
        'schema_version': VERSION,
        'id': str(self.id),
        'urls': self.urls,
        'status': self.status,
        'max_depth': self.max_depth,
        # created_at may be NULL in the DB; emit None rather than crashing on isoformat()
        'created_at': created.isoformat() if created else None,
    }
    return record
@property
def output_dir_parent(self) -> str:
"""Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""

View File

@@ -18,7 +18,7 @@ __package__ = 'archivebox.misc'
import sys
import json
from typing import Iterator, Dict, Any, Optional, TextIO, Callable, Union, List
from typing import Iterator, Dict, Any, Optional, TextIO, Callable
from pathlib import Path
@@ -154,32 +154,6 @@ def filter_by_type(records: Iterator[Dict[str, Any]], record_type: str) -> Itera
yield record
def tag_to_jsonl(tag) -> Dict[str, Any]:
    """
    Build a JSONL record dict from a Tag model instance.

    Only the id/name/slug fields are emitted; `type` is the module-level
    TYPE_TAG constant so readers can dispatch on it.
    """
    record: Dict[str, Any] = {'type': TYPE_TAG}
    record['id'] = str(tag.id)
    record['name'] = tag.name
    record['slug'] = tag.slug
    return record
def crawl_to_jsonl(crawl) -> Dict[str, Any]:
    """
    Build a JSONL record dict from a Crawl model instance.

    `created_at` is rendered as an ISO-8601 string, or None when the
    timestamp is unset.
    """
    created = crawl.created_at
    return {
        'type': TYPE_CRAWL,
        'id': str(crawl.id),
        'urls': crawl.urls,
        'status': crawl.status,
        'max_depth': crawl.max_depth,
        'created_at': created.isoformat() if created else None,
    }
def process_records(
records: Iterator[Dict[str, Any]],
handlers: Dict[str, Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]]
@@ -203,60 +177,3 @@ def process_records(
yield result
def get_or_create_tag(record: Dict[str, Any]):
    """
    Look up (or create) a Tag by the 'name' field of a JSONL record.

    Args:
        record: JSONL record dict; must contain a non-empty 'name'.

    Returns:
        The existing or newly created Tag instance.

    Raises:
        ValueError: if the record has no usable 'name' value.
    """
    from archivebox.core.models import Tag

    if not record.get('name'):
        raise ValueError("Record missing required 'name' field")
    tag, _created = Tag.objects.get_or_create(name=record['name'])
    return tag
def process_jsonl_records(records: Iterator[Dict[str, Any]], created_by_id: Optional[int] = None) -> Dict[str, List]:
    """
    Materialize Tag and Snapshot objects from a stream of JSONL records.

    Args:
        records: Iterator of JSONL record dicts.
        created_by_id: User ID to attribute created objects to; falls back
            to the system user when not provided.

    Returns:
        Dict with 'tags' and 'snapshots' lists of created objects.
        Records that fail validation (ValueError) are skipped silently.
    """
    from archivebox.base_models.models import get_or_create_system_user_pk

    if not created_by_id:
        created_by_id = get_or_create_system_user_pk()

    created_tags: List = []
    created_snapshots: List = []

    for rec in records:
        # Untyped records are treated as snapshots for backward compatibility.
        rec_type = rec.get('type', TYPE_SNAPSHOT)

        if rec_type == TYPE_TAG:
            try:
                created_tags.append(get_or_create_tag(rec))
            except ValueError:
                pass  # record had no usable 'name'; skip it
        elif rec_type == TYPE_SNAPSHOT or 'url' in rec:
            from archivebox.core.models import Snapshot
            overrides = {'created_by_id': created_by_id} if created_by_id else {}
            try:
                snap = Snapshot.from_jsonl(rec, overrides=overrides)
            except ValueError:
                continue  # malformed snapshot record; skip it
            if snap:
                created_snapshots.append(snap)

    return {'tags': created_tags, 'snapshots': created_snapshots}