fix: correct CLI pipeline data flow for crawl -> snapshot -> extract

- archivebox crawl: creates Crawl records, outputs Crawl JSONL
- archivebox snapshot: accepts Crawl JSONL, creates Snapshots, outputs Snapshot JSONL
- archivebox extract: accepts Snapshot JSONL, runs extractors, outputs ArchiveResult JSONL
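
For illustration, a minimal standalone sketch of the contract between the first two stages: read Crawl JSONL (or plain URLs) on stdin and emit one Snapshot JSONL record per URL. Field names follow the record shapes shown in the diffs below; this is not the real archivebox_snapshot.py implementation.

#!/usr/bin/env python3
# Hypothetical sketch of the crawl -> snapshot data contract (illustrative only).
import sys
import json

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # Accept either typed JSONL or a plain URL per line
    record = json.loads(line) if line.startswith('{') else {'url': line}
    if record.get('type') == 'Crawl':
        # Crawl.urls is a newline-separated string of URLs
        urls = (record.get('urls') or '').splitlines()
    else:
        urls = [record['url']] if record.get('url') else []
    for url in urls:
        print(json.dumps({'type': 'Snapshot', 'url': url, 'status': 'queued'}))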

Changes:
- Add Crawl.from_jsonl() method for creating Crawl from JSONL records
- Rewrite archivebox_crawl.py to create Crawl jobs without immediately starting them
- Update archivebox_snapshot.py to accept both Crawl JSONL and plain URLs
- Update jsonl.py docstring to document the pipeline
Author: Claude
Date: 2025-12-30 19:42:41 +00:00
Parent: ae648c9bc1
Commit: 69965a2782
4 changed files with 193 additions and 236 deletions

archivebox/cli/archivebox_crawl.py

@@ -1,222 +1,113 @@
#!/usr/bin/env python3
"""
archivebox crawl [urls_or_snapshot_ids...] [--depth=N] [--plugin=NAME]
archivebox crawl [urls...] [--depth=N] [--tag=TAG]
Discover outgoing links from URLs or existing Snapshots.
If a URL is passed, creates a Snapshot for it first, then runs parser plugins.
If a snapshot_id is passed, runs parser plugins on the existing Snapshot.
Outputs discovered outlink URLs as JSONL.
Pipe the output to `archivebox snapshot` to archive the discovered URLs.
Create Crawl jobs from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
Does NOT immediately start the crawl - pipe to `archivebox snapshot` to process.
Input formats:
- Plain URLs (one per line)
- Snapshot UUIDs (one per line)
- JSONL: {"type": "Snapshot", "url": "...", ...}
- JSONL: {"type": "Snapshot", "id": "...", ...}
- JSONL: {"url": "...", "depth": 1, "tags": "..."}
Output (JSONL):
{"type": "Snapshot", "url": "https://discovered-url.com", "via_extractor": "...", ...}
{"type": "Crawl", "id": "...", "urls": "...", "status": "queued", ...}
Examples:
# Discover links from a page (creates snapshot first)
# Create a crawl job
archivebox crawl https://example.com
# Discover links from an existing snapshot
archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
# Create crawl with depth
archivebox crawl --depth=1 https://example.com
# Full recursive crawl pipeline
# Full pipeline: create crawl, create snapshots, run extractors
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
# Use only specific parser plugin
archivebox crawl --plugin=parse_html_urls https://example.com
# Chain: create snapshot, then crawl its outlinks
archivebox snapshot https://example.com | archivebox crawl | archivebox snapshot | archivebox extract
# Process existing Crawl by ID (runs the crawl state machine)
archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox crawl'
import sys
import json
from pathlib import Path
from typing import Optional
import rich_click as click
from archivebox.misc.util import docstring
def discover_outlinks(
def create_crawls(
args: tuple,
depth: int = 1,
plugin: str = '',
wait: bool = True,
depth: int = 0,
tag: str = '',
created_by_id: Optional[int] = None,
) -> int:
"""
Discover outgoing links from URLs or existing Snapshots.
Create Crawl jobs from URLs or JSONL records.
Accepts URLs or snapshot_ids. For URLs, creates Snapshots first.
Runs parser plugins, outputs discovered URLs as JSONL.
The output can be piped to `archivebox snapshot` to archive the discovered links.
Reads from args or stdin, creates Crawl objects, outputs JSONL.
Does NOT start the crawl - just creates the job in QUEUED state.
Exit codes:
0: Success
1: Failure
"""
from rich import print as rprint
from django.utils import timezone
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record,
TYPE_SNAPSHOT
)
from archivebox.misc.jsonl import read_args_or_stdin, write_record
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.crawls.models import Crawl
from archivebox.config import CONSTANTS
from archivebox.workers.orchestrator import Orchestrator
created_by_id = get_or_create_system_user_pk()
created_by_id = created_by_id or get_or_create_system_user_pk()
is_tty = sys.stdout.isatty()
# Collect all input records
records = list(read_args_or_stdin(args))
if not records:
rprint('[yellow]No URLs or snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# Separate records into existing snapshots vs new URLs
existing_snapshot_ids = []
new_url_records = []
# Group URLs by crawl - each URL becomes its own Crawl for now
# (Could be enhanced to batch multiple URLs into one Crawl)
created_crawls = []
for record in records:
# Check if it's an existing snapshot (has id but no url, or looks like a UUID)
if record.get('id') and not record.get('url'):
existing_snapshot_ids.append(record['id'])
elif record.get('id'):
# Has both id and url - check if snapshot exists
try:
Snapshot.objects.get(id=record['id'])
existing_snapshot_ids.append(record['id'])
except Snapshot.DoesNotExist:
new_url_records.append(record)
elif record.get('url'):
new_url_records.append(record)
url = record.get('url')
if not url:
continue
# For new URLs, create a Crawl and Snapshots
snapshot_ids = list(existing_snapshot_ids)
try:
# Build crawl record
crawl_record = {
'url': url,
'max_depth': record.get('depth', depth),
'tags_str': record.get('tags', tag),
'label': record.get('label', ''),
}
if new_url_records:
# Create a Crawl to manage this operation
sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__crawl.txt'
sources_file.parent.mkdir(parents=True, exist_ok=True)
sources_file.write_text('\n'.join(r.get('url', '') for r in new_url_records if r.get('url')))
crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id})
if crawl:
created_crawls.append(crawl)
crawl = Crawl.from_file(
sources_file,
max_depth=depth,
label=f'crawl --depth={depth}',
created_by=created_by_id,
)
# Output JSONL record (only when piped)
if not is_tty:
write_record(crawl.to_jsonl())
# Create snapshots for new URLs
for record in new_url_records:
try:
record['crawl_id'] = str(crawl.id)
record['depth'] = record.get('depth', 0)
except Exception as e:
rprint(f'[red]Error creating crawl: {e}[/red]', file=sys.stderr)
continue
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl(record, overrides=overrides)
if snapshot:
snapshot_ids.append(str(snapshot.id))
except Exception as e:
rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
continue
if not snapshot_ids:
rprint('[red]No snapshots to process[/red]', file=sys.stderr)
if not created_crawls:
rprint('[red]No crawls created[/red]', file=sys.stderr)
return 1
if existing_snapshot_ids:
rprint(f'[blue]Using {len(existing_snapshot_ids)} existing snapshots[/blue]', file=sys.stderr)
if new_url_records:
rprint(f'[blue]Created {len(snapshot_ids) - len(existing_snapshot_ids)} new snapshots[/blue]', file=sys.stderr)
rprint(f'[blue]Running parser plugins on {len(snapshot_ids)} snapshots...[/blue]', file=sys.stderr)
rprint(f'[green]Created {len(created_crawls)} crawls[/green]', file=sys.stderr)
# Create ArchiveResults for plugins
# If --plugin is specified, only run that one. Otherwise, run all available plugins.
# The orchestrator will handle dependency ordering (plugins declare deps in config.json)
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
if plugin:
# User specified a single plugin to run
ArchiveResult.objects.get_or_create(
snapshot=snapshot,
extractor=plugin,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
else:
# Create pending ArchiveResults for all enabled plugins
# This uses hook discovery to find available plugins dynamically
snapshot.create_pending_archiveresults()
# Mark snapshot as started
snapshot.status = Snapshot.StatusChoices.STARTED
snapshot.retry_at = timezone.now()
snapshot.save()
except Snapshot.DoesNotExist:
continue
# Run plugins
if wait:
rprint('[blue]Running outlink plugins...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
# Collect discovered URLs from urls.jsonl files
# Uses dynamic discovery - any plugin that outputs urls.jsonl is considered a parser
from archivebox.hooks import collect_urls_from_plugins
discovered_urls = {}
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
snapshot_dir = Path(snapshot.output_dir)
# Dynamically collect urls.jsonl from ANY plugin subdirectory
for entry in collect_urls_from_plugins(snapshot_dir):
url = entry.get('url')
if url and url not in discovered_urls:
# Add metadata for crawl tracking
entry['type'] = TYPE_SNAPSHOT
entry['depth'] = snapshot.depth + 1
entry['via_snapshot'] = str(snapshot.id)
discovered_urls[url] = entry
except Snapshot.DoesNotExist:
continue
rprint(f'[green]Discovered {len(discovered_urls)} URLs[/green]', file=sys.stderr)
# Output discovered URLs as JSONL (when piped) or human-readable (when TTY)
for url, entry in discovered_urls.items():
if is_tty:
via = entry.get('via_extractor', 'unknown')
rprint(f' [dim]{via}[/dim] {url[:80]}', file=sys.stderr)
else:
write_record(entry)
# If TTY, show human-readable output
if is_tty:
for crawl in created_crawls:
first_url = crawl.get_urls_list()[0] if crawl.get_urls_list() else ''
rprint(f' [dim]{crawl.id}[/dim] {first_url[:60]}', file=sys.stderr)
return 0
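# Illustrative sketch, not part of this file's diff: calling create_crawls() directly
# with placeholder arguments. When stdout is piped (not a TTY), each created Crawl is
# written as one JSONL line shaped like the Output example in the module docstring:
#   {"type": "Crawl", "id": "...", "urls": "https://example.com", "status": "queued", ...}
#
#     exit_code = create_crawls(('https://example.com',), depth=1, tag='news')
#     # 0 if at least one Crawl was created, 1 otherwise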
@@ -262,12 +153,11 @@ def is_crawl_id(value: str) -> bool:
@click.command()
@click.option('--depth', '-d', type=int, default=1, help='Max depth for recursive crawling (default: 1)')
@click.option('--plugin', '-p', default='', help='Use only this parser plugin (e.g., parse_html_urls, parse_dom_outlinks)')
@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
@click.option('--depth', '-d', type=int, default=0, help='Max depth for recursive crawling (default: 0, no recursion)')
@click.option('--tag', '-t', default='', help='Comma-separated tags to add to snapshots')
@click.argument('args', nargs=-1)
def main(depth: int, plugin: str, wait: bool, args: tuple):
"""Discover outgoing links from URLs or existing Snapshots, or process Crawl by ID"""
def main(depth: int, tag: str, args: tuple):
"""Create Crawl jobs from URLs, or process existing Crawls by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# Read all input
@@ -275,7 +165,7 @@ def main(depth: int, plugin: str, wait: bool, args: tuple):
if not records:
from rich import print as rprint
rprint('[yellow]No URLs, Snapshot IDs, or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
# Check if input looks like existing Crawl IDs to process
@@ -295,8 +185,8 @@ def main(depth: int, plugin: str, wait: bool, args: tuple):
exit_code = result
sys.exit(exit_code)
else:
# Default behavior: discover outlinks from input (URLs or Snapshot IDs)
sys.exit(discover_outlinks(args, depth=depth, plugin=plugin, wait=wait))
# Default behavior: create Crawl jobs from URLs
sys.exit(create_crawls(args, depth=depth, tag=tag))
if __name__ == '__main__':

archivebox/cli/archivebox_snapshot.py

@@ -1,29 +1,31 @@
#!/usr/bin/env python3
"""
archivebox snapshot [urls...] [--depth=N] [--tag=TAG] [--plugins=...]
archivebox snapshot [urls_or_crawl_ids...] [--tag=TAG] [--extract]
Create Snapshots from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
Create Snapshots from URLs or Crawl jobs. Accepts URLs, Crawl JSONL, or Crawl IDs.
Input formats:
- Plain URLs (one per line)
- JSONL: {"type": "Crawl", "id": "...", "urls": "..."}
- JSONL: {"type": "Snapshot", "url": "...", "title": "...", "tags": "..."}
- Crawl UUIDs (one per line)
Output (JSONL):
{"type": "Snapshot", "id": "...", "url": "...", "status": "queued", ...}
Examples:
# Create snapshots from URLs
# Create snapshots from URLs directly
archivebox snapshot https://example.com https://foo.com
# Pipe from stdin
echo 'https://example.com' | archivebox snapshot
# Pipe from crawl command
archivebox crawl https://example.com | archivebox snapshot
# Chain with extract
archivebox snapshot https://example.com | archivebox extract
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
# With crawl depth
archivebox snapshot --depth=1 https://example.com
# Process existing Snapshot by ID
archivebox snapshot 01234567-89ab-cdef-0123-456789abcdef
"""
__package__ = 'archivebox.cli'
@@ -67,17 +69,16 @@ def process_snapshot_by_id(snapshot_id: str) -> int:
def create_snapshots(
urls: tuple,
depth: int = 0,
args: tuple,
tag: str = '',
plugins: str = '',
extract: bool = False,
created_by_id: Optional[int] = None,
) -> int:
"""
Create Snapshots from URLs or JSONL records.
Create Snapshots from URLs, Crawl JSONL, or Crawl IDs.
Reads from args or stdin, creates Snapshot objects, outputs JSONL.
If --plugins is passed, also runs specified plugins (blocking).
If input is Crawl JSONL, creates Snapshots for all URLs in the Crawl.
Exit codes:
0: Success
@@ -88,63 +89,70 @@ def create_snapshots(
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record,
TYPE_SNAPSHOT, TYPE_TAG
TYPE_SNAPSHOT, TYPE_CRAWL
)
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.core.models import Snapshot
from archivebox.crawls.models import Crawl
from archivebox.config import CONSTANTS
created_by_id = created_by_id or get_or_create_system_user_pk()
is_tty = sys.stdout.isatty()
# Collect all input records
records = list(read_args_or_stdin(urls))
records = list(read_args_or_stdin(args))
if not records:
rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs or Crawls provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# If depth > 0, we need a Crawl to manage recursive discovery
crawl = None
if depth > 0:
# Create a crawl for this batch
sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__snapshot.txt'
sources_file.parent.mkdir(parents=True, exist_ok=True)
sources_file.write_text('\n'.join(r.get('url', '') for r in records if r.get('url')))
crawl = Crawl.from_file(
sources_file,
max_depth=depth,
label=f'snapshot --depth={depth}',
created_by=created_by_id,
)
# Process each record
# Process each record - handle Crawls and plain URLs/Snapshots
created_snapshots = []
for record in records:
if record.get('type') != TYPE_SNAPSHOT and 'url' not in record:
continue
record_type = record.get('type')
try:
# Add crawl info if we have one
if crawl:
record['crawl_id'] = str(crawl.id)
record['depth'] = record.get('depth', 0)
if record_type == TYPE_CRAWL:
# Input is a Crawl - get or create it, then create Snapshots for its URLs
crawl = None
crawl_id = record.get('id')
if crawl_id:
try:
crawl = Crawl.objects.get(id=crawl_id)
except Crawl.DoesNotExist:
# Crawl doesn't exist, create it
crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
else:
# No ID, create new crawl
crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
# Add tags if provided via CLI
if tag and not record.get('tags'):
record['tags'] = tag
if not crawl:
continue
# Get or create the snapshot
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl(record, overrides=overrides)
if snapshot:
created_snapshots.append(snapshot)
# Create snapshots for each URL in the crawl
for url in crawl.get_urls_list():
snapshot_record = {
'url': url,
'tags': crawl.tags_str,
'crawl_id': str(crawl.id),
'depth': 0,
}
snapshot = Snapshot.from_jsonl(snapshot_record, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
if not is_tty:
write_record(snapshot.to_jsonl())
# Output JSONL record (only when piped)
if not is_tty:
write_record(snapshot.to_jsonl())
elif record_type == TYPE_SNAPSHOT or record.get('url'):
# Input is a Snapshot or plain URL
# Add tags if provided via CLI
if tag and not record.get('tags'):
record['tags'] = tag
snapshot = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
if not is_tty:
write_record(snapshot.to_jsonl())
except Exception as e:
rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
@@ -161,10 +169,10 @@ def create_snapshots(
for snapshot in created_snapshots:
rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
# If --plugins is passed, run the orchestrator for those plugins
if plugins:
# If --extract is passed, run the orchestrator
if extract:
from archivebox.workers.orchestrator import Orchestrator
rprint(f'[blue]Running plugins: {plugins or "all"}...[/blue]', file=sys.stderr)
rprint('[blue]Running extractors...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
@@ -175,16 +183,19 @@ def is_snapshot_id(value: str) -> bool:
"""Check if value looks like a Snapshot UUID."""
import re
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
return bool(uuid_pattern.match(value))
if not uuid_pattern.match(value):
return False
# Verify it's actually a Snapshot (not a Crawl or other object)
from archivebox.core.models import Snapshot
return Snapshot.objects.filter(id=value).exists()
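# Illustrative consequence of the stricter check (assumed behavior, per the code above):
#     is_snapshot_id('01234567-89ab-cdef-0123-456789abcdef')
#     # -> True only if a Snapshot row with that id exists; a UUID that belongs to a
#     #    Crawl returns False, so piped Crawl records fall through to create_snapshots()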
@click.command()
@click.option('--depth', '-d', type=int, default=0, help='Recursively crawl linked pages up to N levels deep')
@click.option('--tag', '-t', default='', help='Comma-separated tags to add to each snapshot')
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g. title,screenshot)')
@click.option('--extract/--no-extract', default=False, help='Run extractors after creating snapshots')
@click.argument('args', nargs=-1)
def main(depth: int, tag: str, plugins: str, args: tuple):
"""Create Snapshots from URLs, or process existing Snapshots by ID"""
def main(tag: str, extract: bool, args: tuple):
"""Create Snapshots from URLs/Crawls, or process existing Snapshots by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# Read all input
@@ -192,17 +203,21 @@ def main(depth: int, tag: str, plugins: str, args: tuple):
if not records:
from rich import print as rprint
rprint('[yellow]No URLs or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
rprint('[yellow]No URLs, Crawl IDs, or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
# Check if input looks like existing Snapshot IDs to process
# If ALL inputs are UUIDs with no URL, assume we're processing existing Snapshots
all_are_ids = all(
(r.get('id') and not r.get('url')) or is_snapshot_id(r.get('url', ''))
# If ALL inputs are UUIDs with no URL and exist as Snapshots, process them
all_are_snapshot_ids = all(
is_snapshot_id(r.get('id') or r.get('url', ''))
for r in records
if r.get('type') != 'Crawl' # Don't check Crawl records as Snapshot IDs
)
if all_are_ids:
# But also check that we're not receiving Crawl JSONL
has_crawl_records = any(r.get('type') == 'Crawl' for r in records)
if all_are_snapshot_ids and not has_crawl_records:
# Process existing Snapshots by ID
exit_code = 0
for record in records:
@@ -212,8 +227,8 @@ def main(depth: int, tag: str, plugins: str, args: tuple):
exit_code = result
sys.exit(exit_code)
else:
# Create new Snapshots from URLs
sys.exit(create_snapshots(args, depth=depth, tag=tag, plugins=plugins))
# Create new Snapshots from URLs or Crawls
sys.exit(create_snapshots(args, tag=tag, extract=extract))
if __name__ == '__main__':

archivebox/crawls/models.py

@@ -146,9 +146,55 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
'urls': self.urls,
'status': self.status,
'max_depth': self.max_depth,
'tags_str': self.tags_str,
'label': self.label,
'created_at': self.created_at.isoformat() if self.created_at else None,
}
@staticmethod
def from_jsonl(record: dict, overrides: dict = None):
"""
Create or get a Crawl from a JSONL record.
Args:
record: Dict with 'urls' (required), optional 'max_depth', 'tags_str', 'label'
overrides: Dict of field overrides (e.g., created_by_id)
Returns:
Crawl instance or None if invalid
"""
from django.utils import timezone
overrides = overrides or {}
# Check if crawl already exists by ID
crawl_id = record.get('id')
if crawl_id:
try:
return Crawl.objects.get(id=crawl_id)
except Crawl.DoesNotExist:
pass
# Get URLs - can be string (newline-separated) or from 'url' field
urls = record.get('urls', '')
if not urls and record.get('url'):
urls = record['url']
if not urls:
return None
# Create new crawl (status stays QUEUED, not started)
crawl = Crawl.objects.create(
urls=urls,
max_depth=record.get('max_depth', record.get('depth', 0)),
tags_str=record.get('tags_str', record.get('tags', '')),
label=record.get('label', ''),
status=Crawl.StatusChoices.QUEUED,
retry_at=timezone.now(),
**overrides,
)
return crawl
@property
def output_dir_parent(self) -> str:
"""Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""

archivebox/misc/jsonl.py

@@ -4,9 +4,15 @@ JSONL (JSON Lines) utilities for ArchiveBox.
Provides functions for reading, writing, and processing typed JSONL records.
All CLI commands that accept stdin can read both plain URLs and typed JSONL.
CLI Pipeline:
archivebox crawl URL -> {"type": "Crawl", "id": "...", "urls": "...", ...}
archivebox snapshot -> {"type": "Snapshot", "id": "...", "url": "...", ...}
archivebox extract -> {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", ...}
Typed JSONL Format:
{"type": "Snapshot", "url": "https://example.com", "title": "...", "tags": "..."}
{"type": "ArchiveResult", "snapshot_id": "...", "extractor": "wget", ...}
{"type": "Crawl", "id": "...", "urls": "...", "max_depth": 0, ...}
{"type": "Snapshot", "id": "...", "url": "https://example.com", "title": "...", ...}
{"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", ...}
{"type": "Tag", "name": "..."}
Plain URLs (also supported):