diff --git a/archivebox/cli/archivebox_crawl.py b/archivebox/cli/archivebox_crawl.py
index 3bedaade..f8b52a11 100644
--- a/archivebox/cli/archivebox_crawl.py
+++ b/archivebox/cli/archivebox_crawl.py
@@ -1,222 +1,113 @@
 #!/usr/bin/env python3
 """
-archivebox crawl [urls_or_snapshot_ids...] [--depth=N] [--plugin=NAME]
+archivebox crawl [urls...] [--depth=N] [--tag=TAG]
 
-Discover outgoing links from URLs or existing Snapshots.
-
-If a URL is passed, creates a Snapshot for it first, then runs parser plugins.
-If a snapshot_id is passed, runs parser plugins on the existing Snapshot.
-Outputs discovered outlink URLs as JSONL.
-
-Pipe the output to `archivebox snapshot` to archive the discovered URLs.
+Create Crawl jobs from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
+Does NOT immediately start the crawl - pipe to `archivebox snapshot` to process.
 
 Input formats:
     - Plain URLs (one per line)
-    - Snapshot UUIDs (one per line)
-    - JSONL: {"type": "Snapshot", "url": "...", ...}
-    - JSONL: {"type": "Snapshot", "id": "...", ...}
+    - JSONL: {"url": "...", "depth": 1, "tags": "..."}
 
 Output (JSONL):
-    {"type": "Snapshot", "url": "https://discovered-url.com", "via_extractor": "...", ...}
+    {"type": "Crawl", "id": "...", "urls": "...", "status": "queued", ...}
 
 Examples:
-    # Discover links from a page (creates snapshot first)
+    # Create a crawl job
     archivebox crawl https://example.com
 
-    # Discover links from an existing snapshot
-    archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
+    # Create crawl with depth
+    archivebox crawl --depth=1 https://example.com
 
-    # Full recursive crawl pipeline
+    # Full pipeline: create crawl, create snapshots, run extractors
     archivebox crawl https://example.com | archivebox snapshot | archivebox extract
 
-    # Use only specific parser plugin
-    archivebox crawl --plugin=parse_html_urls https://example.com
-
-    # Chain: create snapshot, then crawl its outlinks
-    archivebox snapshot https://example.com | archivebox crawl | archivebox snapshot | archivebox extract
+    # Process existing Crawl by ID (runs the crawl state machine)
+    archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
 """
 
 __package__ = 'archivebox.cli'
 __command__ = 'archivebox crawl'
 
 import sys
-import json
-from pathlib import Path
 from typing import Optional
 
 import rich_click as click
 
-from archivebox.misc.util import docstring
-
 
-def discover_outlinks(
+def create_crawls(
     args: tuple,
-    depth: int = 1,
-    plugin: str = '',
-    wait: bool = True,
+    depth: int = 0,
+    tag: str = '',
+    created_by_id: Optional[int] = None,
 ) -> int:
     """
-    Discover outgoing links from URLs or existing Snapshots.
+    Create Crawl jobs from URLs or JSONL records.
 
-    Accepts URLs or snapshot_ids. For URLs, creates Snapshots first.
-    Runs parser plugins, outputs discovered URLs as JSONL.
-    The output can be piped to `archivebox snapshot` to archive the discovered links.
+    Reads from args or stdin, creates Crawl objects, outputs JSONL.
+    Does NOT start the crawl - just creates the job in QUEUED state.
 
     Exit codes:
         0: Success
         1: Failure
     """
     from rich import print as rprint
-    from django.utils import timezone
-    from archivebox.misc.jsonl import (
-        read_args_or_stdin, write_record,
-        TYPE_SNAPSHOT
-    )
+    from archivebox.misc.jsonl import read_args_or_stdin, write_record
     from archivebox.base_models.models import get_or_create_system_user_pk
-    from archivebox.core.models import Snapshot, ArchiveResult
     from archivebox.crawls.models import Crawl
-    from archivebox.config import CONSTANTS
-    from archivebox.workers.orchestrator import Orchestrator
 
-    created_by_id = get_or_create_system_user_pk()
+    created_by_id = created_by_id or get_or_create_system_user_pk()
     is_tty = sys.stdout.isatty()
 
     # Collect all input records
    records = list(read_args_or_stdin(args))
 
     if not records:
-        rprint('[yellow]No URLs or snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
+        rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
         return 1
 
-    # Separate records into existing snapshots vs new URLs
-    existing_snapshot_ids = []
-    new_url_records = []
-
+    # Group URLs by crawl - each URL becomes its own Crawl for now
+    # (Could be enhanced to batch multiple URLs into one Crawl)
+    created_crawls = []
     for record in records:
-        # Check if it's an existing snapshot (has id but no url, or looks like a UUID)
-        if record.get('id') and not record.get('url'):
-            existing_snapshot_ids.append(record['id'])
-        elif record.get('id'):
-            # Has both id and url - check if snapshot exists
-            try:
-                Snapshot.objects.get(id=record['id'])
-                existing_snapshot_ids.append(record['id'])
-            except Snapshot.DoesNotExist:
-                new_url_records.append(record)
-        elif record.get('url'):
-            new_url_records.append(record)
+        url = record.get('url')
+        if not url:
+            continue
 
-    # For new URLs, create a Crawl and Snapshots
-    snapshot_ids = list(existing_snapshot_ids)
+        try:
+            # Build crawl record
+            crawl_record = {
+                'url': url,
+                'max_depth': record.get('depth', depth),
+                'tags_str': record.get('tags', tag),
+                'label': record.get('label', ''),
+            }
 
-    if new_url_records:
-        # Create a Crawl to manage this operation
-        sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__crawl.txt'
-        sources_file.parent.mkdir(parents=True, exist_ok=True)
-        sources_file.write_text('\n'.join(r.get('url', '') for r in new_url_records if r.get('url')))
+            crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id})
+            if crawl:
+                created_crawls.append(crawl)
 
-        crawl = Crawl.from_file(
-            sources_file,
-            max_depth=depth,
-            label=f'crawl --depth={depth}',
-            created_by=created_by_id,
-        )
+                # Output JSONL record (only when piped)
+                if not is_tty:
+                    write_record(crawl.to_jsonl())
 
-        # Create snapshots for new URLs
-        for record in new_url_records:
-            try:
-                record['crawl_id'] = str(crawl.id)
-                record['depth'] = record.get('depth', 0)
+        except Exception as e:
+            rprint(f'[red]Error creating crawl: {e}[/red]', file=sys.stderr)
+            continue
 
-                overrides = {'created_by_id': created_by_id}
-                snapshot = Snapshot.from_jsonl(record, overrides=overrides)
-                if snapshot:
-                    snapshot_ids.append(str(snapshot.id))
-
-            except Exception as e:
-                rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
-                continue
-
-    if not snapshot_ids:
-        rprint('[red]No snapshots to process[/red]', file=sys.stderr)
+    if not created_crawls:
+        rprint('[red]No crawls created[/red]', file=sys.stderr)
         return 1
 
-    if existing_snapshot_ids:
-        rprint(f'[blue]Using {len(existing_snapshot_ids)} existing snapshots[/blue]', file=sys.stderr)
-    if new_url_records:
-        rprint(f'[blue]Created {len(snapshot_ids) - len(existing_snapshot_ids)} new snapshots[/blue]', file=sys.stderr)
-    rprint(f'[blue]Running parser plugins on {len(snapshot_ids)} snapshots...[/blue]', file=sys.stderr)
+    rprint(f'[green]Created {len(created_crawls)} crawls[/green]', file=sys.stderr)
 
-    # Create ArchiveResults for plugins
-    # If --plugin is specified, only run that one. Otherwise, run all available plugins.
-    # The orchestrator will handle dependency ordering (plugins declare deps in config.json)
-    for snapshot_id in snapshot_ids:
-        try:
-            snapshot = Snapshot.objects.get(id=snapshot_id)
-
-            if plugin:
-                # User specified a single plugin to run
-                ArchiveResult.objects.get_or_create(
-                    snapshot=snapshot,
-                    extractor=plugin,
-                    defaults={
-                        'status': ArchiveResult.StatusChoices.QUEUED,
-                        'retry_at': timezone.now(),
-                    }
-                )
-            else:
-                # Create pending ArchiveResults for all enabled plugins
-                # This uses hook discovery to find available plugins dynamically
-                snapshot.create_pending_archiveresults()
-
-            # Mark snapshot as started
-            snapshot.status = Snapshot.StatusChoices.STARTED
-            snapshot.retry_at = timezone.now()
-            snapshot.save()
-
-        except Snapshot.DoesNotExist:
-            continue
-
-    # Run plugins
-    if wait:
-        rprint('[blue]Running outlink plugins...[/blue]', file=sys.stderr)
-        orchestrator = Orchestrator(exit_on_idle=True)
-        orchestrator.runloop()
-
-    # Collect discovered URLs from urls.jsonl files
-    # Uses dynamic discovery - any plugin that outputs urls.jsonl is considered a parser
-    from archivebox.hooks import collect_urls_from_plugins
-
-    discovered_urls = {}
-    for snapshot_id in snapshot_ids:
-        try:
-            snapshot = Snapshot.objects.get(id=snapshot_id)
-            snapshot_dir = Path(snapshot.output_dir)
-
-            # Dynamically collect urls.jsonl from ANY plugin subdirectory
-            for entry in collect_urls_from_plugins(snapshot_dir):
-                url = entry.get('url')
-                if url and url not in discovered_urls:
-                    # Add metadata for crawl tracking
-                    entry['type'] = TYPE_SNAPSHOT
-                    entry['depth'] = snapshot.depth + 1
-                    entry['via_snapshot'] = str(snapshot.id)
-                    discovered_urls[url] = entry
-
-        except Snapshot.DoesNotExist:
-            continue
-
-    rprint(f'[green]Discovered {len(discovered_urls)} URLs[/green]', file=sys.stderr)
-
-    # Output discovered URLs as JSONL (when piped) or human-readable (when TTY)
-    for url, entry in discovered_urls.items():
-        if is_tty:
-            via = entry.get('via_extractor', 'unknown')
-            rprint(f' [dim]{via}[/dim] {url[:80]}', file=sys.stderr)
-        else:
-            write_record(entry)
+    # If TTY, show human-readable output
+    if is_tty:
+        for crawl in created_crawls:
+            first_url = crawl.get_urls_list()[0] if crawl.get_urls_list() else ''
+            rprint(f' [dim]{crawl.id}[/dim] {first_url[:60]}', file=sys.stderr)
 
     return 0
 
 
@@ -262,12 +153,11 @@ def is_crawl_id(value: str) -> bool:
 
 
 @click.command()
-@click.option('--depth', '-d', type=int, default=1, help='Max depth for recursive crawling (default: 1)')
-@click.option('--plugin', '-p', default='', help='Use only this parser plugin (e.g., parse_html_urls, parse_dom_outlinks)')
-@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
+@click.option('--depth', '-d', type=int, default=0, help='Max depth for recursive crawling (default: 0, no recursion)')
+@click.option('--tag', '-t', default='', help='Comma-separated tags to add to snapshots')
 @click.argument('args', nargs=-1)
-def main(depth: int, plugin: str, wait: bool, args: tuple):
-    """Discover outgoing links from URLs or existing Snapshots, or process Crawl by ID"""
+def main(depth: int, tag: str, args: tuple):
+    """Create Crawl jobs from URLs, or process existing Crawls by ID"""
     from archivebox.misc.jsonl import read_args_or_stdin
 
     # Read all input
@@ -275,7 +165,7 @@ def main(depth: int, plugin: str, wait: bool, args: tuple):
 
     if not records:
         from rich import print as rprint
-        rprint('[yellow]No URLs, Snapshot IDs, or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
+        rprint('[yellow]No URLs or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
         sys.exit(1)
 
     # Check if input looks like existing Crawl IDs to process
@@ -295,8 +185,8 @@ def main(depth: int, plugin: str, wait: bool, args: tuple):
                 exit_code = result
         sys.exit(exit_code)
     else:
-        # Default behavior: discover outlinks from input (URLs or Snapshot IDs)
-        sys.exit(discover_outlinks(args, depth=depth, plugin=plugin, wait=wait))
+        # Default behavior: create Crawl jobs from URLs
+        sys.exit(create_crawls(args, depth=depth, tag=tag))
 
 
 if __name__ == '__main__':
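
The crawl side of the new pipeline is small enough to trace by hand. The sketch below is an illustration only, not part of the patch: it mirrors what create_crawls() above does for a single parsed input record, assuming a Django-initialized ArchiveBox environment (for example an `archivebox shell` session). Every name it uses (Crawl.from_jsonl, get_or_create_system_user_pk, write_record, to_jsonl) comes from the patch itself.

    # Illustration only - mirrors create_crawls() above, not a drop-in replacement.
    from archivebox.base_models.models import get_or_create_system_user_pk
    from archivebox.crawls.models import Crawl
    from archivebox.misc.jsonl import write_record

    depth, tag = 0, ''                                    # stand-ins for the --depth / --tag CLI options
    record = {'url': 'https://example.com', 'depth': 1, 'tags': 'docs'}   # one parsed input line

    crawl_record = {
        'url': record['url'],
        'max_depth': record.get('depth', depth),          # per-record value wins over the CLI flag
        'tags_str': record.get('tags', tag),
        'label': record.get('label', ''),
    }
    crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': get_or_create_system_user_pk()})
    write_record(crawl.to_jsonl())                        # the {"type": "Crawl", ...} line that `archivebox snapshot` consumes
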
diff --git a/archivebox/cli/archivebox_snapshot.py b/archivebox/cli/archivebox_snapshot.py
index 67f048fb..7ef2ff4a 100644
--- a/archivebox/cli/archivebox_snapshot.py
+++ b/archivebox/cli/archivebox_snapshot.py
@@ -1,29 +1,31 @@
 #!/usr/bin/env python3
 """
-archivebox snapshot [urls...] [--depth=N] [--tag=TAG] [--plugins=...]
+archivebox snapshot [urls_or_crawl_ids...] [--tag=TAG] [--extract]
 
-Create Snapshots from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
+Create Snapshots from URLs or Crawl jobs. Accepts URLs, Crawl JSONL, or Crawl IDs.
 
 Input formats:
     - Plain URLs (one per line)
+    - JSONL: {"type": "Crawl", "id": "...", "urls": "..."}
     - JSONL: {"type": "Snapshot", "url": "...", "title": "...", "tags": "..."}
+    - Crawl UUIDs (one per line)
 
 Output (JSONL):
     {"type": "Snapshot", "id": "...", "url": "...", "status": "queued", ...}
 
 Examples:
-    # Create snapshots from URLs
+    # Create snapshots from URLs directly
     archivebox snapshot https://example.com https://foo.com
 
-    # Pipe from stdin
-    echo 'https://example.com' | archivebox snapshot
+    # Pipe from crawl command
+    archivebox crawl https://example.com | archivebox snapshot
 
     # Chain with extract
-    archivebox snapshot https://example.com | archivebox extract
+    archivebox crawl https://example.com | archivebox snapshot | archivebox extract
 
-    # With crawl depth
-    archivebox snapshot --depth=1 https://example.com
+    # Process existing Snapshot by ID
+    archivebox snapshot 01234567-89ab-cdef-0123-456789abcdef
 """
 
 __package__ = 'archivebox.cli'
@@ -67,17 +69,16 @@ def process_snapshot_by_id(snapshot_id: str) -> int:
 
 
 def create_snapshots(
-    urls: tuple,
-    depth: int = 0,
+    args: tuple,
     tag: str = '',
-    plugins: str = '',
+    extract: bool = False,
     created_by_id: Optional[int] = None,
 ) -> int:
     """
-    Create Snapshots from URLs or JSONL records.
+    Create Snapshots from URLs, Crawl JSONL, or Crawl IDs.
 
     Reads from args or stdin, creates Snapshot objects, outputs JSONL.
-    If --plugins is passed, also runs specified plugins (blocking).
+    If input is Crawl JSONL, creates Snapshots for all URLs in the Crawl.
 
     Exit codes:
         0: Success
@@ -88,63 +89,70 @@ def create_snapshots(
     from rich import print as rprint
     from archivebox.misc.jsonl import (
         read_args_or_stdin, write_record,
-        TYPE_SNAPSHOT, TYPE_TAG
+        TYPE_SNAPSHOT, TYPE_CRAWL
     )
     from archivebox.base_models.models import get_or_create_system_user_pk
     from archivebox.core.models import Snapshot
     from archivebox.crawls.models import Crawl
-    from archivebox.config import CONSTANTS
 
     created_by_id = created_by_id or get_or_create_system_user_pk()
     is_tty = sys.stdout.isatty()
 
     # Collect all input records
-    records = list(read_args_or_stdin(urls))
+    records = list(read_args_or_stdin(args))
 
     if not records:
-        rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
+        rprint('[yellow]No URLs or Crawls provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
         return 1
 
-    # If depth > 0, we need a Crawl to manage recursive discovery
-    crawl = None
-    if depth > 0:
-        # Create a crawl for this batch
-        sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__snapshot.txt'
-        sources_file.parent.mkdir(parents=True, exist_ok=True)
-        sources_file.write_text('\n'.join(r.get('url', '') for r in records if r.get('url')))
-
-        crawl = Crawl.from_file(
-            sources_file,
-            max_depth=depth,
-            label=f'snapshot --depth={depth}',
-            created_by=created_by_id,
-        )
-
-    # Process each record
+    # Process each record - handle Crawls and plain URLs/Snapshots
     created_snapshots = []
     for record in records:
-        if record.get('type') != TYPE_SNAPSHOT and 'url' not in record:
-            continue
+        record_type = record.get('type')
 
         try:
-            # Add crawl info if we have one
-            if crawl:
-                record['crawl_id'] = str(crawl.id)
-                record['depth'] = record.get('depth', 0)
+            if record_type == TYPE_CRAWL:
+                # Input is a Crawl - get or create it, then create Snapshots for its URLs
+                crawl = None
+                crawl_id = record.get('id')
+                if crawl_id:
+                    try:
+                        crawl = Crawl.objects.get(id=crawl_id)
+                    except Crawl.DoesNotExist:
+                        # Crawl doesn't exist, create it
+                        crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
+                else:
+                    # No ID, create new crawl
+                    crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
 
-            # Add tags if provided via CLI
-            if tag and not record.get('tags'):
-                record['tags'] = tag
+                if not crawl:
+                    continue
 
-            # Get or create the snapshot
-            overrides = {'created_by_id': created_by_id}
-            snapshot = Snapshot.from_jsonl(record, overrides=overrides)
-            if snapshot:
-                created_snapshots.append(snapshot)
+                # Create snapshots for each URL in the crawl
+                for url in crawl.get_urls_list():
+                    snapshot_record = {
+                        'url': url,
+                        'tags': crawl.tags_str,
+                        'crawl_id': str(crawl.id),
+                        'depth': 0,
+                    }
+                    snapshot = Snapshot.from_jsonl(snapshot_record, overrides={'created_by_id': created_by_id})
+                    if snapshot:
+                        created_snapshots.append(snapshot)
+                        if not is_tty:
+                            write_record(snapshot.to_jsonl())
 
-            # Output JSONL record (only when piped)
-            if not is_tty:
-                write_record(snapshot.to_jsonl())
+            elif record_type == TYPE_SNAPSHOT or record.get('url'):
+                # Input is a Snapshot or plain URL
+                # Add tags if provided via CLI
+                if tag and not record.get('tags'):
+                    record['tags'] = tag
+
+                snapshot = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id})
+                if snapshot:
+                    created_snapshots.append(snapshot)
+                    if not is_tty:
+                        write_record(snapshot.to_jsonl())
 
         except Exception as e:
             rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
@@ -161,10 +169,10 @@ def create_snapshots(
         for snapshot in created_snapshots:
             rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
 
-    # If --plugins is passed, run the orchestrator for those plugins
-    if plugins:
+    # If --extract is passed, run the orchestrator
+    if extract:
         from archivebox.workers.orchestrator import Orchestrator
-        rprint(f'[blue]Running plugins: {plugins or "all"}...[/blue]', file=sys.stderr)
+        rprint('[blue]Running extractors...[/blue]', file=sys.stderr)
         orchestrator = Orchestrator(exit_on_idle=True)
         orchestrator.runloop()
 
@@ -175,16 +183,19 @@ def is_snapshot_id(value: str) -> bool:
     """Check if value looks like a Snapshot UUID."""
     import re
     uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
-    return bool(uuid_pattern.match(value))
+    if not uuid_pattern.match(value):
+        return False
+    # Verify it's actually a Snapshot (not a Crawl or other object)
+    from archivebox.core.models import Snapshot
+    return Snapshot.objects.filter(id=value).exists()
 
 
 @click.command()
-@click.option('--depth', '-d', type=int, default=0, help='Recursively crawl linked pages up to N levels deep')
 @click.option('--tag', '-t', default='', help='Comma-separated tags to add to each snapshot')
-@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g. title,screenshot)')
+@click.option('--extract/--no-extract', default=False, help='Run extractors after creating snapshots')
 @click.argument('args', nargs=-1)
-def main(depth: int, tag: str, plugins: str, args: tuple):
-    """Create Snapshots from URLs, or process existing Snapshots by ID"""
+def main(tag: str, extract: bool, args: tuple):
+    """Create Snapshots from URLs/Crawls, or process existing Snapshots by ID"""
     from archivebox.misc.jsonl import read_args_or_stdin
 
     # Read all input
@@ -192,17 +203,21 @@ def main(depth: int, tag: str, plugins: str, args: tuple):
 
     if not records:
         from rich import print as rprint
-        rprint('[yellow]No URLs or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
+        rprint('[yellow]No URLs, Crawl IDs, or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
         sys.exit(1)
 
     # Check if input looks like existing Snapshot IDs to process
-    # If ALL inputs are UUIDs with no URL, assume we're processing existing Snapshots
-    all_are_ids = all(
-        (r.get('id') and not r.get('url')) or is_snapshot_id(r.get('url', ''))
+    # If ALL inputs are UUIDs with no URL and exist as Snapshots, process them
+    all_are_snapshot_ids = all(
+        is_snapshot_id(r.get('id') or r.get('url', ''))
         for r in records
+        if r.get('type') != 'Crawl'  # Don't check Crawl records as Snapshot IDs
     )
 
-    if all_are_ids:
+    # But also check that we're not receiving Crawl JSONL
+    has_crawl_records = any(r.get('type') == 'Crawl' for r in records)
+
+    if all_are_snapshot_ids and not has_crawl_records:
         # Process existing Snapshots by ID
         exit_code = 0
         for record in records:
@@ -212,8 +227,8 @@ def main(depth: int, tag: str, plugins: str, args: tuple):
                 exit_code = result
         sys.exit(exit_code)
     else:
-        # Create new Snapshots from URLs
-        sys.exit(create_snapshots(args, depth=depth, tag=tag, plugins=plugins))
+        # Create new Snapshots from URLs or Crawls
+        sys.exit(create_snapshots(args, tag=tag, extract=extract))
 
 
 if __name__ == '__main__':
diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py
index c6e768c1..3e1a53f9 100755
--- a/archivebox/crawls/models.py
+++ b/archivebox/crawls/models.py
@@ -146,9 +146,55 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
             'urls': self.urls,
             'status': self.status,
             'max_depth': self.max_depth,
+            'tags_str': self.tags_str,
+            'label': self.label,
             'created_at': self.created_at.isoformat() if self.created_at else None,
         }
 
+    @staticmethod
+    def from_jsonl(record: dict, overrides: dict = None):
+        """
+        Create or get a Crawl from a JSONL record.
+
+        Args:
+            record: Dict with 'urls' (required), optional 'max_depth', 'tags_str', 'label'
+            overrides: Dict of field overrides (e.g., created_by_id)
+
+        Returns:
+            Crawl instance or None if invalid
+        """
+        from django.utils import timezone
+
+        overrides = overrides or {}
+
+        # Check if crawl already exists by ID
+        crawl_id = record.get('id')
+        if crawl_id:
+            try:
+                return Crawl.objects.get(id=crawl_id)
+            except Crawl.DoesNotExist:
+                pass
+
+        # Get URLs - can be string (newline-separated) or from 'url' field
+        urls = record.get('urls', '')
+        if not urls and record.get('url'):
+            urls = record['url']
+
+        if not urls:
+            return None
+
+        # Create new crawl (status stays QUEUED, not started)
+        crawl = Crawl.objects.create(
+            urls=urls,
+            max_depth=record.get('max_depth', record.get('depth', 0)),
+            tags_str=record.get('tags_str', record.get('tags', '')),
+            label=record.get('label', ''),
+            status=Crawl.StatusChoices.QUEUED,
+            retry_at=timezone.now(),
+            **overrides,
+        )
+        return crawl
+
     @property
     def output_dir_parent(self) -> str:
         """Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""
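
The id lookup at the top of from_jsonl() is what keeps the pipeline idempotent: a record that already carries an id resolves to the existing row instead of creating a duplicate, which is how `archivebox snapshot` can pick up a Crawl emitted by `archivebox crawl` by id rather than recreating it. Below is a minimal round-trip sketch, illustration only and not part of the patch, assuming a Django-configured ArchiveBox environment.

    # Illustration only - exercises Crawl.from_jsonl() / to_jsonl() as added above.
    from archivebox.base_models.models import get_or_create_system_user_pk
    from archivebox.crawls.models import Crawl

    owner = {'created_by_id': get_or_create_system_user_pk()}

    crawl = Crawl.from_jsonl({'urls': 'https://example.com', 'max_depth': 1, 'tags': 'docs'}, overrides=owner)
    assert crawl.status == Crawl.StatusChoices.QUEUED     # created queued, never started here

    # Feeding the emitted record back in hits the id lookup and returns the same row,
    # so `archivebox crawl ... | archivebox snapshot` does not create duplicate Crawls.
    same_crawl = Crawl.from_jsonl(crawl.to_jsonl(), overrides=owner)
    assert same_crawl.id == crawl.id
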
diff --git a/archivebox/misc/jsonl.py b/archivebox/misc/jsonl.py
index 5d344d3a..1e555a0a 100644
--- a/archivebox/misc/jsonl.py
+++ b/archivebox/misc/jsonl.py
@@ -4,9 +4,15 @@ JSONL (JSON Lines) utilities for ArchiveBox.
 Provides functions for reading, writing, and processing typed JSONL records.
 All CLI commands that accept stdin can read both plain URLs and typed JSONL.
 
+CLI Pipeline:
+    archivebox crawl URL -> {"type": "Crawl", "id": "...", "urls": "...", ...}
+    archivebox snapshot -> {"type": "Snapshot", "id": "...", "url": "...", ...}
+    archivebox extract -> {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", ...}
+
 Typed JSONL Format:
-    {"type": "Snapshot", "url": "https://example.com", "title": "...", "tags": "..."}
-    {"type": "ArchiveResult", "snapshot_id": "...", "extractor": "wget", ...}
+    {"type": "Crawl", "id": "...", "urls": "...", "max_depth": 0, ...}
+    {"type": "Snapshot", "id": "...", "url": "https://example.com", "title": "...", ...}
+    {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", ...}
     {"type": "Tag", "name": "..."}
 
 Plain URLs (also supported):