Convert snapshot index from JSON to JSONL (#1730)

<!-- IMPORTANT: Do not submit PRs with only formatting / PEP8 / line
length changes. -->

# Summary

<!--e.g. This PR fixes ABC or adds the ability to do XYZ...-->

This PR switches the per-snapshot index from a single index.json file to a flat, line-oriented index.jsonl file, adds automatic migration of legacy indexes, and updates the `archivebox crawl | archivebox snapshot | archivebox extract` CLI pipeline to emit and consume JSONL records.

# Related issues

<!-- e.g. #123 or Roadmap goal #
https://github.com/pirate/ArchiveBox/wiki/Roadmap -->

# Changes these areas

- [ ] Bugfixes
- [ ] Feature behavior
- [ ] Command line interface
- [ ] Configuration options
- [ ] Internal architecture
- [x] Snapshot data layout on disk

<!-- This is an auto-generated description by cubic. -->
---
## Summary by cubic
Switch snapshot index storage from index.json to a flat index.jsonl format for easier parsing and extensibility. Includes automatic migration and backward-compatible reading, plus an updated CLI pipeline that emits and consumes JSONL records.

- **New Features**
  - Write and read index.jsonl with per-line records (Snapshot, ArchiveResult, Binary, Process); reconcile prefers JSONL.
  - Auto-convert legacy index.json to JSONL during migration/update; load_from_directory/create_from_directory support both formats.
  - Serialization moved to model to_jsonl methods; added schema_version to all records, including Tag, Crawl, Binary, and Process.
  - CLI pipeline updated: crawl creates a single Crawl job from all input URLs and outputs Crawl JSONL (no immediate crawling); snapshot accepts Crawl JSONL/IDs and outputs Snapshot JSONL; extract outputs ArchiveResult JSONL via model methods.

- **Migration**
  - Conversion runs during filesystem migration and reconcile; no manual steps.
  - Legacy index.json is deleted after conversion; external tools should switch to index.jsonl (see the reading sketch below).

<sup>Written for commit 251fe33e49.
Summary will update on new commits.</sup>

<!-- End of auto-generated description by cubic. -->
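
For external tools and scripts that currently parse index.json, the sketch below shows one way to read the new flat index.jsonl: one JSON object per line, each tagged with a `type` of Snapshot, ArchiveResult, Binary, or Process. This is a minimal standalone example using only the stdlib, not the project's own loader; the path and the fields read at the end (`url`, `status`, `plugin`) are illustrative.

```python
import json
from pathlib import Path

def read_index_jsonl(snapshot_dir: Path) -> dict:
    """Group the records in a snapshot's index.jsonl by their 'type' field."""
    grouped = {'Snapshot': [], 'ArchiveResult': [], 'Binary': [], 'Process': []}
    index_path = snapshot_dir / 'index.jsonl'
    if not index_path.exists():
        return grouped
    with open(index_path) as f:
        for line in f:
            line = line.strip()
            if not line.startswith('{'):
                continue  # skip blank or non-JSON lines, like the loader in this PR
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            grouped.setdefault(record.get('type', 'Unknown'), []).append(record)
    return grouped

# Example usage (the path is illustrative):
records = read_index_jsonl(Path('archive/1234567890.123'))
for snap in records['Snapshot']:
    print(snap.get('url'), snap.get('status'))
for result in records['ArchiveResult']:
    print('  ', result.get('plugin'), result.get('status'))
```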
Authored by Nick Sweeting on 2025-12-30 12:32:52 -08:00, committed via GitHub.
10 changed files with 929 additions and 700 deletions


@@ -1,224 +1,114 @@
#!/usr/bin/env python3
"""
archivebox crawl [urls_or_snapshot_ids...] [--depth=N] [--plugin=NAME]
archivebox crawl [urls...] [--depth=N] [--tag=TAG]
Discover outgoing links from URLs or existing Snapshots.
If a URL is passed, creates a Snapshot for it first, then runs parser plugins.
If a snapshot_id is passed, runs parser plugins on the existing Snapshot.
Outputs discovered outlink URLs as JSONL.
Pipe the output to `archivebox snapshot` to archive the discovered URLs.
Create Crawl jobs from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
Does NOT immediately start the crawl - pipe to `archivebox snapshot` to process.
Input formats:
- Plain URLs (one per line)
- Snapshot UUIDs (one per line)
- JSONL: {"type": "Snapshot", "url": "...", ...}
- JSONL: {"type": "Snapshot", "id": "...", ...}
- JSONL: {"url": "...", "depth": 1, "tags": "..."}
Output (JSONL):
{"type": "Snapshot", "url": "https://discovered-url.com", "via_extractor": "...", ...}
{"type": "Crawl", "id": "...", "urls": "...", "status": "queued", ...}
Examples:
# Discover links from a page (creates snapshot first)
# Create a crawl job
archivebox crawl https://example.com
# Discover links from an existing snapshot
archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
# Create crawl with depth
archivebox crawl --depth=1 https://example.com
# Full recursive crawl pipeline
# Full pipeline: create crawl, create snapshots, run extractors
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
# Use only specific parser plugin
archivebox crawl --plugin=parse_html_urls https://example.com
# Chain: create snapshot, then crawl its outlinks
archivebox snapshot https://example.com | archivebox crawl | archivebox snapshot | archivebox extract
# Process existing Crawl by ID (runs the crawl state machine)
archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox crawl'
import sys
import json
from pathlib import Path
from typing import Optional
import rich_click as click
from archivebox.misc.util import docstring
def discover_outlinks(
args: tuple,
depth: int = 1,
plugin: str = '',
wait: bool = True,
def create_crawls(
records: list,
depth: int = 0,
tag: str = '',
created_by_id: Optional[int] = None,
) -> int:
"""
Discover outgoing links from URLs or existing Snapshots.
Create a single Crawl job from all input URLs.
Accepts URLs or snapshot_ids. For URLs, creates Snapshots first.
Runs parser plugins, outputs discovered URLs as JSONL.
The output can be piped to `archivebox snapshot` to archive the discovered links.
Takes pre-read records, creates one Crawl with all URLs, outputs JSONL.
Does NOT start the crawl - just creates the job in QUEUED state.
Exit codes:
0: Success
1: Failure
"""
from rich import print as rprint
from django.utils import timezone
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record,
TYPE_SNAPSHOT
)
from archivebox.misc.jsonl import write_record
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.crawls.models import Crawl
from archivebox.config import CONSTANTS
from archivebox.workers.orchestrator import Orchestrator
created_by_id = get_or_create_system_user_pk()
created_by_id = created_by_id or get_or_create_system_user_pk()
is_tty = sys.stdout.isatty()
# Collect all input records
records = list(read_args_or_stdin(args))
if not records:
rprint('[yellow]No URLs or snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# Separate records into existing snapshots vs new URLs
existing_snapshot_ids = []
new_url_records = []
# Collect all URLs into a single newline-separated string
urls = []
for record in records:
# Check if it's an existing snapshot (has id but no url, or looks like a UUID)
if record.get('id') and not record.get('url'):
existing_snapshot_ids.append(record['id'])
elif record.get('id'):
# Has both id and url - check if snapshot exists
try:
Snapshot.objects.get(id=record['id'])
existing_snapshot_ids.append(record['id'])
except Snapshot.DoesNotExist:
new_url_records.append(record)
elif record.get('url'):
new_url_records.append(record)
url = record.get('url')
if url:
urls.append(url)
# For new URLs, create a Crawl and Snapshots
snapshot_ids = list(existing_snapshot_ids)
if new_url_records:
# Create a Crawl to manage this operation
sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__crawl.txt'
sources_file.parent.mkdir(parents=True, exist_ok=True)
sources_file.write_text('\n'.join(r.get('url', '') for r in new_url_records if r.get('url')))
crawl = Crawl.from_file(
sources_file,
max_depth=depth,
label=f'crawl --depth={depth}',
created_by=created_by_id,
)
# Create snapshots for new URLs
for record in new_url_records:
try:
record['crawl_id'] = str(crawl.id)
record['depth'] = record.get('depth', 0)
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl(record, overrides=overrides)
if snapshot:
snapshot_ids.append(str(snapshot.id))
except Exception as e:
rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
continue
if not snapshot_ids:
rprint('[red]No snapshots to process[/red]', file=sys.stderr)
if not urls:
rprint('[red]No valid URLs found[/red]', file=sys.stderr)
return 1
if existing_snapshot_ids:
rprint(f'[blue]Using {len(existing_snapshot_ids)} existing snapshots[/blue]', file=sys.stderr)
if new_url_records:
rprint(f'[blue]Created {len(snapshot_ids) - len(existing_snapshot_ids)} new snapshots[/blue]', file=sys.stderr)
rprint(f'[blue]Running parser plugins on {len(snapshot_ids)} snapshots...[/blue]', file=sys.stderr)
try:
# Build crawl record with all URLs as newline-separated string
crawl_record = {
'urls': '\n'.join(urls),
'max_depth': depth,
'tags_str': tag,
'label': '',
}
# Create ArchiveResults for plugins
# If --plugin is specified, only run that one. Otherwise, run all available plugins.
# The orchestrator will handle dependency ordering (plugins declare deps in config.json)
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id})
if not crawl:
rprint('[red]Failed to create crawl[/red]', file=sys.stderr)
return 1
if plugin:
# User specified a single plugin to run
ArchiveResult.objects.get_or_create(
snapshot=snapshot,
extractor=plugin,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
else:
# Create pending ArchiveResults for all enabled plugins
# This uses hook discovery to find available plugins dynamically
snapshot.create_pending_archiveresults()
# Output JSONL record (only when piped)
if not is_tty:
write_record(crawl.to_jsonl())
# Mark snapshot as started
snapshot.status = Snapshot.StatusChoices.STARTED
snapshot.retry_at = timezone.now()
snapshot.save()
rprint(f'[green]Created crawl with {len(urls)} URLs[/green]', file=sys.stderr)
except Snapshot.DoesNotExist:
continue
# Run plugins
if wait:
rprint('[blue]Running outlink plugins...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
# Collect discovered URLs from urls.jsonl files
# Uses dynamic discovery - any plugin that outputs urls.jsonl is considered a parser
from archivebox.hooks import collect_urls_from_plugins
discovered_urls = {}
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
snapshot_dir = Path(snapshot.output_dir)
# Dynamically collect urls.jsonl from ANY plugin subdirectory
for entry in collect_urls_from_plugins(snapshot_dir):
url = entry.get('url')
if url and url not in discovered_urls:
# Add metadata for crawl tracking
entry['type'] = TYPE_SNAPSHOT
entry['depth'] = snapshot.depth + 1
entry['via_snapshot'] = str(snapshot.id)
discovered_urls[url] = entry
except Snapshot.DoesNotExist:
continue
rprint(f'[green]Discovered {len(discovered_urls)} URLs[/green]', file=sys.stderr)
# Output discovered URLs as JSONL (when piped) or human-readable (when TTY)
for url, entry in discovered_urls.items():
# If TTY, show human-readable output
if is_tty:
via = entry.get('via_extractor', 'unknown')
rprint(f' [dim]{via}[/dim] {url[:80]}', file=sys.stderr)
else:
write_record(entry)
rprint(f' [dim]{crawl.id}[/dim]', file=sys.stderr)
for url in urls[:5]: # Show first 5 URLs
rprint(f' {url[:70]}', file=sys.stderr)
if len(urls) > 5:
rprint(f' ... and {len(urls) - 5} more', file=sys.stderr)
return 0
return 0
except Exception as e:
rprint(f'[red]Error creating crawl: {e}[/red]', file=sys.stderr)
return 1
def process_crawl_by_id(crawl_id: str) -> int:
@@ -262,12 +152,11 @@ def is_crawl_id(value: str) -> bool:
@click.command()
@click.option('--depth', '-d', type=int, default=1, help='Max depth for recursive crawling (default: 1)')
@click.option('--plugin', '-p', default='', help='Use only this parser plugin (e.g., parse_html_urls, parse_dom_outlinks)')
@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
@click.option('--depth', '-d', type=int, default=0, help='Max depth for recursive crawling (default: 0, no recursion)')
@click.option('--tag', '-t', default='', help='Comma-separated tags to add to snapshots')
@click.argument('args', nargs=-1)
def main(depth: int, plugin: str, wait: bool, args: tuple):
"""Discover outgoing links from URLs or existing Snapshots, or process Crawl by ID"""
def main(depth: int, tag: str, args: tuple):
"""Create Crawl jobs from URLs, or process existing Crawls by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# Read all input
@@ -275,7 +164,7 @@ def main(depth: int, plugin: str, wait: bool, args: tuple):
if not records:
from rich import print as rprint
rprint('[yellow]No URLs, Snapshot IDs, or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
# Check if input looks like existing Crawl IDs to process
@@ -295,8 +184,8 @@ def main(depth: int, plugin: str, wait: bool, args: tuple):
exit_code = result
sys.exit(exit_code)
else:
# Default behavior: discover outlinks from input (URLs or Snapshot IDs)
sys.exit(discover_outlinks(args, depth=depth, plugin=plugin, wait=wait))
# Default behavior: create Crawl jobs from URLs
sys.exit(create_crawls(records, depth=depth, tag=tag))
if __name__ == '__main__':
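
To make the new crawl output concrete, here is a small standalone sketch (stdlib only, not the real command) of the Crawl record that `archivebox crawl` now emits: all input URLs are joined into one newline-separated `urls` string and written as a single JSONL line for `archivebox snapshot` to consume. Field names mirror the crawl_record built in the diff above; `id`, `status`, and `schema_version` are added by the real Crawl model, so they are omitted here.

```python
import json
import sys

def build_crawl_record(urls: list, depth: int = 0, tag: str = '') -> dict:
    """Mirror of the crawl_record built in create_crawls(): one Crawl job holding all URLs."""
    return {
        'type': 'Crawl',          # added by Crawl.to_jsonl() in the real command
        'urls': '\n'.join(urls),  # all input URLs as a single newline-separated string
        'max_depth': depth,
        'tags_str': tag,
        'label': '',
    }

if __name__ == '__main__':
    record = build_crawl_record(['https://example.com', 'https://foo.com'], depth=1, tag='demo')
    # write_record() in archivebox does essentially this: one JSON object per stdout line
    sys.stdout.write(json.dumps(record) + '\n')
```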


@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
archivebox extract [snapshot_ids...] [--plugin=NAME]
archivebox extract [snapshot_ids...] [--plugins=NAMES]
Run plugins on Snapshots. Accepts snapshot IDs as arguments, from stdin, or via JSONL.
@@ -20,8 +20,8 @@ Examples:
# Pipe from snapshot command
archivebox snapshot https://example.com | archivebox extract
# Run specific plugin only
archivebox extract --plugin=screenshot 01234567-89ab-cdef-0123-456789abcdef
# Run specific plugins only
archivebox extract --plugins=screenshot,singlefile 01234567-89ab-cdef-0123-456789abcdef
# Chain commands
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
@@ -76,7 +76,7 @@ def process_archiveresult_by_id(archiveresult_id: str) -> int:
def run_plugins(
args: tuple,
plugin: str = '',
plugins: str = '',
wait: bool = True,
) -> int:
"""
@@ -92,7 +92,7 @@ def run_plugins(
from django.utils import timezone
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record, archiveresult_to_jsonl,
read_args_or_stdin, write_record,
TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
)
from archivebox.core.models import Snapshot, ArchiveResult
@@ -147,21 +147,25 @@ def run_plugins(
continue
# Create pending ArchiveResults if needed
if plugin:
# Only create for specific plugin
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
# Reset for retry
result.status = ArchiveResult.StatusChoices.QUEUED
result.retry_at = timezone.now()
result.save()
if plugins:
# Parse comma-separated plugins list
plugins_list = [p.strip() for p in plugins.split(',') if p.strip()]
# Only create for specific plugins
for plugin_name in plugins_list:
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin_name,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
# Reset for retry
result.status = ArchiveResult.StatusChoices.QUEUED
result.retry_at = timezone.now()
result.save()
else:
# Create all pending plugins
snapshot.create_pending_archiveresults()
@@ -191,8 +195,10 @@ def run_plugins(
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
results = snapshot.archiveresult_set.all()
if plugin:
results = results.filter(plugin=plugin)
if plugins:
# Parse comma-separated plugins list
plugins_list = [p.strip() for p in plugins.split(',') if p.strip()]
results = results.filter(plugin__in=plugins_list)
for result in results:
if is_tty:
@@ -203,7 +209,7 @@ def run_plugins(
}.get(result.status, 'dim')
rprint(f' [{status_color}]{result.status}[/{status_color}] {result.plugin}{result.output_str or ""}', file=sys.stderr)
else:
write_record(archiveresult_to_jsonl(result))
write_record(result.to_jsonl())
except Snapshot.DoesNotExist:
continue
@@ -222,10 +228,10 @@ def is_archiveresult_id(value: str) -> bool:
@click.command()
@click.option('--plugin', '-p', default='', help='Run only this plugin (e.g., screenshot, singlefile)')
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run (e.g., screenshot,singlefile)')
@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
@click.argument('args', nargs=-1)
def main(plugin: str, wait: bool, args: tuple):
def main(plugins: str, wait: bool, args: tuple):
"""Run plugins on Snapshots, or process existing ArchiveResults by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
@@ -254,7 +260,7 @@ def main(plugin: str, wait: bool, args: tuple):
sys.exit(exit_code)
else:
# Default behavior: run plugins on Snapshots from input
sys.exit(run_plugins(args, plugin=plugin, wait=wait))
sys.exit(run_plugins(args, plugins=plugins, wait=wait))
if __name__ == '__main__':


@@ -1,29 +1,34 @@
#!/usr/bin/env python3
"""
archivebox snapshot [urls...] [--depth=N] [--tag=TAG] [--plugins=...]
archivebox snapshot [urls_or_crawl_ids...] [--tag=TAG] [--plugins=NAMES]
Create Snapshots from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
Create Snapshots from URLs or Crawl jobs. Accepts URLs, Crawl JSONL, or Crawl IDs.
Input formats:
- Plain URLs (one per line)
- JSONL: {"type": "Crawl", "id": "...", "urls": "..."}
- JSONL: {"type": "Snapshot", "url": "...", "title": "...", "tags": "..."}
- Crawl UUIDs (one per line)
Output (JSONL):
{"type": "Snapshot", "id": "...", "url": "...", "status": "queued", ...}
Examples:
# Create snapshots from URLs
# Create snapshots from URLs directly
archivebox snapshot https://example.com https://foo.com
# Pipe from stdin
echo 'https://example.com' | archivebox snapshot
# Pipe from crawl command
archivebox crawl https://example.com | archivebox snapshot
# Chain with extract
archivebox snapshot https://example.com | archivebox extract
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
# With crawl depth
archivebox snapshot --depth=1 https://example.com
# Run specific plugins after creating snapshots
archivebox snapshot --plugins=screenshot,singlefile https://example.com
# Process existing Snapshot by ID
archivebox snapshot 01234567-89ab-cdef-0123-456789abcdef
"""
__package__ = 'archivebox.cli'
@@ -67,14 +72,13 @@ def process_snapshot_by_id(snapshot_id: str) -> int:
def create_snapshots(
urls: tuple,
depth: int = 0,
args: tuple,
tag: str = '',
plugins: str = '',
created_by_id: Optional[int] = None,
) -> int:
"""
Create Snapshots from URLs or JSONL records.
Create Snapshots from URLs, Crawl JSONL, or Crawl IDs.
Reads from args or stdin, creates Snapshot objects, outputs JSONL.
If --plugins is passed, also runs specified plugins (blocking).
@@ -87,64 +91,78 @@ def create_snapshots(
from django.utils import timezone
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record, snapshot_to_jsonl,
TYPE_SNAPSHOT, TYPE_TAG
read_args_or_stdin, write_record,
TYPE_SNAPSHOT, TYPE_CRAWL
)
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.core.models import Snapshot
from archivebox.crawls.models import Crawl
from archivebox.config import CONSTANTS
created_by_id = created_by_id or get_or_create_system_user_pk()
is_tty = sys.stdout.isatty()
# Collect all input records
records = list(read_args_or_stdin(urls))
records = list(read_args_or_stdin(args))
if not records:
rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs or Crawls provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# If depth > 0, we need a Crawl to manage recursive discovery
crawl = None
if depth > 0:
# Create a crawl for this batch
sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__snapshot.txt'
sources_file.parent.mkdir(parents=True, exist_ok=True)
sources_file.write_text('\n'.join(r.get('url', '') for r in records if r.get('url')))
crawl = Crawl.from_file(
sources_file,
max_depth=depth,
label=f'snapshot --depth={depth}',
created_by=created_by_id,
)
# Process each record
# Process each record - handle Crawls and plain URLs/Snapshots
created_snapshots = []
for record in records:
if record.get('type') != TYPE_SNAPSHOT and 'url' not in record:
continue
record_type = record.get('type')
try:
# Add crawl info if we have one
if crawl:
record['crawl_id'] = str(crawl.id)
record['depth'] = record.get('depth', 0)
if record_type == TYPE_CRAWL:
# Input is a Crawl - get or create it, then create Snapshots for its URLs
crawl = None
crawl_id = record.get('id')
if crawl_id:
try:
crawl = Crawl.objects.get(id=crawl_id)
except Crawl.DoesNotExist:
# Crawl doesn't exist, create it
crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
else:
# No ID, create new crawl
crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
# Add tags if provided via CLI
if tag and not record.get('tags'):
record['tags'] = tag
if not crawl:
continue
# Get or create the snapshot
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl(record, overrides=overrides)
if snapshot:
created_snapshots.append(snapshot)
# Create snapshots for each URL in the crawl
for url in crawl.get_urls_list():
# Merge CLI tags with crawl tags
merged_tags = crawl.tags_str
if tag:
if merged_tags:
merged_tags = f"{merged_tags},{tag}"
else:
merged_tags = tag
snapshot_record = {
'url': url,
'tags': merged_tags,
'crawl_id': str(crawl.id),
'depth': 0,
}
snapshot = Snapshot.from_jsonl(snapshot_record, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
if not is_tty:
write_record(snapshot.to_jsonl())
# Output JSONL record (only when piped)
if not is_tty:
write_record(snapshot_to_jsonl(snapshot))
elif record_type == TYPE_SNAPSHOT or record.get('url'):
# Input is a Snapshot or plain URL
# Add tags if provided via CLI
if tag and not record.get('tags'):
record['tags'] = tag
snapshot = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
if not is_tty:
write_record(snapshot.to_jsonl())
except Exception as e:
rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
@@ -161,10 +179,32 @@ def create_snapshots(
for snapshot in created_snapshots:
rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
# If --plugins is passed, run the orchestrator for those plugins
# If --plugins is passed, create ArchiveResults and run the orchestrator
if plugins:
from archivebox.core.models import ArchiveResult
from archivebox.workers.orchestrator import Orchestrator
rprint(f'[blue]Running plugins: {plugins or "all"}...[/blue]', file=sys.stderr)
# Parse comma-separated plugins list
plugins_list = [p.strip() for p in plugins.split(',') if p.strip()]
# Create ArchiveResults for the specific plugins on each snapshot
for snapshot in created_snapshots:
for plugin_name in plugins_list:
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin_name,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
# Reset for retry
result.status = ArchiveResult.StatusChoices.QUEUED
result.retry_at = timezone.now()
result.save()
rprint(f'[blue]Running plugins: {plugins}...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
@@ -175,16 +215,19 @@ def is_snapshot_id(value: str) -> bool:
"""Check if value looks like a Snapshot UUID."""
import re
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
return bool(uuid_pattern.match(value))
if not uuid_pattern.match(value):
return False
# Verify it's actually a Snapshot (not a Crawl or other object)
from archivebox.core.models import Snapshot
return Snapshot.objects.filter(id=value).exists()
@click.command()
@click.option('--depth', '-d', type=int, default=0, help='Recursively crawl linked pages up to N levels deep')
@click.option('--tag', '-t', default='', help='Comma-separated tags to add to each snapshot')
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g. title,screenshot)')
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g., screenshot,singlefile)')
@click.argument('args', nargs=-1)
def main(depth: int, tag: str, plugins: str, args: tuple):
"""Create Snapshots from URLs, or process existing Snapshots by ID"""
def main(tag: str, plugins: str, args: tuple):
"""Create Snapshots from URLs/Crawls, or process existing Snapshots by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# Read all input
@@ -192,17 +235,21 @@ def main(depth: int, tag: str, plugins: str, args: tuple):
if not records:
from rich import print as rprint
rprint('[yellow]No URLs or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs, Crawl IDs, or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
# Check if input looks like existing Snapshot IDs to process
# If ALL inputs are UUIDs with no URL, assume we're processing existing Snapshots
all_are_ids = all(
(r.get('id') and not r.get('url')) or is_snapshot_id(r.get('url', ''))
# If ALL inputs are UUIDs with no URL and exist as Snapshots, process them
all_are_snapshot_ids = all(
is_snapshot_id(r.get('id') or r.get('url', ''))
for r in records
if r.get('type') != 'Crawl' # Don't check Crawl records as Snapshot IDs
)
if all_are_ids:
# But also check that we're not receiving Crawl JSONL
has_crawl_records = any(r.get('type') == 'Crawl' for r in records)
if all_are_snapshot_ids and not has_crawl_records:
# Process existing Snapshots by ID
exit_code = 0
for record in records:
@@ -212,8 +259,8 @@ def main(depth: int, tag: str, plugins: str, args: tuple):
exit_code = result
sys.exit(exit_code)
else:
# Create new Snapshots from URLs
sys.exit(create_snapshots(args, depth=depth, tag=tag, plugins=plugins))
# Create new Snapshots from URLs or Crawls
sys.exit(create_snapshots(args, tag=tag, plugins=plugins))
if __name__ == '__main__':
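
A companion sketch for the consuming side (again stdlib only, not the real command): `archivebox snapshot` splits a Crawl record's newline-separated `urls` field into individual URLs and merges any CLI --tag value with the crawl's tags_str, creating one Snapshot per URL as in the diff above. The printed dicts stand in for the `Snapshot.from_jsonl()` calls made by the real command.

```python
import json
import sys

def snapshots_from_crawl_record(record: dict, cli_tag: str = '') -> list:
    """Split a Crawl record's newline-separated urls into per-URL Snapshot records."""
    urls = [u.strip() for u in record.get('urls', '').split('\n') if u.strip()]
    merged_tags = record.get('tags_str', '')
    if cli_tag:
        merged_tags = f'{merged_tags},{cli_tag}' if merged_tags else cli_tag
    return [
        {'type': 'Snapshot', 'url': url, 'tags': merged_tags,
         'crawl_id': record.get('id'), 'depth': 0}
        for url in urls
    ]

if __name__ == '__main__':
    # e.g.  archivebox crawl https://example.com | python this_sketch.py
    for line in sys.stdin:
        line = line.strip()
        if not line.startswith('{'):
            continue
        record = json.loads(line)
        if record.get('type') != 'Crawl':
            continue
        for snap in snapshots_from_crawl_record(record, cli_tag='demo'):
            print(json.dumps(snap))
```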


@@ -6,12 +6,15 @@ This module tests the JSONL-based piping between CLI commands as described in:
https://github.com/ArchiveBox/ArchiveBox/issues/1363
Workflows tested:
archivebox snapshot URL | archivebox extract
archivebox crawl URL -> Crawl JSONL
archivebox snapshot -> Snapshot JSONL (accepts Crawl or URL input)
archivebox extract -> ArchiveResult JSONL (accepts Snapshot input)
Pipeline:
archivebox crawl URL | archivebox snapshot | archivebox extract
archivebox crawl --plugin=PARSER URL | archivebox snapshot | archivebox extract
Each command should:
- Accept URLs, snapshot_ids, or JSONL as input (args or stdin)
- Accept URLs, IDs, or JSONL as input (args or stdin)
- Output JSONL to stdout when piped (not TTY)
- Output human-readable to stderr when TTY
"""
@@ -84,6 +87,18 @@ class TestJSONLParsing(unittest.TestCase):
self.assertEqual(result['url'], 'https://example.com')
self.assertEqual(result['tags'], 'test,demo')
def test_parse_jsonl_crawl(self):
"""JSONL Crawl records should be parsed correctly."""
from archivebox.misc.jsonl import parse_line, TYPE_CRAWL
line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}'
result = parse_line(line)
self.assertIsNotNone(result)
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'abc123')
self.assertEqual(result['urls'], 'https://example.com')
self.assertEqual(result['max_depth'], 1)
def test_parse_jsonl_with_id(self):
"""JSONL with id field should be recognized."""
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
@@ -139,47 +154,32 @@ class TestJSONLParsing(unittest.TestCase):
class TestJSONLOutput(unittest.TestCase):
"""Test JSONL output formatting."""
def test_snapshot_to_jsonl(self):
"""Snapshot model should serialize to JSONL correctly."""
from archivebox.misc.jsonl import snapshot_to_jsonl, TYPE_SNAPSHOT
def test_crawl_to_jsonl(self):
"""Crawl model should serialize to JSONL correctly."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Create a mock snapshot
mock_snapshot = MagicMock()
mock_snapshot.id = 'test-uuid-1234'
mock_snapshot.url = 'https://example.com'
mock_snapshot.title = 'Example Title'
mock_snapshot.tags_str.return_value = 'tag1,tag2'
mock_snapshot.bookmarked_at = None
mock_snapshot.created_at = None
mock_snapshot.timestamp = '1234567890'
mock_snapshot.depth = 0
mock_snapshot.status = 'queued'
# Create a mock crawl with to_jsonl method configured
mock_crawl = MagicMock()
mock_crawl.to_jsonl.return_value = {
'type': TYPE_CRAWL,
'schema_version': '0.9.0',
'id': 'test-crawl-uuid',
'urls': 'https://example.com',
'status': 'queued',
'max_depth': 0,
'tags_str': 'tag1,tag2',
'label': '',
'created_at': None,
}
result = snapshot_to_jsonl(mock_snapshot)
self.assertEqual(result['type'], TYPE_SNAPSHOT)
self.assertEqual(result['id'], 'test-uuid-1234')
self.assertEqual(result['url'], 'https://example.com')
self.assertEqual(result['title'], 'Example Title')
result = mock_crawl.to_jsonl()
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'test-crawl-uuid')
self.assertEqual(result['urls'], 'https://example.com')
self.assertEqual(result['status'], 'queued')
def test_archiveresult_to_jsonl(self):
"""ArchiveResult model should serialize to JSONL correctly."""
from archivebox.misc.jsonl import archiveresult_to_jsonl, TYPE_ARCHIVERESULT
mock_result = MagicMock()
mock_result.id = 'result-uuid-5678'
mock_result.snapshot_id = 'snapshot-uuid-1234'
mock_result.extractor = 'title'
mock_result.status = 'succeeded'
mock_result.output = 'Example Title'
mock_result.start_ts = None
mock_result.end_ts = None
result = archiveresult_to_jsonl(mock_result)
self.assertEqual(result['type'], TYPE_ARCHIVERESULT)
self.assertEqual(result['id'], 'result-uuid-5678')
self.assertEqual(result['snapshot_id'], 'snapshot-uuid-1234')
self.assertEqual(result['extractor'], 'title')
self.assertEqual(result['status'], 'succeeded')
# Note: Snapshot and ArchiveResult serialization is tested in integration tests
# (TestPipingWorkflowIntegration) using real model instances, not mocks.
class TestReadArgsOrStdin(unittest.TestCase):
@@ -226,6 +226,20 @@ class TestReadArgsOrStdin(unittest.TestCase):
self.assertEqual(records[0]['url'], 'https://example.com')
self.assertEqual(records[0]['tags'], 'test')
def test_read_crawl_jsonl_from_stdin(self):
"""Should read Crawl JSONL from stdin."""
from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
stdin_content = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com\\nhttps://foo.com"}\n'
stream = StringIO(stdin_content)
stream.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stream))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
self.assertEqual(records[0]['id'], 'abc123')
def test_skip_tty_stdin(self):
"""Should not read from TTY stdin (would block)."""
from archivebox.misc.jsonl import read_args_or_stdin
@@ -263,55 +277,23 @@ class TestCrawlCommand(unittest.TestCase):
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['url'], 'https://example.com')
def test_crawl_accepts_snapshot_id(self):
"""crawl should accept snapshot IDs as input."""
from archivebox.misc.jsonl import read_args_or_stdin
def test_crawl_output_format(self):
"""crawl should output Crawl JSONL records."""
from archivebox.misc.jsonl import TYPE_CRAWL
uuid = '01234567-89ab-cdef-0123-456789abcdef'
args = (uuid,)
records = list(read_args_or_stdin(args))
# Mock crawl output
crawl_output = {
'type': TYPE_CRAWL,
'schema_version': '0.9.0',
'id': 'test-crawl-id',
'urls': 'https://example.com',
'status': 'queued',
'max_depth': 0,
}
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], uuid)
def test_crawl_accepts_jsonl(self):
"""crawl should accept JSONL with snapshot info."""
from archivebox.misc.jsonl import read_args_or_stdin
stdin = StringIO('{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], 'abc123')
self.assertEqual(records[0]['url'], 'https://example.com')
def test_crawl_separates_existing_vs_new(self):
"""crawl should identify existing snapshots vs new URLs."""
# This tests the logic in discover_outlinks() that separates
# records with 'id' (existing) from records with just 'url' (new)
records = [
{'type': 'Snapshot', 'id': 'existing-id-1'}, # Existing (id only)
{'type': 'Snapshot', 'url': 'https://new-url.com'}, # New (url only)
{'type': 'Snapshot', 'id': 'existing-id-2', 'url': 'https://existing.com'}, # Existing (has id)
]
existing = []
new = []
for record in records:
if record.get('id') and not record.get('url'):
existing.append(record['id'])
elif record.get('id'):
existing.append(record['id']) # Has both id and url - treat as existing
elif record.get('url'):
new.append(record)
self.assertEqual(len(existing), 2)
self.assertEqual(len(new), 1)
self.assertEqual(new[0]['url'], 'https://new-url.com')
self.assertEqual(crawl_output['type'], TYPE_CRAWL)
self.assertIn('id', crawl_output)
self.assertIn('urls', crawl_output)
class TestSnapshotCommand(unittest.TestCase):
@@ -336,6 +318,20 @@ class TestSnapshotCommand(unittest.TestCase):
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['url'], 'https://example.com')
def test_snapshot_accepts_crawl_jsonl(self):
"""snapshot should accept Crawl JSONL as input."""
from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
stdin = StringIO('{"type": "Crawl", "id": "abc123", "urls": "https://example.com"}\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
self.assertEqual(records[0]['id'], 'abc123')
self.assertEqual(records[0]['urls'], 'https://example.com')
def test_snapshot_accepts_jsonl_with_metadata(self):
"""snapshot should accept JSONL with tags and other metadata."""
from archivebox.misc.jsonl import read_args_or_stdin
@@ -350,26 +346,9 @@ class TestSnapshotCommand(unittest.TestCase):
self.assertEqual(records[0]['tags'], 'tag1,tag2')
self.assertEqual(records[0]['title'], 'Test')
def test_snapshot_output_format(self):
"""snapshot output should include id and url."""
from archivebox.misc.jsonl import snapshot_to_jsonl
mock_snapshot = MagicMock()
mock_snapshot.id = 'test-id'
mock_snapshot.url = 'https://example.com'
mock_snapshot.title = 'Test'
mock_snapshot.tags_str.return_value = ''
mock_snapshot.bookmarked_at = None
mock_snapshot.created_at = None
mock_snapshot.timestamp = '123'
mock_snapshot.depth = 0
mock_snapshot.status = 'queued'
output = snapshot_to_jsonl(mock_snapshot)
self.assertIn('id', output)
self.assertIn('url', output)
self.assertEqual(output['type'], 'Snapshot')
# Note: Snapshot output format is tested in integration tests
# (TestPipingWorkflowIntegration.test_snapshot_creates_and_outputs_jsonl)
# using real Snapshot instances.
class TestExtractCommand(unittest.TestCase):
@@ -537,6 +516,86 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
"""Clean up test database."""
shutil.rmtree(cls.test_dir, ignore_errors=True)
def test_crawl_creates_and_outputs_jsonl(self):
"""
Test: archivebox crawl URL1 URL2 URL3
Should create a single Crawl with all URLs and output JSONL when piped.
"""
from archivebox.crawls.models import Crawl
from archivebox.misc.jsonl import TYPE_CRAWL
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Create crawl with multiple URLs (as newline-separated string)
urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com'
crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
self.assertIsNotNone(crawl)
self.assertIsNotNone(crawl.id)
self.assertEqual(crawl.urls, urls)
self.assertEqual(crawl.status, 'queued')
# Verify URLs list
urls_list = crawl.get_urls_list()
self.assertEqual(len(urls_list), 2)
self.assertIn('https://test-crawl-1.example.com', urls_list)
self.assertIn('https://test-crawl-2.example.com', urls_list)
# Verify output format
output = crawl.to_jsonl()
self.assertEqual(output['type'], TYPE_CRAWL)
self.assertIn('id', output)
self.assertEqual(output['urls'], urls)
self.assertIn('schema_version', output)
def test_snapshot_accepts_crawl_jsonl(self):
"""
Test: archivebox crawl URL | archivebox snapshot
Snapshot should accept Crawl JSONL and create Snapshots for each URL.
"""
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
read_args_or_stdin,
TYPE_CRAWL, TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Step 1: Create crawl (simulating 'archivebox crawl')
urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com'
crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
crawl_output = crawl.to_jsonl()
# Step 2: Parse crawl output as snapshot input
stdin = StringIO(json.dumps(crawl_output) + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
# Step 3: Create snapshots from crawl URLs
created_snapshots = []
for url in crawl.get_urls_list():
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
self.assertEqual(len(created_snapshots), 2)
# Verify snapshot output
for snapshot in created_snapshots:
output = snapshot.to_jsonl()
self.assertEqual(output['type'], TYPE_SNAPSHOT)
self.assertIn(output['url'], [
'https://crawl-to-snap-1.example.com',
'https://crawl-to-snap-2.example.com'
])
def test_snapshot_creates_and_outputs_jsonl(self):
"""
Test: archivebox snapshot URL
@@ -544,7 +603,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
"""
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record, snapshot_to_jsonl,
read_args_or_stdin, write_record,
TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
@@ -566,7 +625,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
self.assertEqual(snapshot.url, url)
# Verify output format
output = snapshot_to_jsonl(snapshot)
output = snapshot.to_jsonl()
self.assertEqual(output['type'], TYPE_SNAPSHOT)
self.assertIn('id', output)
self.assertEqual(output['url'], url)
@@ -578,7 +637,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
"""
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.misc.jsonl import (
snapshot_to_jsonl, read_args_or_stdin,
read_args_or_stdin,
TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
@@ -589,7 +648,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
url = 'https://test-extract-1.example.com'
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl({'url': url}, overrides=overrides)
snapshot_output = snapshot_to_jsonl(snapshot)
snapshot_output = snapshot.to_jsonl()
# Step 2: Parse snapshot output as extract input
stdin = StringIO(json.dumps(snapshot_output) + '\n')
@@ -609,143 +668,59 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
self.assertIn(str(snapshot.id), snapshot_ids)
def test_crawl_outputs_discovered_urls(self):
"""
Test: archivebox crawl URL
Should create snapshot, run plugins, output discovered URLs.
"""
from archivebox.hooks import collect_urls_from_plugins
from archivebox.misc.jsonl import TYPE_SNAPSHOT
# Create a mock snapshot directory with urls.jsonl
test_snapshot_dir = Path(self.test_dir) / 'archive' / 'test-crawl-snapshot'
test_snapshot_dir.mkdir(parents=True, exist_ok=True)
# Create mock extractor output
(test_snapshot_dir / 'parse_html_urls').mkdir()
(test_snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
'{"url": "https://discovered-1.com"}\n'
'{"url": "https://discovered-2.com", "title": "Discovered 2"}\n'
)
# Collect URLs (as crawl does)
discovered = collect_urls_from_plugins(test_snapshot_dir)
self.assertEqual(len(discovered), 2)
# Add crawl metadata (as crawl does)
for entry in discovered:
entry['type'] = TYPE_SNAPSHOT
entry['depth'] = 1
entry['via_snapshot'] = 'test-crawl-snapshot'
# Verify output format
self.assertEqual(discovered[0]['type'], TYPE_SNAPSHOT)
self.assertEqual(discovered[0]['depth'], 1)
self.assertEqual(discovered[0]['url'], 'https://discovered-1.com')
def test_full_pipeline_snapshot_extract(self):
"""
Test: archivebox snapshot URL | archivebox extract
This is equivalent to: archivebox add URL
"""
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
get_or_create_snapshot, snapshot_to_jsonl, read_args_or_stdin,
TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# === archivebox snapshot https://example.com ===
url = 'https://test-pipeline-1.example.com'
snapshot = get_or_create_snapshot({'url': url}, created_by_id=created_by_id)
snapshot_jsonl = json.dumps(snapshot_to_jsonl(snapshot))
# === | archivebox extract ===
stdin = StringIO(snapshot_jsonl + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
# Extract should receive the snapshot ID
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], str(snapshot.id))
# Verify snapshot exists in DB
db_snapshot = Snapshot.objects.get(id=snapshot.id)
self.assertEqual(db_snapshot.url, url)
def test_full_pipeline_crawl_snapshot_extract(self):
"""
Test: archivebox crawl URL | archivebox snapshot | archivebox extract
This is equivalent to: archivebox add --depth=1 URL
This is equivalent to: archivebox add --depth=0 URL
"""
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
get_or_create_snapshot, snapshot_to_jsonl, read_args_or_stdin,
TYPE_SNAPSHOT
read_args_or_stdin,
TYPE_CRAWL, TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.hooks import collect_urls_from_plugins
created_by_id = get_or_create_system_user_pk()
# === archivebox crawl https://example.com ===
# Step 1: Create snapshot for starting URL
start_url = 'https://test-crawl-pipeline.example.com'
start_snapshot = get_or_create_snapshot({'url': start_url}, created_by_id=created_by_id)
# Step 2: Simulate extractor output with discovered URLs
snapshot_dir = Path(self.test_dir) / 'archive' / str(start_snapshot.timestamp)
snapshot_dir.mkdir(parents=True, exist_ok=True)
(snapshot_dir / 'parse_html_urls').mkdir(exist_ok=True)
(snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
'{"url": "https://outlink-1.example.com"}\n'
'{"url": "https://outlink-2.example.com"}\n'
)
# Step 3: Collect discovered URLs (crawl output)
discovered = collect_urls_from_plugins(snapshot_dir)
crawl_output = []
for entry in discovered:
entry['type'] = TYPE_SNAPSHOT
entry['depth'] = 1
crawl_output.append(json.dumps(entry))
url = 'https://test-pipeline-full.example.com'
crawl = Crawl.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
crawl_jsonl = json.dumps(crawl.to_jsonl())
# === | archivebox snapshot ===
stdin = StringIO('\n'.join(crawl_output) + '\n')
stdin = StringIO(crawl_jsonl + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 2)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
# Create snapshots for discovered URLs
# Create snapshots from crawl
created_snapshots = []
for record in records:
snap = get_or_create_snapshot(record, created_by_id=created_by_id)
created_snapshots.append(snap)
if record.get('type') == TYPE_CRAWL:
crawl_id = record.get('id')
if crawl_id:
db_crawl = Crawl.objects.get(id=crawl_id)
for crawl_url in db_crawl.get_urls_list():
snapshot = Snapshot.from_jsonl({'url': crawl_url}, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
self.assertEqual(len(created_snapshots), 2)
self.assertEqual(len(created_snapshots), 1)
self.assertEqual(created_snapshots[0].url, url)
# === | archivebox extract ===
snapshot_jsonl_lines = [json.dumps(snapshot_to_jsonl(s)) for s in created_snapshots]
snapshot_jsonl_lines = [json.dumps(s.to_jsonl()) for s in created_snapshots]
stdin = StringIO('\n'.join(snapshot_jsonl_lines) + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 2)
# Verify all snapshots exist in DB
for record in records:
db_snapshot = Snapshot.objects.get(id=record['id'])
self.assertIn(db_snapshot.url, [
'https://outlink-1.example.com',
'https://outlink-2.example.com'
])
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
self.assertEqual(records[0]['id'], str(created_snapshots[0].id))
class TestDepthWorkflows(unittest.TestCase):
@@ -770,47 +745,44 @@ class TestDepthWorkflows(unittest.TestCase):
def test_depth_0_workflow(self):
"""
Test: archivebox snapshot URL | archivebox extract
Test: archivebox crawl URL | archivebox snapshot | archivebox extract
Depth 0: Only archive the specified URL, no crawling.
Depth 0: Only archive the specified URL, no recursive crawling.
"""
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import get_or_create_snapshot
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Create snapshot
# Create crawl with depth 0
url = 'https://depth0-test.example.com'
snapshot = get_or_create_snapshot({'url': url}, created_by_id=created_by_id)
crawl = Crawl.from_jsonl({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
# Verify only one snapshot created
self.assertEqual(Snapshot.objects.filter(url=url).count(), 1)
self.assertEqual(crawl.max_depth, 0)
# Create snapshot
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
self.assertEqual(snapshot.url, url)
def test_depth_1_workflow(self):
"""
Test: archivebox crawl URL | archivebox snapshot | archivebox extract
def test_depth_metadata_in_crawl(self):
"""Test that depth metadata is stored in Crawl."""
from archivebox.crawls.models import Crawl
from archivebox.base_models.models import get_or_create_system_user_pk
Depth 1: Archive URL + all outlinks from that URL.
"""
# This is tested in test_full_pipeline_crawl_snapshot_extract
pass
created_by_id = get_or_create_system_user_pk()
def test_depth_metadata_propagation(self):
"""Test that depth metadata propagates through the pipeline."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
# Create crawl with depth
crawl = Crawl.from_jsonl(
{'url': 'https://depth-meta-test.example.com', 'max_depth': 2},
overrides={'created_by_id': created_by_id}
)
# Simulate crawl output with depth metadata
crawl_output = [
{'type': TYPE_SNAPSHOT, 'url': 'https://hop1.com', 'depth': 1, 'via_snapshot': 'root'},
{'type': TYPE_SNAPSHOT, 'url': 'https://hop2.com', 'depth': 2, 'via_snapshot': 'hop1'},
]
self.assertEqual(crawl.max_depth, 2)
# Verify depth is preserved
for entry in crawl_output:
self.assertIn('depth', entry)
self.assertIn('via_snapshot', entry)
# Verify in JSONL output
output = crawl.to_jsonl()
self.assertEqual(output['max_depth'], 2)
class TestParserPluginWorkflows(unittest.TestCase):
@@ -963,6 +935,26 @@ class TestEdgeCases(unittest.TestCase):
# UUID
self.assertEqual(records[2]['id'], '01234567-89ab-cdef-0123-456789abcdef')
def test_crawl_with_multiple_urls(self):
"""Crawl should handle multiple URLs in a single crawl."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Test crawl JSONL with multiple URLs
crawl_output = {
'type': TYPE_CRAWL,
'id': 'test-multi-url-crawl',
'urls': 'https://url1.com\nhttps://url2.com\nhttps://url3.com',
'max_depth': 0,
}
# Parse the URLs
urls = [u.strip() for u in crawl_output['urls'].split('\n') if u.strip()]
self.assertEqual(len(urls), 3)
self.assertEqual(urls[0], 'https://url1.com')
self.assertEqual(urls[1], 'https://url2.com')
self.assertEqual(urls[2], 'https://url3.com')
if __name__ == '__main__':
unittest.main()


@@ -100,6 +100,7 @@ class ConstantsDict(Mapping):
DATABASE_FILE: Path = DATA_DIR / SQL_INDEX_FILENAME
JSON_INDEX_FILENAME: str = 'index.json'
JSONL_INDEX_FILENAME: str = 'index.jsonl'
HTML_INDEX_FILENAME: str = 'index.html'
ROBOTS_TXT_FILENAME: str = 'robots.txt'
FAVICON_FILENAME: str = 'favicon.ico'
@@ -187,6 +188,7 @@ class ConstantsDict(Mapping):
"queue.sqlite3-wal",
"queue.sqlite3-shm",
JSON_INDEX_FILENAME,
JSONL_INDEX_FILENAME,
HTML_INDEX_FILENAME,
ROBOTS_TXT_FILENAME,
FAVICON_FILENAME,
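
Before the model changes that follow, a rough standalone sketch of the conversion idea: read the legacy per-snapshot index.json once, write the flat index.jsonl (Snapshot record first, then one line per archive result), and delete the old file only after the new one exists. The legacy field names used here ('url', 'timestamp', 'title', 'tags', 'archive_results') are assumptions for illustration; the real convert_index_json_to_jsonl() in the diff below also detects the source format via fs_version.

```python
import json
from pathlib import Path

def convert_index_json_to_jsonl(snapshot_dir: Path) -> bool:
    """Illustrative one-way conversion; the field names here are assumptions, not the real schema."""
    json_path = snapshot_dir / 'index.json'
    jsonl_path = snapshot_dir / 'index.jsonl'
    if jsonl_path.exists() or not json_path.exists():
        return False  # already converted, or nothing to convert (same guard as the PR)
    try:
        data = json.loads(json_path.read_text())
    except (json.JSONDecodeError, OSError):
        return False
    with open(jsonl_path, 'w') as f:
        # Snapshot record goes first, then one ArchiveResult record per line
        snapshot = {key: data.get(key) for key in ('url', 'timestamp', 'title', 'tags')}
        snapshot['type'] = 'Snapshot'
        f.write(json.dumps(snapshot) + '\n')
        for result in data.get('archive_results', []):
            f.write(json.dumps(dict(result, type='ArchiveResult')) + '\n')
    json_path.unlink()  # legacy file is removed only after the new index exists
    return True
```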


@@ -91,6 +91,19 @@ class Tag(ModelWithSerializers):
def api_url(self) -> str:
return reverse_lazy('api-1:get_tag', args=[self.id])
def to_jsonl(self) -> dict:
"""
Convert Tag model instance to a JSONL record.
"""
from archivebox.config import VERSION
return {
'type': 'Tag',
'schema_version': VERSION,
'id': str(self.id),
'name': self.name,
'slug': self.slug,
}
@staticmethod
def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None):
"""
@@ -103,19 +116,18 @@ class Tag(ModelWithSerializers):
Returns:
Tag instance or None
"""
from archivebox.misc.jsonl import get_or_create_tag
try:
tag = get_or_create_tag(record)
# Auto-attach to snapshot if in overrides
if overrides and 'snapshot' in overrides and tag:
overrides['snapshot'].tags.add(tag)
return tag
except ValueError:
name = record.get('name')
if not name:
return None
tag, _ = Tag.objects.get_or_create(name=name)
# Auto-attach to snapshot if in overrides
if overrides and 'snapshot' in overrides and tag:
overrides['snapshot'].tags.add(tag)
return tag
class SnapshotTag(models.Model):
id = models.AutoField(primary_key=True)
@@ -415,10 +427,11 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Transaction handling:
1. Copy files INSIDE transaction
2. Create symlink INSIDE transaction
3. Update fs_version INSIDE transaction (done by save())
4. Exit transaction (DB commit)
5. Delete old files OUTSIDE transaction (after commit)
2. Convert index.json to index.jsonl INSIDE transaction
3. Create symlink INSIDE transaction
4. Update fs_version INSIDE transaction (done by save())
5. Exit transaction (DB commit)
6. Delete old files OUTSIDE transaction (after commit)
"""
import shutil
from django.db import transaction
@@ -427,11 +440,13 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
new_dir = self.get_storage_path_for_version('0.9.0')
if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
# Even if no directory migration needed, still convert index format
self.convert_index_json_to_jsonl()
return
new_dir.mkdir(parents=True, exist_ok=True)
# Copy all files (idempotent)
# Copy all files (idempotent), skipping index.json (will be converted to jsonl)
for old_file in old_dir.rglob('*'):
if not old_file.is_file():
continue
@@ -456,6 +471,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
missing = old_files.keys() - new_files.keys()
raise Exception(f"Migration incomplete: missing {missing}")
# Convert index.json to index.jsonl in the new directory
self.convert_index_json_to_jsonl()
# Create backwards-compat symlink (INSIDE transaction)
symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
if symlink_path.is_symlink():
@@ -557,9 +575,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
@classmethod
def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
"""
Load existing Snapshot from DB by reading index.json.
Load existing Snapshot from DB by reading index.jsonl or index.json.
Reads index.json, extracts url+timestamp, queries DB.
Reads index file, extracts url+timestamp, queries DB.
Returns existing Snapshot or None if not found/invalid.
Does NOT create new snapshots.
@@ -567,21 +585,38 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
"""
import json
index_path = snapshot_dir / 'index.json'
if not index_path.exists():
return None
# Try index.jsonl first (new format), then index.json (legacy)
jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
try:
with open(index_path) as f:
data = json.load(f)
except:
data = None
if jsonl_path.exists():
try:
with open(jsonl_path) as f:
for line in f:
line = line.strip()
if line.startswith('{'):
record = json.loads(line)
if record.get('type') == 'Snapshot':
data = record
break
except (json.JSONDecodeError, OSError):
pass
elif json_path.exists():
try:
with open(json_path) as f:
data = json.load(f)
except (json.JSONDecodeError, OSError):
pass
if not data:
return None
url = data.get('url')
if not url:
return None
# Get timestamp - prefer index.json, fallback to folder name
# Get timestamp - prefer index file, fallback to folder name
timestamp = cls._select_best_timestamp(
index_timestamp=data.get('timestamp'),
folder_name=snapshot_dir.name
@@ -611,14 +646,31 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
"""
import json
index_path = snapshot_dir / 'index.json'
if not index_path.exists():
return None
# Try index.jsonl first (new format), then index.json (legacy)
jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
try:
with open(index_path) as f:
data = json.load(f)
except:
data = None
if jsonl_path.exists():
try:
with open(jsonl_path) as f:
for line in f:
line = line.strip()
if line.startswith('{'):
record = json.loads(line)
if record.get('type') == 'Snapshot':
data = record
break
except (json.JSONDecodeError, OSError):
pass
elif json_path.exists():
try:
with open(json_path) as f:
data = json.load(f)
except (json.JSONDecodeError, OSError):
pass
if not data:
return None
url = data.get('url')
@@ -721,26 +773,40 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
# Index.json Reconciliation
# =========================================================================
def reconcile_with_index_json(self):
def reconcile_with_index(self):
"""
Merge index.json with DB. DB is source of truth.
Merge index.json/index.jsonl with DB. DB is source of truth.
- Title: longest non-URL
- Tags: union
- ArchiveResults: keep both (by plugin+start_ts)
Writes back in 0.9.x format.
Converts index.json to index.jsonl if needed, then writes back in JSONL format.
Used by: archivebox update (to sync index.json with DB)
Used by: archivebox update (to sync index with DB)
"""
import json
index_path = Path(self.output_dir) / 'index.json'
# Try to convert index.json to index.jsonl first
self.convert_index_json_to_jsonl()
# Check for index.jsonl (preferred) or index.json (legacy)
jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
index_data = {}
if index_path.exists():
if jsonl_path.exists():
# Read from JSONL format
jsonl_data = self.read_index_jsonl()
if jsonl_data['snapshot']:
index_data = jsonl_data['snapshot']
# Convert archive_results list to expected format
index_data['archive_results'] = jsonl_data['archive_results']
elif json_path.exists():
# Fallback to legacy JSON format
try:
with open(index_path) as f:
with open(json_path) as f:
index_data = json.load(f)
except:
pass
@@ -754,8 +820,12 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
# Merge ArchiveResults
self._merge_archive_results_from_index(index_data)
# Write back
self.write_index_json()
# Write back in JSONL format
self.write_index_jsonl()
def reconcile_with_index_json(self):
"""Deprecated: use reconcile_with_index() instead."""
return self.reconcile_with_index()
def _merge_title_from_index(self, index_data: dict):
"""Merge title - prefer longest non-URL title."""
@@ -831,12 +901,15 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
except:
pass
# Support both 'output' (legacy) and 'output_str' (new JSONL) field names
output_str = result_data.get('output_str') or result_data.get('output', '')
ArchiveResult.objects.create(
snapshot=self,
plugin=plugin,
hook_name=result_data.get('hook_name', ''),
status=result_data.get('status', 'failed'),
output_str=result_data.get('output', ''),
output_str=output_str,
cmd=result_data.get('cmd', []),
pwd=result_data.get('pwd', str(self.output_dir)),
start_ts=start_ts,
@@ -846,7 +919,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
pass
def write_index_json(self):
"""Write index.json in 0.9.x format."""
"""Write index.json in 0.9.x format (deprecated, use write_index_jsonl)."""
import json
index_path = Path(self.output_dir) / 'index.json'
@@ -877,6 +950,174 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
with open(index_path, 'w') as f:
json.dump(data, f, indent=2, sort_keys=True)
def write_index_jsonl(self):
"""
Write index.jsonl in flat JSONL format.
Each line is a JSON record with a 'type' field:
- Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
- ArchiveResult: extractor results (plugin, status, output, etc.)
- Binary: binary info used for the extraction
- Process: process execution details (cmd, exit_code, timing, etc.)
"""
import json
index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
index_path.parent.mkdir(parents=True, exist_ok=True)
# Collect unique binaries and processes from archive results
binaries_seen = set()
processes_seen = set()
with open(index_path, 'w') as f:
# Write Snapshot record first
snapshot_record = self.to_jsonl()
snapshot_record['crawl_id'] = str(self.crawl_id) if self.crawl_id else None
snapshot_record['fs_version'] = self.fs_version
f.write(json.dumps(snapshot_record) + '\n')
# Write ArchiveResult records with their associated Binary and Process
for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts'):
# Write Binary record if not already written
if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
binaries_seen.add(ar.process.binary_id)
f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n')
# Write Process record if not already written
if ar.process and ar.process_id not in processes_seen:
processes_seen.add(ar.process_id)
f.write(json.dumps(ar.process.to_jsonl()) + '\n')
# Write ArchiveResult record
f.write(json.dumps(ar.to_jsonl()) + '\n')
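# Usage sketch (illustrative, not part of this change): parsing the index.jsonl
# written above with only the stdlib; the directory path below is a made-up
# example, any snapshot output dir containing an index.jsonl works.
import json
from pathlib import Path

def count_index_records(snapshot_dir: str) -> dict:
    """Count records per 'type' in a snapshot dir's index.jsonl (stdlib only)."""
    counts: dict = {}
    for line in (Path(snapshot_dir) / 'index.jsonl').read_text().splitlines():
        line = line.strip()
        if not line.startswith('{'):
            continue
        rtype = json.loads(line).get('type', 'unknown')
        counts[rtype] = counts.get(rtype, 0) + 1
    return counts

# count_index_records('archive/1700000000.0')  # path is a made-up example
# -> e.g. {'Snapshot': 1, 'Binary': 1, 'Process': 2, 'ArchiveResult': 2}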
def read_index_jsonl(self) -> dict:
"""
Read index.jsonl and return parsed records grouped by type.
Returns dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes'
"""
import json
from archivebox.misc.jsonl import (
TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS,
)
index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
result = {
'snapshot': None,
'archive_results': [],
'binaries': [],
'processes': [],
}
if not index_path.exists():
return result
with open(index_path, 'r') as f:
for line in f:
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
record = json.loads(line)
record_type = record.get('type')
if record_type == TYPE_SNAPSHOT:
result['snapshot'] = record
elif record_type == TYPE_ARCHIVERESULT:
result['archive_results'].append(record)
elif record_type == TYPE_BINARY:
result['binaries'].append(record)
elif record_type == TYPE_PROCESS:
result['processes'].append(record)
except json.JSONDecodeError:
continue
return result
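# Usage sketch, assuming a configured ArchiveBox/Django environment and a
# `snapshot` instance whose output dir already contains an index.jsonl:
#   index = snapshot.read_index_jsonl()
#   print(index['snapshot']['url'], len(index['archive_results']), 'results')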
def convert_index_json_to_jsonl(self) -> bool:
"""
Convert index.json to index.jsonl format.
Reads the existing index.json, writes index.jsonl, and removes index.json on success.
Returns True if a conversion was performed, False if no conversion was needed.
"""
import json
json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
# Skip if already converted or no json file exists
if jsonl_path.exists() or not json_path.exists():
return False
try:
with open(json_path, 'r') as f:
data = json.load(f)
except (json.JSONDecodeError, OSError):
return False
# Detect format version and extract records
fs_version = data.get('fs_version', '0.7.0')
jsonl_path.parent.mkdir(parents=True, exist_ok=True)
with open(jsonl_path, 'w') as f:
# Write Snapshot record
snapshot_record = {
'type': 'Snapshot',
'id': str(self.id),
'crawl_id': str(self.crawl_id) if self.crawl_id else None,
'url': data.get('url', self.url),
'timestamp': data.get('timestamp', self.timestamp),
'title': data.get('title', self.title or ''),
'tags': data.get('tags', ''),
'fs_version': fs_version,
'bookmarked_at': data.get('bookmarked_at'),
'created_at': data.get('created_at'),
}
f.write(json.dumps(snapshot_record) + '\n')
# Handle 0.8.x/0.9.x format (archive_results list)
for result_data in data.get('archive_results', []):
ar_record = {
'type': 'ArchiveResult',
'snapshot_id': str(self.id),
'plugin': result_data.get('plugin', ''),
'status': result_data.get('status', ''),
'output_str': result_data.get('output', ''),
'start_ts': result_data.get('start_ts'),
'end_ts': result_data.get('end_ts'),
}
if result_data.get('cmd'):
ar_record['cmd'] = result_data['cmd']
f.write(json.dumps(ar_record) + '\n')
# Handle 0.7.x format (history dict)
if 'history' in data and isinstance(data['history'], dict):
for plugin, result_list in data['history'].items():
if not isinstance(result_list, list):
continue
for result_data in result_list:
ar_record = {
'type': 'ArchiveResult',
'snapshot_id': str(self.id),
'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin,
'status': result_data.get('status', ''),
'output_str': result_data.get('output', ''),
'start_ts': result_data.get('start_ts'),
'end_ts': result_data.get('end_ts'),
}
if result_data.get('cmd'):
ar_record['cmd'] = result_data['cmd']
f.write(json.dumps(ar_record) + '\n')
# Remove old index.json after successful conversion
try:
json_path.unlink()
except OSError:
pass
return True
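# Illustrative, simplified sketch of the legacy->JSONL mapping above, run on an
# in-memory example dict (field values are made up; no Django models needed):
import json

legacy = {
    'url': 'https://example.com',
    'timestamp': '1700000000.0',
    'title': 'Example Domain',
    'history': {
        'wget': [{'status': 'succeeded', 'output': 'example.com/index.html',
                  'start_ts': '2024-01-01T00:00:00', 'end_ts': '2024-01-01T00:00:05'}],
    },
}

jsonl_lines = [json.dumps({'type': 'Snapshot', 'url': legacy['url'],
                           'timestamp': legacy['timestamp'], 'title': legacy['title']})]
for plugin, results in legacy.get('history', {}).items():
    for r in results:
        jsonl_lines.append(json.dumps({
            'type': 'ArchiveResult', 'plugin': plugin,
            'status': r.get('status', ''), 'output_str': r.get('output', ''),
            'start_ts': r.get('start_ts'), 'end_ts': r.get('end_ts'),
        }))
# '\n'.join(jsonl_lines) approximates the index.jsonl body written above
# (the real method also includes id, crawl_id, fs_version, etc.).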
# =========================================================================
# Snapshot Utilities
# =========================================================================
@@ -1169,6 +1410,25 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
return False
def to_jsonl(self) -> dict:
"""
Convert Snapshot model instance to a JSONL record.
"""
from archivebox.config import VERSION
return {
'type': 'Snapshot',
'schema_version': VERSION,
'id': str(self.id),
'url': self.url,
'title': self.title,
'tags': self.tags_str() if hasattr(self, 'tags_str') else '',
'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'timestamp': self.timestamp,
'depth': getattr(self, 'depth', 0),
'status': self.status if hasattr(self, 'status') else None,
}
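# Usage sketch, assuming a configured ArchiveBox/Django environment with a few
# Snapshots in the DB; shows how a caller can emit one JSONL record per line
# for piping between commands:
import json
import sys
from archivebox.core.models import Snapshot

for snapshot in Snapshot.objects.all()[:3]:
    sys.stdout.write(json.dumps(snapshot.to_jsonl()) + '\n')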
@staticmethod
def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None, queue_for_extraction: bool = True):
"""
@@ -2001,6 +2261,40 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
"""Convenience property to access the user who created this archive result via its snapshot's crawl."""
return self.snapshot.crawl.created_by
def to_jsonl(self) -> dict:
"""
Convert ArchiveResult model instance to a JSONL record.
"""
from archivebox.config import VERSION
record = {
'type': 'ArchiveResult',
'schema_version': VERSION,
'id': str(self.id),
'snapshot_id': str(self.snapshot_id),
'plugin': self.plugin,
'hook_name': self.hook_name,
'status': self.status,
'output_str': self.output_str,
'start_ts': self.start_ts.isoformat() if self.start_ts else None,
'end_ts': self.end_ts.isoformat() if self.end_ts else None,
}
# Include optional fields if set
if self.output_json:
record['output_json'] = self.output_json
if self.output_files:
record['output_files'] = self.output_files
if self.output_size:
record['output_size'] = self.output_size
if self.output_mimetypes:
record['output_mimetypes'] = self.output_mimetypes
if self.cmd:
record['cmd'] = self.cmd
if self.cmd_version:
record['cmd_version'] = self.cmd_version
if self.process_id:
record['process_id'] = str(self.process_id)
return record
def save(self, *args, **kwargs):
is_new = self._state.adding

View File

@@ -134,6 +134,67 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
def api_url(self) -> str:
return reverse_lazy('api-1:get_crawl', args=[self.id])
def to_jsonl(self) -> dict:
"""
Convert Crawl model instance to a JSONL record.
"""
from archivebox.config import VERSION
return {
'type': 'Crawl',
'schema_version': VERSION,
'id': str(self.id),
'urls': self.urls,
'status': self.status,
'max_depth': self.max_depth,
'tags_str': self.tags_str,
'label': self.label,
'created_at': self.created_at.isoformat() if self.created_at else None,
}
@staticmethod
def from_jsonl(record: dict, overrides: dict = None):
"""
Get an existing Crawl by 'id', or create a new queued Crawl from a JSONL record.
Args:
record: Dict with 'urls' (or a single 'url'); optional 'max_depth'/'depth', 'tags_str'/'tags', 'label'
overrides: Dict of field overrides (e.g., created_by_id)
Returns:
Crawl instance or None if invalid
"""
from django.utils import timezone
overrides = overrides or {}
# Check if crawl already exists by ID
crawl_id = record.get('id')
if crawl_id:
try:
return Crawl.objects.get(id=crawl_id)
except Crawl.DoesNotExist:
pass
# Get URLs: 'urls' may be a newline-separated string; fall back to a single 'url' field
urls = record.get('urls', '')
if not urls and record.get('url'):
urls = record['url']
if not urls:
return None
# Create new crawl (status stays QUEUED, not started)
crawl = Crawl.objects.create(
urls=urls,
max_depth=record.get('max_depth', record.get('depth', 0)),
tags_str=record.get('tags_str', record.get('tags', '')),
label=record.get('label', ''),
status=Crawl.StatusChoices.QUEUED,
retry_at=timezone.now(),
**overrides,
)
return crawl
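# Usage sketch, assuming a configured ArchiveBox/Django environment; the URL,
# depth, and tag values are made-up examples. Both plain {'url': ...} records
# and typed Crawl records are accepted:
crawl = Crawl.from_jsonl({'url': 'https://example.com', 'depth': 1, 'tags': 'docs,examples'})
# crawl is created with status QUEUED; emit crawl.to_jsonl() as a JSON line to
# hand it off to `archivebox snapshot`.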
@property
def output_dir_parent(self) -> str:
"""Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""

View File

@@ -242,6 +242,24 @@ class Binary(ModelWithHealthStats):
'is_valid': self.is_valid,
}
def to_jsonl(self) -> dict:
"""
Convert Binary model instance to a JSONL record.
"""
from archivebox.config import VERSION
return {
'type': 'Binary',
'schema_version': VERSION,
'id': str(self.id),
'machine_id': str(self.machine_id),
'name': self.name,
'binprovider': self.binprovider,
'abspath': self.abspath,
'version': self.version,
'sha256': self.sha256,
'status': self.status,
}
@staticmethod
def from_jsonl(record: dict, overrides: dict = None):
"""
@@ -606,6 +624,32 @@ class Process(ModelWithHealthStats):
return self.archiveresult.hook_name
return ''
def to_jsonl(self) -> dict:
"""
Convert Process model instance to a JSONL record.
"""
from archivebox.config import VERSION
record = {
'type': 'Process',
'schema_version': VERSION,
'id': str(self.id),
'machine_id': str(self.machine_id),
'cmd': self.cmd,
'pwd': self.pwd,
'status': self.status,
'exit_code': self.exit_code,
'started_at': self.started_at.isoformat() if self.started_at else None,
'ended_at': self.ended_at.isoformat() if self.ended_at else None,
}
# Include optional fields if set
if self.binary_id:
record['binary_id'] = str(self.binary_id)
if self.pid:
record['pid'] = self.pid
if self.timeout:
record['timeout'] = self.timeout
return record
def update_and_requeue(self, **kwargs):
"""
Update process fields and requeue for worker state machine.

View File

@@ -4,9 +4,15 @@ JSONL (JSON Lines) utilities for ArchiveBox.
Provides functions for reading, writing, and processing typed JSONL records.
All CLI commands that accept stdin can read both plain URLs and typed JSONL.
CLI Pipeline:
archivebox crawl URL -> {"type": "Crawl", "id": "...", "urls": "...", ...}
archivebox snapshot -> {"type": "Snapshot", "id": "...", "url": "...", ...}
archivebox extract -> {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", ...}
Typed JSONL Format:
{"type": "Snapshot", "url": "https://example.com", "title": "...", "tags": "..."}
{"type": "ArchiveResult", "snapshot_id": "...", "extractor": "wget", ...}
{"type": "Crawl", "id": "...", "urls": "...", "max_depth": 0, ...}
{"type": "Snapshot", "id": "...", "url": "https://example.com", "title": "...", ...}
{"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", ...}
{"type": "Tag", "name": "..."}
Plain URLs (also supported):
@@ -18,7 +24,7 @@ __package__ = 'archivebox.misc'
import sys
import json
from typing import Iterator, Dict, Any, Optional, TextIO, Callable, Union, List
from typing import Iterator, Dict, Any, Optional, TextIO, Callable
from pathlib import Path
@@ -28,8 +34,10 @@ TYPE_ARCHIVERESULT = 'ArchiveResult'
TYPE_TAG = 'Tag'
TYPE_CRAWL = 'Crawl'
TYPE_BINARY = 'Binary'
TYPE_PROCESS = 'Process'
TYPE_MACHINE = 'Machine'
VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY}
VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY, TYPE_PROCESS, TYPE_MACHINE}
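# Illustrative sketch of the two accepted stdin styles described above; this
# mirrors the documented input handling (typed JSONL object vs. plain URL) and
# is not a quote of parse_line's actual implementation:
def classify_line(line: str):
    line = line.strip()
    if not line:
        return None
    if line.startswith('{'):
        record = json.loads(line)
        return record.get('type', TYPE_SNAPSHOT), record
    return TYPE_SNAPSHOT, {'type': TYPE_SNAPSHOT, 'url': line}  # plain URL line

# classify_line('https://example.com')
# classify_line('{"type": "Crawl", "urls": "https://example.com", "max_depth": 1}')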
def parse_line(line: str) -> Optional[Dict[str, Any]]:
@@ -152,81 +160,6 @@ def filter_by_type(records: Iterator[Dict[str, Any]], record_type: str) -> Itera
yield record
def snapshot_to_jsonl(snapshot) -> Dict[str, Any]:
"""
Convert a Snapshot model instance to a JSONL record.
"""
return {
'type': TYPE_SNAPSHOT,
'id': str(snapshot.id),
'url': snapshot.url,
'title': snapshot.title,
'tags': snapshot.tags_str() if hasattr(snapshot, 'tags_str') else '',
'bookmarked_at': snapshot.bookmarked_at.isoformat() if snapshot.bookmarked_at else None,
'created_at': snapshot.created_at.isoformat() if snapshot.created_at else None,
'timestamp': snapshot.timestamp,
'depth': getattr(snapshot, 'depth', 0),
'status': snapshot.status if hasattr(snapshot, 'status') else None,
}
def archiveresult_to_jsonl(result) -> Dict[str, Any]:
"""
Convert an ArchiveResult model instance to a JSONL record.
"""
record = {
'type': TYPE_ARCHIVERESULT,
'id': str(result.id),
'snapshot_id': str(result.snapshot_id),
'plugin': result.plugin,
'hook_name': result.hook_name,
'status': result.status,
'output_str': result.output_str,
'start_ts': result.start_ts.isoformat() if result.start_ts else None,
'end_ts': result.end_ts.isoformat() if result.end_ts else None,
}
# Include optional fields if set
if result.output_json:
record['output_json'] = result.output_json
if result.output_files:
record['output_files'] = result.output_files
if result.output_size:
record['output_size'] = result.output_size
if result.output_mimetypes:
record['output_mimetypes'] = result.output_mimetypes
if result.cmd:
record['cmd'] = result.cmd
if result.cmd_version:
record['cmd_version'] = result.cmd_version
return record
def tag_to_jsonl(tag) -> Dict[str, Any]:
"""
Convert a Tag model instance to a JSONL record.
"""
return {
'type': TYPE_TAG,
'id': str(tag.id),
'name': tag.name,
'slug': tag.slug,
}
def crawl_to_jsonl(crawl) -> Dict[str, Any]:
"""
Convert a Crawl model instance to a JSONL record.
"""
return {
'type': TYPE_CRAWL,
'id': str(crawl.id),
'urls': crawl.urls,
'status': crawl.status,
'max_depth': crawl.max_depth,
'created_at': crawl.created_at.isoformat() if crawl.created_at else None,
}
def process_records(
records: Iterator[Dict[str, Any]],
handlers: Dict[str, Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]]
@@ -250,60 +183,3 @@ def process_records(
yield result
def get_or_create_tag(record: Dict[str, Any]):
"""
Get or create a Tag from a JSONL record.
Returns the Tag instance.
"""
from archivebox.core.models import Tag
name = record.get('name')
if not name:
raise ValueError("Record missing required 'name' field")
tag, _ = Tag.objects.get_or_create(name=name)
return tag
def process_jsonl_records(records: Iterator[Dict[str, Any]], created_by_id: Optional[int] = None) -> Dict[str, List]:
"""
Process JSONL records, creating Tags and Snapshots as needed.
Args:
records: Iterator of JSONL record dicts
created_by_id: User ID for created objects
Returns:
Dict with 'tags' and 'snapshots' lists of created objects
"""
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = created_by_id or get_or_create_system_user_pk()
results = {
'tags': [],
'snapshots': [],
}
for record in records:
record_type = record.get('type', TYPE_SNAPSHOT)
if record_type == TYPE_TAG:
try:
tag = get_or_create_tag(record)
results['tags'].append(tag)
except ValueError:
continue
elif record_type == TYPE_SNAPSHOT or 'url' in record:
try:
from archivebox.core.models import Snapshot
overrides = {'created_by_id': created_by_id} if created_by_id else {}
snapshot = Snapshot.from_jsonl(record, overrides=overrides)
if snapshot:
results['snapshots'].append(snapshot)
except ValueError:
continue
return results

View File

@@ -58,9 +58,10 @@ def parse_json_main_index(out_dir: Path) -> Iterator[SnapshotDict]:
def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]:
"""
Parse links from individual snapshot index.json files in archive directories.
Parse links from individual snapshot index.jsonl/index.json files in archive directories.
Walks through archive/*/index.json files to discover orphaned snapshots.
Walks through archive/*/index.jsonl and archive/*/index.json files to discover orphaned snapshots.
Prefers index.jsonl (new format) over index.json (legacy format).
"""
from archivebox.config import CONSTANTS
@@ -72,19 +73,36 @@ def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]:
if not entry.is_dir():
continue
index_file = Path(entry.path) / 'index.json'
if not index_file.exists():
continue
# Try index.jsonl first (new format)
jsonl_file = Path(entry.path) / CONSTANTS.JSONL_INDEX_FILENAME
json_file = Path(entry.path) / CONSTANTS.JSON_INDEX_FILENAME
try:
with open(index_file, 'r', encoding='utf-8') as f:
link = json.load(f)
link = None
if jsonl_file.exists():
try:
with open(jsonl_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line.startswith('{'):
record = json.loads(line)
if record.get('type') == 'Snapshot':
link = record
break
except (json.JSONDecodeError, KeyError, TypeError):
pass
if link is None and json_file.exists():
try:
with open(json_file, 'r', encoding='utf-8') as f:
link = json.load(f)
except (json.JSONDecodeError, KeyError, TypeError):
pass
if link:
yield {
'url': link.get('url', ''),
'timestamp': link.get('timestamp', entry.name),
'title': link.get('title'),
'tags': link.get('tags', ''),
}
except (json.JSONDecodeError, KeyError, TypeError):
continue
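# Usage sketch (illustrative), assuming it is run from inside this module (or
# after importing it) with an ArchiveBox data dir as the working directory:
from pathlib import Path

for orphan in parse_json_links_details(Path('.')):
    print(orphan['timestamp'], orphan['url'])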