Implement JSONL CLI pipeline architecture (Phases 1-4, 6)

Phase 1: Model Prerequisites
- Add ArchiveResult.from_json() and from_jsonl() methods
- Fix Snapshot.to_json() to use tags_str (consistent with Crawl)
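  A rough sketch of the shape these methods take -- from_json()'s signature is
  inferred from its call sites in the diffs below; from_jsonl() is an assumed
  line-wise wrapper, not the shipped implementation:

    import json
    from dataclasses import dataclass, field

    @dataclass
    class Record:  # illustrative stand-in for ArchiveResult, not the real model
        data: dict = field(default_factory=dict)

        @classmethod
        def from_json(cls, record: dict, overrides: dict | None = None) -> 'Record':
            # merge CLI-supplied overrides (e.g. created_by_id) into the record
            return cls(data={**record, **(overrides or {})})

        @classmethod
        def from_jsonl(cls, text: str) -> 'list[Record]':
            # one JSON object per line; blank lines are skipped
            return [cls.from_json(json.loads(line)) for line in text.splitlines() if line.strip()]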

Phase 2: Shared Utilities
- Create archivebox/cli/cli_utils.py with shared apply_filters()
- Update 7 CLI files to import it from cli_utils.py instead of duplicating the helper

Phase 3: Pass-Through Behavior
- Add pass-through to crawl create (non-Crawl records pass unchanged)
- Add pass-through to snapshot create (Crawl records + others pass through)
- Add pass-through to archiveresult create (Snapshot records + others)
- Add create-or-update behavior to run command:
  - Records WITHOUT id: Create via Model.from_json()
  - Records WITH id: Lookup existing, re-queue
  - Outputs JSONL of all processed records for chaining

Phase 4: Test Infrastructure
- Create archivebox/tests/conftest.py with pytest-django fixtures
- Include CLI helpers, output assertions, database assertions
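  A minimal sketch of the kind of helpers this conftest provides (names and
  signatures are illustrative, not the actual fixtures):

    import json
    import pytest
    from click.testing import CliRunner

    @pytest.fixture
    def cli_runner():
        # invokes click commands in-process so stdout can be captured for JSONL assertions
        return CliRunner()

    def assert_jsonl_types(output: str, expected_types: list[str]):
        # every non-blank stdout line should be a JSON object with the expected 'type'
        records = [json.loads(line) for line in output.splitlines() if line.strip()]
        assert [r.get('type') for r in records] == expected_types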

Phase 6: Config Update
- Update supervisord_util.py to launch the run command instead of the orchestrator

This enables Unix-style piping:
  archivebox crawl create URL | archivebox run
  archivebox archiveresult list --status=failed | archivebox run
  curl API | jq transform | archivebox crawl create | archivebox run
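
Each stage reads and writes one JSON object per line; the field names follow
the diffs below, but the exact 'type' values and ids are illustrative:
  {"type": "Crawl", "id": "abc123", "urls": "https://example.com"}
  {"type": "Snapshot", "id": "def456", "url": "https://example.com", "status": "queued"}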
Author: Claude
Date: 2025-12-31 10:07:14 +00:00
Commit: f3e11b61fd (parent: 1c85b4daa3)
13 changed files with 529 additions and 145 deletions

View File

@@ -39,21 +39,7 @@ from typing import Optional
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================
@@ -69,6 +55,7 @@ def create_archiveresults(
     Create ArchiveResults for Snapshots.

     Reads Snapshot records from stdin and creates ArchiveResult entries.
+    Pass-through: Non-Snapshot/ArchiveResult records are output unchanged.

     If --plugin is specified, only creates results for that plugin.
     Otherwise, creates results for all pending plugins.
@@ -78,7 +65,7 @@ def create_archiveresults(
     """
     from django.utils import timezone
-    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_SNAPSHOT
+    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
     from archivebox.core.models import Snapshot, ArchiveResult

     is_tty = sys.stdout.isatty()
@@ -87,6 +74,7 @@ def create_archiveresults(
     if snapshot_id:
         try:
             snapshots = [Snapshot.objects.get(id=snapshot_id)]
+            pass_through_records = []
         except Snapshot.DoesNotExist:
             rprint(f'[red]Snapshot not found: {snapshot_id}[/red]', file=sys.stderr)
             return 1
@@ -97,17 +85,44 @@ def create_archiveresults(
             rprint('[yellow]No Snapshot records provided via stdin[/yellow]', file=sys.stderr)
             return 1

-        # Filter to only Snapshot records
+        # Separate snapshot records from pass-through records
         snapshot_ids = []
+        pass_through_records = []
         for record in records:
-            if record.get('type') == TYPE_SNAPSHOT:
+            record_type = record.get('type', '')
+            if record_type == TYPE_SNAPSHOT:
+                # Pass through the Snapshot record itself
+                pass_through_records.append(record)
                 if record.get('id'):
                     snapshot_ids.append(record['id'])
+            elif record_type == TYPE_ARCHIVERESULT:
+                # ArchiveResult records: pass through if they have an id
+                if record.get('id'):
+                    pass_through_records.append(record)
+                # If no id, we could create it, but for now just pass through
+                else:
+                    pass_through_records.append(record)
+            elif record_type:
+                # Other typed records (Crawl, Tag, etc): pass through
+                pass_through_records.append(record)
             elif record.get('id'):
-                # Assume it's a snapshot ID if no type specified
+                # Untyped record with id - assume it's a snapshot ID
                 snapshot_ids.append(record['id'])

+        # Output pass-through records first
+        if not is_tty:
+            for record in pass_through_records:
+                write_record(record)
+
         if not snapshot_ids:
+            if pass_through_records:
+                rprint(f'[dim]Passed through {len(pass_through_records)} records, no new snapshots to process[/dim]', file=sys.stderr)
+                return 0
             rprint('[yellow]No valid Snapshot IDs in input[/yellow]', file=sys.stderr)
             return 1
@@ -115,7 +130,7 @@ def create_archiveresults(
     if not snapshots:
         rprint('[yellow]No matching snapshots found[/yellow]', file=sys.stderr)
-        return 1
+        return 0 if pass_through_records else 1

     created_count = 0
     for snapshot in snapshots:

View File

@@ -34,21 +34,7 @@ from typing import Optional
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================

View File

@@ -39,21 +39,7 @@ from typing import Optional, Iterable
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================
@@ -71,12 +57,13 @@ def create_crawl(
     Create a Crawl job from URLs.

     Takes URLs as args or stdin, creates one Crawl with all URLs, outputs JSONL.
+    Pass-through: Records that are not URLs are output unchanged (for piping).

     Exit codes:
         0: Success
         1: Failure
     """
-    from archivebox.misc.jsonl import read_args_or_stdin, write_record
+    from archivebox.misc.jsonl import read_args_or_stdin, write_record, TYPE_CRAWL
     from archivebox.base_models.models import get_or_create_system_user_pk
     from archivebox.crawls.models import Crawl
@@ -90,14 +77,46 @@ def create_crawl(
         rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
         return 1

-    # Collect all URLs into a single newline-separated string
+    # Separate pass-through records from URL records
     url_list = []
+    pass_through_records = []
     for record in records:
+        record_type = record.get('type', '')
+
+        # Pass-through: output records that aren't URL/Crawl types
+        if record_type and record_type != TYPE_CRAWL and not record.get('url') and not record.get('urls'):
+            pass_through_records.append(record)
+            continue
+
+        # Handle existing Crawl records (just pass through with id)
+        if record_type == TYPE_CRAWL and record.get('id'):
+            pass_through_records.append(record)
+            continue
+
+        # Collect URLs
         url = record.get('url')
         if url:
             url_list.append(url)
+
+        # Handle 'urls' field (newline-separated)
+        urls_field = record.get('urls')
+        if urls_field:
+            for line in urls_field.split('\n'):
+                line = line.strip()
+                if line and not line.startswith('#'):
+                    url_list.append(line)
+
+    # Output pass-through records first
+    if not is_tty:
+        for record in pass_through_records:
+            write_record(record)
+
     if not url_list:
+        if pass_through_records:
+            # If we had pass-through records but no URLs, that's OK
+            rprint(f'[dim]Passed through {len(pass_through_records)} records, no new URLs[/dim]', file=sys.stderr)
+            return 0
         rprint('[red]No valid URLs found[/red]', file=sys.stderr)
         return 1

View File

@@ -28,21 +28,7 @@ from typing import Optional
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================

View File

@@ -31,21 +31,7 @@ from typing import Optional
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================

View File

@@ -38,58 +38,110 @@ def process_stdin_records() -> int:
     """
     Process JSONL records from stdin.

     Reads records, queues them for processing, then runs orchestrator until complete.
-    Handles any record type: Crawl, Snapshot, ArchiveResult, etc.
+
+    Create-or-update behavior:
+    - Records WITHOUT id: Create via Model.from_json(), then queue
+    - Records WITH id: Lookup existing, re-queue for processing
+
+    Outputs JSONL of all processed records (for chaining).
+
+    Handles any record type: Crawl, Snapshot, ArchiveResult.
+    Auto-cascades: Crawl → Snapshots → ArchiveResults.

     Returns exit code (0 = success, 1 = error).
     """
     from django.utils import timezone
-    from archivebox.misc.jsonl import read_stdin, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
+    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
+    from archivebox.base_models.models import get_or_create_system_user_pk
     from archivebox.core.models import Snapshot, ArchiveResult
     from archivebox.crawls.models import Crawl
     from archivebox.workers.orchestrator import Orchestrator

     records = list(read_stdin())
+    is_tty = sys.stdout.isatty()
+
     if not records:
         return 0  # Nothing to process

+    created_by_id = get_or_create_system_user_pk()
     queued_count = 0
+    output_records = []
     for record in records:
-        record_type = record.get('type')
+        record_type = record.get('type', '')
         record_id = record.get('id')
-        if not record_id:
-            continue

         try:
             if record_type == TYPE_CRAWL:
-                crawl = Crawl.objects.get(id=record_id)
-                if crawl.status in [Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]:
+                if record_id:
+                    # Existing crawl - re-queue
+                    try:
+                        crawl = Crawl.objects.get(id=record_id)
+                    except Crawl.DoesNotExist:
+                        crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
+                else:
+                    # New crawl - create it
+                    crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
+                if crawl:
                     crawl.retry_at = timezone.now()
+                    if crawl.status not in [Crawl.StatusChoices.SEALED]:
+                        crawl.status = Crawl.StatusChoices.QUEUED
                     crawl.save()
+                    output_records.append(crawl.to_json())
                     queued_count += 1
-            elif record_type == TYPE_SNAPSHOT:
-                snapshot = Snapshot.objects.get(id=record_id)
-                if snapshot.status in [Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED]:
+            elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
+                if record_id:
+                    # Existing snapshot - re-queue
+                    try:
+                        snapshot = Snapshot.objects.get(id=record_id)
+                    except Snapshot.DoesNotExist:
+                        snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
+                else:
+                    # New snapshot - create it
+                    snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
+                if snapshot:
                     snapshot.retry_at = timezone.now()
+                    if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
+                        snapshot.status = Snapshot.StatusChoices.QUEUED
                     snapshot.save()
+                    output_records.append(snapshot.to_json())
                     queued_count += 1
             elif record_type == TYPE_ARCHIVERESULT:
-                archiveresult = ArchiveResult.objects.get(id=record_id)
-                if archiveresult.status in [ArchiveResult.StatusChoices.QUEUED, ArchiveResult.StatusChoices.STARTED, ArchiveResult.StatusChoices.BACKOFF]:
+                if record_id:
+                    # Existing archiveresult - re-queue
+                    try:
+                        archiveresult = ArchiveResult.objects.get(id=record_id)
+                    except ArchiveResult.DoesNotExist:
+                        archiveresult = ArchiveResult.from_json(record)
+                else:
+                    # New archiveresult - create it
+                    archiveresult = ArchiveResult.from_json(record)
+                if archiveresult:
                     archiveresult.retry_at = timezone.now()
+                    if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
+                        archiveresult.status = ArchiveResult.StatusChoices.QUEUED
                     archiveresult.save()
+                    output_records.append(archiveresult.to_json())
                     queued_count += 1
-        except (Crawl.DoesNotExist, Snapshot.DoesNotExist, ArchiveResult.DoesNotExist):
-            rprint(f'[yellow]Record not found: {record_type} {record_id}[/yellow]', file=sys.stderr)
+            else:
+                # Unknown type - pass through
+                output_records.append(record)
+        except Exception as e:
+            rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
             continue

+    # Output all processed records (for chaining)
+    if not is_tty:
+        for rec in output_records:
+            write_record(rec)
+
     if queued_count == 0:
         rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
         return 0

View File

@@ -36,21 +36,7 @@ from typing import Optional, Iterable
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================
@@ -66,13 +52,12 @@ def create_snapshots(
 ) -> int:
     """
     Create Snapshots from URLs or stdin JSONL (Crawl or Snapshot records).
+    Pass-through: Records that are not Crawl/Snapshot/URL are output unchanged.

     Exit codes:
         0: Success
         1: Failure
     """
-    from django.utils import timezone
     from archivebox.misc.jsonl import (
         read_args_or_stdin, write_record,
         TYPE_SNAPSHOT, TYPE_CRAWL
@@ -93,11 +78,17 @@ def create_snapshots(
     # Process each record - handle Crawls and plain URLs/Snapshots
     created_snapshots = []
+    pass_through_count = 0
     for record in records:
-        record_type = record.get('type')
+        record_type = record.get('type', '')

         try:
             if record_type == TYPE_CRAWL:
+                # Pass through the Crawl record itself first
+                if not is_tty:
+                    write_record(record)
+
                 # Input is a Crawl - get or create it, then create Snapshots for its URLs
                 crawl = None
                 crawl_id = record.get('id')
@@ -144,11 +135,20 @@ def create_snapshots(
                 if not is_tty:
                     write_record(snapshot.to_json())
+            else:
+                # Pass-through: output records we don't handle
+                if not is_tty:
+                    write_record(record)
+                pass_through_count += 1
         except Exception as e:
             rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
             continue

     if not created_snapshots:
+        if pass_through_count > 0:
+            rprint(f'[dim]Passed through {pass_through_count} records, no new snapshots[/dim]', file=sys.stderr)
+            return 0
         rprint('[red]No snapshots created[/red]', file=sys.stderr)
         return 1

View File

@@ -36,21 +36,7 @@ from typing import Optional, Iterable
 import rich_click as click
 from rich import print as rprint

-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-    if filters:
-        queryset = queryset.filter(**filters)
-    if limit:
-        queryset = queryset[:limit]
-    return queryset
+from archivebox.cli.cli_utils import apply_filters

 # =============================================================================

View File: archivebox/cli/cli_utils.py (new file)

@@ -0,0 +1,46 @@
+"""
+Shared CLI utilities for ArchiveBox commands.
+
+This module contains common utilities used across multiple CLI commands,
+extracted to avoid code duplication.
+"""
+
+__package__ = 'archivebox.cli'
+
+from typing import Optional
+
+
+def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
+    """
+    Apply Django-style filters from CLI kwargs to a QuerySet.
+
+    Supports: --status=queued, --url__icontains=example, --id__in=uuid1,uuid2
+
+    Args:
+        queryset: Django QuerySet to filter
+        filter_kwargs: Dict of filter key-value pairs from CLI
+        limit: Optional limit on results
+
+    Returns:
+        Filtered QuerySet
+
+    Example:
+        queryset = Snapshot.objects.all()
+        filter_kwargs = {'status': 'queued', 'url__icontains': 'example.com'}
+        filtered = apply_filters(queryset, filter_kwargs, limit=10)
+    """
+    filters = {}
+    for key, value in filter_kwargs.items():
+        if value is None or key in ('limit', 'offset'):
+            continue
+        # Handle CSV lists for __in filters
+        if key.endswith('__in') and isinstance(value, str):
+            value = [v.strip() for v in value.split(',')]
+        filters[key] = value
+    if filters:
+        queryset = queryset.filter(**filters)
+    if limit:
+        queryset = queryset[:limit]
+    return queryset
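
A quick illustration of the CSV handling for __in filters, using a duck-typed
stand-in for a QuerySet (illustrative only; the real callers pass Django
QuerySets, which respond to filter() and slicing the same way):

    class EchoQS:
        """Records the calls a QuerySet would receive."""
        def __init__(self, ops=()):
            self.ops = list(ops)
        def filter(self, **kwargs):
            return EchoQS(self.ops + [('filter', kwargs)])
        def __getitem__(self, s):
            return EchoQS(self.ops + [('slice', s.stop)])

    qs = apply_filters(EchoQS(), {'status': 'queued', 'id__in': 'a,b', 'limit': 10}, limit=10)
    print(qs.ops)  # [('filter', {'status': 'queued', 'id__in': ['a', 'b']}), ('slice', 10)]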