diff --git a/archivebox/cli/archivebox_crawl.py b/archivebox/cli/archivebox_crawl.py index 3bedaade..d8c3c7ad 100644 --- a/archivebox/cli/archivebox_crawl.py +++ b/archivebox/cli/archivebox_crawl.py @@ -1,224 +1,114 @@ #!/usr/bin/env python3 """ -archivebox crawl [urls_or_snapshot_ids...] [--depth=N] [--plugin=NAME] +archivebox crawl [urls...] [--depth=N] [--tag=TAG] -Discover outgoing links from URLs or existing Snapshots. - -If a URL is passed, creates a Snapshot for it first, then runs parser plugins. -If a snapshot_id is passed, runs parser plugins on the existing Snapshot. -Outputs discovered outlink URLs as JSONL. - -Pipe the output to `archivebox snapshot` to archive the discovered URLs. +Create Crawl jobs from URLs. Accepts URLs as arguments, from stdin, or via JSONL. +Does NOT immediately start the crawl - pipe to `archivebox snapshot` to process. Input formats: - Plain URLs (one per line) - - Snapshot UUIDs (one per line) - - JSONL: {"type": "Snapshot", "url": "...", ...} - - JSONL: {"type": "Snapshot", "id": "...", ...} + - JSONL: {"url": "...", "depth": 1, "tags": "..."} Output (JSONL): - {"type": "Snapshot", "url": "https://discovered-url.com", "via_extractor": "...", ...} + {"type": "Crawl", "id": "...", "urls": "...", "status": "queued", ...} Examples: - # Discover links from a page (creates snapshot first) + # Create a crawl job archivebox crawl https://example.com - # Discover links from an existing snapshot - archivebox crawl 01234567-89ab-cdef-0123-456789abcdef + # Create crawl with depth + archivebox crawl --depth=1 https://example.com - # Full recursive crawl pipeline + # Full pipeline: create crawl, create snapshots, run extractors archivebox crawl https://example.com | archivebox snapshot | archivebox extract - # Use only specific parser plugin - archivebox crawl --plugin=parse_html_urls https://example.com - - # Chain: create snapshot, then crawl its outlinks - archivebox snapshot https://example.com | archivebox crawl | archivebox snapshot | archivebox extract + # Process existing Crawl by ID (runs the crawl state machine) + archivebox crawl 01234567-89ab-cdef-0123-456789abcdef """ __package__ = 'archivebox.cli' __command__ = 'archivebox crawl' import sys -import json -from pathlib import Path from typing import Optional import rich_click as click -from archivebox.misc.util import docstring - -def discover_outlinks( - args: tuple, - depth: int = 1, - plugin: str = '', - wait: bool = True, +def create_crawls( + records: list, + depth: int = 0, + tag: str = '', + created_by_id: Optional[int] = None, ) -> int: """ - Discover outgoing links from URLs or existing Snapshots. + Create a single Crawl job from all input URLs. - Accepts URLs or snapshot_ids. For URLs, creates Snapshots first. - Runs parser plugins, outputs discovered URLs as JSONL. - The output can be piped to `archivebox snapshot` to archive the discovered links. + Takes pre-read records, creates one Crawl with all URLs, outputs JSONL. + Does NOT start the crawl - just creates the job in QUEUED state. 
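# [editor's note: illustrative sketch, not part of this patch]
# The crawl -> snapshot handoff described above is one JSON object per stdout line, and
# JSONL is only emitted when stdout is piped (sys.stdout.isatty() is False). A stdlib-only
# sketch of that contract; the placeholder id and exact field set are assumptions, not the
# real model output.
import json
import sys

def emit_crawl_record(urls: list[str], max_depth: int = 0) -> None:
    record = {
        "type": "Crawl",
        "id": "00000000-0000-0000-0000-000000000000",  # placeholder; the real command uses the Crawl's UUID
        "urls": "\n".join(urls),                        # newline-separated URL list
        "status": "queued",
        "max_depth": max_depth,
    }
    if not sys.stdout.isatty():                         # machine-readable output only when piped
        sys.stdout.write(json.dumps(record) + "\n")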
Exit codes: 0: Success 1: Failure """ from rich import print as rprint - from django.utils import timezone - from archivebox.misc.jsonl import ( - read_args_or_stdin, write_record, - TYPE_SNAPSHOT - ) + from archivebox.misc.jsonl import write_record from archivebox.base_models.models import get_or_create_system_user_pk - from archivebox.core.models import Snapshot, ArchiveResult from archivebox.crawls.models import Crawl - from archivebox.config import CONSTANTS - from archivebox.workers.orchestrator import Orchestrator - created_by_id = get_or_create_system_user_pk() + created_by_id = created_by_id or get_or_create_system_user_pk() is_tty = sys.stdout.isatty() - # Collect all input records - records = list(read_args_or_stdin(args)) - if not records: - rprint('[yellow]No URLs or snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr) + rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr) return 1 - # Separate records into existing snapshots vs new URLs - existing_snapshot_ids = [] - new_url_records = [] - + # Collect all URLs into a single newline-separated string + urls = [] for record in records: - # Check if it's an existing snapshot (has id but no url, or looks like a UUID) - if record.get('id') and not record.get('url'): - existing_snapshot_ids.append(record['id']) - elif record.get('id'): - # Has both id and url - check if snapshot exists - try: - Snapshot.objects.get(id=record['id']) - existing_snapshot_ids.append(record['id']) - except Snapshot.DoesNotExist: - new_url_records.append(record) - elif record.get('url'): - new_url_records.append(record) + url = record.get('url') + if url: + urls.append(url) - # For new URLs, create a Crawl and Snapshots - snapshot_ids = list(existing_snapshot_ids) - - if new_url_records: - # Create a Crawl to manage this operation - sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__crawl.txt' - sources_file.parent.mkdir(parents=True, exist_ok=True) - sources_file.write_text('\n'.join(r.get('url', '') for r in new_url_records if r.get('url'))) - - crawl = Crawl.from_file( - sources_file, - max_depth=depth, - label=f'crawl --depth={depth}', - created_by=created_by_id, - ) - - # Create snapshots for new URLs - for record in new_url_records: - try: - record['crawl_id'] = str(crawl.id) - record['depth'] = record.get('depth', 0) - - overrides = {'created_by_id': created_by_id} - snapshot = Snapshot.from_jsonl(record, overrides=overrides) - if snapshot: - snapshot_ids.append(str(snapshot.id)) - - except Exception as e: - rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr) - continue - - if not snapshot_ids: - rprint('[red]No snapshots to process[/red]', file=sys.stderr) + if not urls: + rprint('[red]No valid URLs found[/red]', file=sys.stderr) return 1 - if existing_snapshot_ids: - rprint(f'[blue]Using {len(existing_snapshot_ids)} existing snapshots[/blue]', file=sys.stderr) - if new_url_records: - rprint(f'[blue]Created {len(snapshot_ids) - len(existing_snapshot_ids)} new snapshots[/blue]', file=sys.stderr) - rprint(f'[blue]Running parser plugins on {len(snapshot_ids)} snapshots...[/blue]', file=sys.stderr) + try: + # Build crawl record with all URLs as newline-separated string + crawl_record = { + 'urls': '\n'.join(urls), + 'max_depth': depth, + 'tags_str': tag, + 'label': '', + } - # Create ArchiveResults for plugins - # If --plugin is specified, only run that one. Otherwise, run all available plugins. 
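# [editor's note: illustrative sketch, not part of this patch]
# create_crawls() receives records already parsed by read_args_or_stdin(): each input line
# is either a bare URL or a JSONL object. parse_one() below is a hypothetical stand-in for
# that upstream parsing, not the real archivebox.misc.jsonl helper.
import json

def parse_one(line: str) -> dict | None:
    line = line.strip()
    if not line:
        return None
    if line.startswith("{"):
        try:
            return json.loads(line)       # JSONL record passes through unchanged
        except json.JSONDecodeError:
            return None
    return {"url": line}                  # bare URL becomes a minimal record

assert parse_one("https://example.com") == {"url": "https://example.com"}
assert parse_one('{"url": "https://example.com", "depth": 1}')["depth"] == 1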
- # The orchestrator will handle dependency ordering (plugins declare deps in config.json) - for snapshot_id in snapshot_ids: - try: - snapshot = Snapshot.objects.get(id=snapshot_id) + crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id}) + if not crawl: + rprint('[red]Failed to create crawl[/red]', file=sys.stderr) + return 1 - if plugin: - # User specified a single plugin to run - ArchiveResult.objects.get_or_create( - snapshot=snapshot, - extractor=plugin, - defaults={ - 'status': ArchiveResult.StatusChoices.QUEUED, - 'retry_at': timezone.now(), - } - ) - else: - # Create pending ArchiveResults for all enabled plugins - # This uses hook discovery to find available plugins dynamically - snapshot.create_pending_archiveresults() + # Output JSONL record (only when piped) + if not is_tty: + write_record(crawl.to_jsonl()) - # Mark snapshot as started - snapshot.status = Snapshot.StatusChoices.STARTED - snapshot.retry_at = timezone.now() - snapshot.save() + rprint(f'[green]Created crawl with {len(urls)} URLs[/green]', file=sys.stderr) - except Snapshot.DoesNotExist: - continue - - # Run plugins - if wait: - rprint('[blue]Running outlink plugins...[/blue]', file=sys.stderr) - orchestrator = Orchestrator(exit_on_idle=True) - orchestrator.runloop() - - # Collect discovered URLs from urls.jsonl files - # Uses dynamic discovery - any plugin that outputs urls.jsonl is considered a parser - from archivebox.hooks import collect_urls_from_plugins - - discovered_urls = {} - for snapshot_id in snapshot_ids: - try: - snapshot = Snapshot.objects.get(id=snapshot_id) - snapshot_dir = Path(snapshot.output_dir) - - # Dynamically collect urls.jsonl from ANY plugin subdirectory - for entry in collect_urls_from_plugins(snapshot_dir): - url = entry.get('url') - if url and url not in discovered_urls: - # Add metadata for crawl tracking - entry['type'] = TYPE_SNAPSHOT - entry['depth'] = snapshot.depth + 1 - entry['via_snapshot'] = str(snapshot.id) - discovered_urls[url] = entry - - except Snapshot.DoesNotExist: - continue - - rprint(f'[green]Discovered {len(discovered_urls)} URLs[/green]', file=sys.stderr) - - # Output discovered URLs as JSONL (when piped) or human-readable (when TTY) - for url, entry in discovered_urls.items(): + # If TTY, show human-readable output if is_tty: - via = entry.get('via_extractor', 'unknown') - rprint(f' [dim]{via}[/dim] {url[:80]}', file=sys.stderr) - else: - write_record(entry) + rprint(f' [dim]{crawl.id}[/dim]', file=sys.stderr) + for url in urls[:5]: # Show first 5 URLs + rprint(f' {url[:70]}', file=sys.stderr) + if len(urls) > 5: + rprint(f' ... 
and {len(urls) - 5} more', file=sys.stderr) - return 0 + return 0 + + except Exception as e: + rprint(f'[red]Error creating crawl: {e}[/red]', file=sys.stderr) + return 1 def process_crawl_by_id(crawl_id: str) -> int: @@ -262,12 +152,11 @@ def is_crawl_id(value: str) -> bool: @click.command() -@click.option('--depth', '-d', type=int, default=1, help='Max depth for recursive crawling (default: 1)') -@click.option('--plugin', '-p', default='', help='Use only this parser plugin (e.g., parse_html_urls, parse_dom_outlinks)') -@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)') +@click.option('--depth', '-d', type=int, default=0, help='Max depth for recursive crawling (default: 0, no recursion)') +@click.option('--tag', '-t', default='', help='Comma-separated tags to add to snapshots') @click.argument('args', nargs=-1) -def main(depth: int, plugin: str, wait: bool, args: tuple): - """Discover outgoing links from URLs or existing Snapshots, or process Crawl by ID""" +def main(depth: int, tag: str, args: tuple): + """Create Crawl jobs from URLs, or process existing Crawls by ID""" from archivebox.misc.jsonl import read_args_or_stdin # Read all input @@ -275,7 +164,7 @@ def main(depth: int, plugin: str, wait: bool, args: tuple): if not records: from rich import print as rprint - rprint('[yellow]No URLs, Snapshot IDs, or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr) + rprint('[yellow]No URLs or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr) sys.exit(1) # Check if input looks like existing Crawl IDs to process @@ -295,8 +184,8 @@ def main(depth: int, plugin: str, wait: bool, args: tuple): exit_code = result sys.exit(exit_code) else: - # Default behavior: discover outlinks from input (URLs or Snapshot IDs) - sys.exit(discover_outlinks(args, depth=depth, plugin=plugin, wait=wait)) + # Default behavior: create Crawl jobs from URLs + sys.exit(create_crawls(records, depth=depth, tag=tag)) if __name__ == '__main__': diff --git a/archivebox/cli/archivebox_extract.py b/archivebox/cli/archivebox_extract.py index 29abd63d..6747e74e 100644 --- a/archivebox/cli/archivebox_extract.py +++ b/archivebox/cli/archivebox_extract.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -archivebox extract [snapshot_ids...] [--plugin=NAME] +archivebox extract [snapshot_ids...] [--plugins=NAMES] Run plugins on Snapshots. Accepts snapshot IDs as arguments, from stdin, or via JSONL. 
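# [editor's note: illustrative sketch, not part of this patch]
# The new --plugins option in this file takes a comma-separated list; the patch normalizes
# it the same way in extract and snapshot. A tiny standalone version of that parse:
def parse_plugins_option(plugins: str) -> list[str]:
    """'screenshot, singlefile,' -> ['screenshot', 'singlefile']"""
    return [p.strip() for p in plugins.split(",") if p.strip()]

assert parse_plugins_option("screenshot, singlefile,") == ["screenshot", "singlefile"]
assert parse_plugins_option("") == []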
@@ -20,8 +20,8 @@ Examples: # Pipe from snapshot command archivebox snapshot https://example.com | archivebox extract - # Run specific plugin only - archivebox extract --plugin=screenshot 01234567-89ab-cdef-0123-456789abcdef + # Run specific plugins only + archivebox extract --plugins=screenshot,singlefile 01234567-89ab-cdef-0123-456789abcdef # Chain commands archivebox crawl https://example.com | archivebox snapshot | archivebox extract @@ -76,7 +76,7 @@ def process_archiveresult_by_id(archiveresult_id: str) -> int: def run_plugins( args: tuple, - plugin: str = '', + plugins: str = '', wait: bool = True, ) -> int: """ @@ -92,7 +92,7 @@ def run_plugins( from django.utils import timezone from archivebox.misc.jsonl import ( - read_args_or_stdin, write_record, archiveresult_to_jsonl, + read_args_or_stdin, write_record, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT ) from archivebox.core.models import Snapshot, ArchiveResult @@ -147,21 +147,25 @@ def run_plugins( continue # Create pending ArchiveResults if needed - if plugin: - # Only create for specific plugin - result, created = ArchiveResult.objects.get_or_create( - snapshot=snapshot, - plugin=plugin, - defaults={ - 'status': ArchiveResult.StatusChoices.QUEUED, - 'retry_at': timezone.now(), - } - ) - if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]: - # Reset for retry - result.status = ArchiveResult.StatusChoices.QUEUED - result.retry_at = timezone.now() - result.save() + if plugins: + # Parse comma-separated plugins list + plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] + + # Only create for specific plugins + for plugin_name in plugins_list: + result, created = ArchiveResult.objects.get_or_create( + snapshot=snapshot, + plugin=plugin_name, + defaults={ + 'status': ArchiveResult.StatusChoices.QUEUED, + 'retry_at': timezone.now(), + } + ) + if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]: + # Reset for retry + result.status = ArchiveResult.StatusChoices.QUEUED + result.retry_at = timezone.now() + result.save() else: # Create all pending plugins snapshot.create_pending_archiveresults() @@ -191,8 +195,10 @@ def run_plugins( try: snapshot = Snapshot.objects.get(id=snapshot_id) results = snapshot.archiveresult_set.all() - if plugin: - results = results.filter(plugin=plugin) + if plugins: + # Parse comma-separated plugins list + plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] + results = results.filter(plugin__in=plugins_list) for result in results: if is_tty: @@ -203,7 +209,7 @@ def run_plugins( }.get(result.status, 'dim') rprint(f' [{status_color}]{result.status}[/{status_color}] {result.plugin} → {result.output_str or ""}', file=sys.stderr) else: - write_record(archiveresult_to_jsonl(result)) + write_record(result.to_jsonl()) except Snapshot.DoesNotExist: continue @@ -222,10 +228,10 @@ def is_archiveresult_id(value: str) -> bool: @click.command() -@click.option('--plugin', '-p', default='', help='Run only this plugin (e.g., screenshot, singlefile)') +@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run (e.g., screenshot,singlefile)') @click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)') @click.argument('args', nargs=-1) -def main(plugin: str, wait: bool, args: tuple): +def main(plugins: str, wait: bool, args: tuple): """Run plugins on Snapshots, or process existing ArchiveResults by ID""" from 
archivebox.misc.jsonl import read_args_or_stdin @@ -254,7 +260,7 @@ def main(plugin: str, wait: bool, args: tuple): sys.exit(exit_code) else: # Default behavior: run plugins on Snapshots from input - sys.exit(run_plugins(args, plugin=plugin, wait=wait)) + sys.exit(run_plugins(args, plugins=plugins, wait=wait)) if __name__ == '__main__': diff --git a/archivebox/cli/archivebox_snapshot.py b/archivebox/cli/archivebox_snapshot.py index 4d2f7b5f..dc540139 100644 --- a/archivebox/cli/archivebox_snapshot.py +++ b/archivebox/cli/archivebox_snapshot.py @@ -1,29 +1,34 @@ #!/usr/bin/env python3 """ -archivebox snapshot [urls...] [--depth=N] [--tag=TAG] [--plugins=...] +archivebox snapshot [urls_or_crawl_ids...] [--tag=TAG] [--plugins=NAMES] -Create Snapshots from URLs. Accepts URLs as arguments, from stdin, or via JSONL. +Create Snapshots from URLs or Crawl jobs. Accepts URLs, Crawl JSONL, or Crawl IDs. Input formats: - Plain URLs (one per line) + - JSONL: {"type": "Crawl", "id": "...", "urls": "..."} - JSONL: {"type": "Snapshot", "url": "...", "title": "...", "tags": "..."} + - Crawl UUIDs (one per line) Output (JSONL): {"type": "Snapshot", "id": "...", "url": "...", "status": "queued", ...} Examples: - # Create snapshots from URLs + # Create snapshots from URLs directly archivebox snapshot https://example.com https://foo.com - # Pipe from stdin - echo 'https://example.com' | archivebox snapshot + # Pipe from crawl command + archivebox crawl https://example.com | archivebox snapshot # Chain with extract - archivebox snapshot https://example.com | archivebox extract + archivebox crawl https://example.com | archivebox snapshot | archivebox extract - # With crawl depth - archivebox snapshot --depth=1 https://example.com + # Run specific plugins after creating snapshots + archivebox snapshot --plugins=screenshot,singlefile https://example.com + + # Process existing Snapshot by ID + archivebox snapshot 01234567-89ab-cdef-0123-456789abcdef """ __package__ = 'archivebox.cli' @@ -67,14 +72,13 @@ def process_snapshot_by_id(snapshot_id: str) -> int: def create_snapshots( - urls: tuple, - depth: int = 0, + args: tuple, tag: str = '', plugins: str = '', created_by_id: Optional[int] = None, ) -> int: """ - Create Snapshots from URLs or JSONL records. + Create Snapshots from URLs, Crawl JSONL, or Crawl IDs. Reads from args or stdin, creates Snapshot objects, outputs JSONL. If --plugins is passed, also runs specified plugins (blocking). @@ -87,64 +91,78 @@ def create_snapshots( from django.utils import timezone from archivebox.misc.jsonl import ( - read_args_or_stdin, write_record, snapshot_to_jsonl, - TYPE_SNAPSHOT, TYPE_TAG + read_args_or_stdin, write_record, + TYPE_SNAPSHOT, TYPE_CRAWL ) from archivebox.base_models.models import get_or_create_system_user_pk from archivebox.core.models import Snapshot from archivebox.crawls.models import Crawl - from archivebox.config import CONSTANTS created_by_id = created_by_id or get_or_create_system_user_pk() is_tty = sys.stdout.isatty() # Collect all input records - records = list(read_args_or_stdin(urls)) + records = list(read_args_or_stdin(args)) if not records: - rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr) + rprint('[yellow]No URLs or Crawls provided. 
Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr) return 1 - # If depth > 0, we need a Crawl to manage recursive discovery - crawl = None - if depth > 0: - # Create a crawl for this batch - sources_file = CONSTANTS.SOURCES_DIR / f'{timezone.now().strftime("%Y-%m-%d__%H-%M-%S")}__snapshot.txt' - sources_file.parent.mkdir(parents=True, exist_ok=True) - sources_file.write_text('\n'.join(r.get('url', '') for r in records if r.get('url'))) - - crawl = Crawl.from_file( - sources_file, - max_depth=depth, - label=f'snapshot --depth={depth}', - created_by=created_by_id, - ) - - # Process each record + # Process each record - handle Crawls and plain URLs/Snapshots created_snapshots = [] for record in records: - if record.get('type') != TYPE_SNAPSHOT and 'url' not in record: - continue + record_type = record.get('type') try: - # Add crawl info if we have one - if crawl: - record['crawl_id'] = str(crawl.id) - record['depth'] = record.get('depth', 0) + if record_type == TYPE_CRAWL: + # Input is a Crawl - get or create it, then create Snapshots for its URLs + crawl = None + crawl_id = record.get('id') + if crawl_id: + try: + crawl = Crawl.objects.get(id=crawl_id) + except Crawl.DoesNotExist: + # Crawl doesn't exist, create it + crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id}) + else: + # No ID, create new crawl + crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id}) - # Add tags if provided via CLI - if tag and not record.get('tags'): - record['tags'] = tag + if not crawl: + continue - # Get or create the snapshot - overrides = {'created_by_id': created_by_id} - snapshot = Snapshot.from_jsonl(record, overrides=overrides) - if snapshot: - created_snapshots.append(snapshot) + # Create snapshots for each URL in the crawl + for url in crawl.get_urls_list(): + # Merge CLI tags with crawl tags + merged_tags = crawl.tags_str + if tag: + if merged_tags: + merged_tags = f"{merged_tags},{tag}" + else: + merged_tags = tag + snapshot_record = { + 'url': url, + 'tags': merged_tags, + 'crawl_id': str(crawl.id), + 'depth': 0, + } + snapshot = Snapshot.from_jsonl(snapshot_record, overrides={'created_by_id': created_by_id}) + if snapshot: + created_snapshots.append(snapshot) + if not is_tty: + write_record(snapshot.to_jsonl()) - # Output JSONL record (only when piped) - if not is_tty: - write_record(snapshot_to_jsonl(snapshot)) + elif record_type == TYPE_SNAPSHOT or record.get('url'): + # Input is a Snapshot or plain URL + # Add tags if provided via CLI + if tag and not record.get('tags'): + record['tags'] = tag + + snapshot = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id}) + if snapshot: + created_snapshots.append(snapshot) + if not is_tty: + write_record(snapshot.to_jsonl()) except Exception as e: rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr) @@ -161,10 +179,32 @@ def create_snapshots( for snapshot in created_snapshots: rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr) - # If --plugins is passed, run the orchestrator for those plugins + # If --plugins is passed, create ArchiveResults and run the orchestrator if plugins: + from archivebox.core.models import ArchiveResult from archivebox.workers.orchestrator import Orchestrator - rprint(f'[blue]Running plugins: {plugins or "all"}...[/blue]', file=sys.stderr) + + # Parse comma-separated plugins list + plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] + + # Create ArchiveResults for the specific plugins on each snapshot 
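# [editor's note: illustrative sketch, not part of this patch]
# The Crawl branch above joins crawl-level tags and the CLI --tag value with a comma,
# dropping empty parts. The same rule pulled out as a standalone helper (hypothetical
# name, same behavior as the inline branch):
def merge_tags(crawl_tags: str, cli_tags: str) -> str:
    """'news,tech' + 'archive' -> 'news,tech,archive'"""
    parts = [t for t in (crawl_tags, cli_tags) if t]
    return ",".join(parts)

assert merge_tags("news,tech", "archive") == "news,tech,archive"
assert merge_tags("", "archive") == "archive"
assert merge_tags("news", "") == "news"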
+ for snapshot in created_snapshots: + for plugin_name in plugins_list: + result, created = ArchiveResult.objects.get_or_create( + snapshot=snapshot, + plugin=plugin_name, + defaults={ + 'status': ArchiveResult.StatusChoices.QUEUED, + 'retry_at': timezone.now(), + } + ) + if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]: + # Reset for retry + result.status = ArchiveResult.StatusChoices.QUEUED + result.retry_at = timezone.now() + result.save() + + rprint(f'[blue]Running plugins: {plugins}...[/blue]', file=sys.stderr) orchestrator = Orchestrator(exit_on_idle=True) orchestrator.runloop() @@ -175,16 +215,19 @@ def is_snapshot_id(value: str) -> bool: """Check if value looks like a Snapshot UUID.""" import re uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I) - return bool(uuid_pattern.match(value)) + if not uuid_pattern.match(value): + return False + # Verify it's actually a Snapshot (not a Crawl or other object) + from archivebox.core.models import Snapshot + return Snapshot.objects.filter(id=value).exists() @click.command() -@click.option('--depth', '-d', type=int, default=0, help='Recursively crawl linked pages up to N levels deep') @click.option('--tag', '-t', default='', help='Comma-separated tags to add to each snapshot') -@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g. title,screenshot)') +@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g., screenshot,singlefile)') @click.argument('args', nargs=-1) -def main(depth: int, tag: str, plugins: str, args: tuple): - """Create Snapshots from URLs, or process existing Snapshots by ID""" +def main(tag: str, plugins: str, args: tuple): + """Create Snapshots from URLs/Crawls, or process existing Snapshots by ID""" from archivebox.misc.jsonl import read_args_or_stdin # Read all input @@ -192,17 +235,21 @@ def main(depth: int, tag: str, plugins: str, args: tuple): if not records: from rich import print as rprint - rprint('[yellow]No URLs or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr) + rprint('[yellow]No URLs, Crawl IDs, or Snapshot IDs provided. 
Pass as arguments or via stdin.[/yellow]', file=sys.stderr) sys.exit(1) # Check if input looks like existing Snapshot IDs to process - # If ALL inputs are UUIDs with no URL, assume we're processing existing Snapshots - all_are_ids = all( - (r.get('id') and not r.get('url')) or is_snapshot_id(r.get('url', '')) + # If ALL inputs are UUIDs with no URL and exist as Snapshots, process them + all_are_snapshot_ids = all( + is_snapshot_id(r.get('id') or r.get('url', '')) for r in records + if r.get('type') != 'Crawl' # Don't check Crawl records as Snapshot IDs ) - if all_are_ids: + # But also check that we're not receiving Crawl JSONL + has_crawl_records = any(r.get('type') == 'Crawl' for r in records) + + if all_are_snapshot_ids and not has_crawl_records: # Process existing Snapshots by ID exit_code = 0 for record in records: @@ -212,8 +259,8 @@ def main(depth: int, tag: str, plugins: str, args: tuple): exit_code = result sys.exit(exit_code) else: - # Create new Snapshots from URLs - sys.exit(create_snapshots(args, depth=depth, tag=tag, plugins=plugins)) + # Create new Snapshots from URLs or Crawls + sys.exit(create_snapshots(args, tag=tag, plugins=plugins)) if __name__ == '__main__': diff --git a/archivebox/cli/tests_piping.py b/archivebox/cli/tests_piping.py index 26125935..f6aee426 100644 --- a/archivebox/cli/tests_piping.py +++ b/archivebox/cli/tests_piping.py @@ -6,12 +6,15 @@ This module tests the JSONL-based piping between CLI commands as described in: https://github.com/ArchiveBox/ArchiveBox/issues/1363 Workflows tested: - archivebox snapshot URL | archivebox extract + archivebox crawl URL -> Crawl JSONL + archivebox snapshot -> Snapshot JSONL (accepts Crawl or URL input) + archivebox extract -> ArchiveResult JSONL (accepts Snapshot input) + +Pipeline: archivebox crawl URL | archivebox snapshot | archivebox extract - archivebox crawl --plugin=PARSER URL | archivebox snapshot | archivebox extract Each command should: - - Accept URLs, snapshot_ids, or JSONL as input (args or stdin) + - Accept URLs, IDs, or JSONL as input (args or stdin) - Output JSONL to stdout when piped (not TTY) - Output human-readable to stderr when TTY """ @@ -84,6 +87,18 @@ class TestJSONLParsing(unittest.TestCase): self.assertEqual(result['url'], 'https://example.com') self.assertEqual(result['tags'], 'test,demo') + def test_parse_jsonl_crawl(self): + """JSONL Crawl records should be parsed correctly.""" + from archivebox.misc.jsonl import parse_line, TYPE_CRAWL + + line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}' + result = parse_line(line) + self.assertIsNotNone(result) + self.assertEqual(result['type'], TYPE_CRAWL) + self.assertEqual(result['id'], 'abc123') + self.assertEqual(result['urls'], 'https://example.com') + self.assertEqual(result['max_depth'], 1) + def test_parse_jsonl_with_id(self): """JSONL with id field should be recognized.""" from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT @@ -139,47 +154,32 @@ class TestJSONLParsing(unittest.TestCase): class TestJSONLOutput(unittest.TestCase): """Test JSONL output formatting.""" - def test_snapshot_to_jsonl(self): - """Snapshot model should serialize to JSONL correctly.""" - from archivebox.misc.jsonl import snapshot_to_jsonl, TYPE_SNAPSHOT + def test_crawl_to_jsonl(self): + """Crawl model should serialize to JSONL correctly.""" + from archivebox.misc.jsonl import TYPE_CRAWL - # Create a mock snapshot - mock_snapshot = MagicMock() - mock_snapshot.id = 'test-uuid-1234' - mock_snapshot.url = 'https://example.com' - 
mock_snapshot.title = 'Example Title' - mock_snapshot.tags_str.return_value = 'tag1,tag2' - mock_snapshot.bookmarked_at = None - mock_snapshot.created_at = None - mock_snapshot.timestamp = '1234567890' - mock_snapshot.depth = 0 - mock_snapshot.status = 'queued' + # Create a mock crawl with to_jsonl method configured + mock_crawl = MagicMock() + mock_crawl.to_jsonl.return_value = { + 'type': TYPE_CRAWL, + 'schema_version': '0.9.0', + 'id': 'test-crawl-uuid', + 'urls': 'https://example.com', + 'status': 'queued', + 'max_depth': 0, + 'tags_str': 'tag1,tag2', + 'label': '', + 'created_at': None, + } - result = snapshot_to_jsonl(mock_snapshot) - self.assertEqual(result['type'], TYPE_SNAPSHOT) - self.assertEqual(result['id'], 'test-uuid-1234') - self.assertEqual(result['url'], 'https://example.com') - self.assertEqual(result['title'], 'Example Title') + result = mock_crawl.to_jsonl() + self.assertEqual(result['type'], TYPE_CRAWL) + self.assertEqual(result['id'], 'test-crawl-uuid') + self.assertEqual(result['urls'], 'https://example.com') + self.assertEqual(result['status'], 'queued') - def test_archiveresult_to_jsonl(self): - """ArchiveResult model should serialize to JSONL correctly.""" - from archivebox.misc.jsonl import archiveresult_to_jsonl, TYPE_ARCHIVERESULT - - mock_result = MagicMock() - mock_result.id = 'result-uuid-5678' - mock_result.snapshot_id = 'snapshot-uuid-1234' - mock_result.extractor = 'title' - mock_result.status = 'succeeded' - mock_result.output = 'Example Title' - mock_result.start_ts = None - mock_result.end_ts = None - - result = archiveresult_to_jsonl(mock_result) - self.assertEqual(result['type'], TYPE_ARCHIVERESULT) - self.assertEqual(result['id'], 'result-uuid-5678') - self.assertEqual(result['snapshot_id'], 'snapshot-uuid-1234') - self.assertEqual(result['extractor'], 'title') - self.assertEqual(result['status'], 'succeeded') + # Note: Snapshot and ArchiveResult serialization is tested in integration tests + # (TestPipingWorkflowIntegration) using real model instances, not mocks. 
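# [editor's note: illustrative sketch, not part of this patch]
# A mocked to_jsonl() only checks the dict shape; the property that actually matters for
# piping is that every record survives a dumps/loads round trip on a single line. A minimal
# stdlib check of that invariant (assumed, not taken from the test suite):
import json

record = {"type": "Crawl", "id": "test-crawl-uuid", "urls": "https://example.com", "status": "queued"}
line = json.dumps(record)              # what write_record() would emit to stdout
assert "\n" not in line                # one record == one line
assert json.loads(line) == record      # what the next command in the pipe would read back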
class TestReadArgsOrStdin(unittest.TestCase): @@ -226,6 +226,20 @@ class TestReadArgsOrStdin(unittest.TestCase): self.assertEqual(records[0]['url'], 'https://example.com') self.assertEqual(records[0]['tags'], 'test') + def test_read_crawl_jsonl_from_stdin(self): + """Should read Crawl JSONL from stdin.""" + from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL + + stdin_content = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com\\nhttps://foo.com"}\n' + stream = StringIO(stdin_content) + stream.isatty = lambda: False + + records = list(read_args_or_stdin((), stream=stream)) + + self.assertEqual(len(records), 1) + self.assertEqual(records[0]['type'], TYPE_CRAWL) + self.assertEqual(records[0]['id'], 'abc123') + def test_skip_tty_stdin(self): """Should not read from TTY stdin (would block).""" from archivebox.misc.jsonl import read_args_or_stdin @@ -263,55 +277,23 @@ class TestCrawlCommand(unittest.TestCase): self.assertEqual(len(records), 1) self.assertEqual(records[0]['url'], 'https://example.com') - def test_crawl_accepts_snapshot_id(self): - """crawl should accept snapshot IDs as input.""" - from archivebox.misc.jsonl import read_args_or_stdin + def test_crawl_output_format(self): + """crawl should output Crawl JSONL records.""" + from archivebox.misc.jsonl import TYPE_CRAWL - uuid = '01234567-89ab-cdef-0123-456789abcdef' - args = (uuid,) - records = list(read_args_or_stdin(args)) + # Mock crawl output + crawl_output = { + 'type': TYPE_CRAWL, + 'schema_version': '0.9.0', + 'id': 'test-crawl-id', + 'urls': 'https://example.com', + 'status': 'queued', + 'max_depth': 0, + } - self.assertEqual(len(records), 1) - self.assertEqual(records[0]['id'], uuid) - - def test_crawl_accepts_jsonl(self): - """crawl should accept JSONL with snapshot info.""" - from archivebox.misc.jsonl import read_args_or_stdin - - stdin = StringIO('{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}\n') - stdin.isatty = lambda: False - - records = list(read_args_or_stdin((), stream=stdin)) - - self.assertEqual(len(records), 1) - self.assertEqual(records[0]['id'], 'abc123') - self.assertEqual(records[0]['url'], 'https://example.com') - - def test_crawl_separates_existing_vs_new(self): - """crawl should identify existing snapshots vs new URLs.""" - # This tests the logic in discover_outlinks() that separates - # records with 'id' (existing) from records with just 'url' (new) - - records = [ - {'type': 'Snapshot', 'id': 'existing-id-1'}, # Existing (id only) - {'type': 'Snapshot', 'url': 'https://new-url.com'}, # New (url only) - {'type': 'Snapshot', 'id': 'existing-id-2', 'url': 'https://existing.com'}, # Existing (has id) - ] - - existing = [] - new = [] - - for record in records: - if record.get('id') and not record.get('url'): - existing.append(record['id']) - elif record.get('id'): - existing.append(record['id']) # Has both id and url - treat as existing - elif record.get('url'): - new.append(record) - - self.assertEqual(len(existing), 2) - self.assertEqual(len(new), 1) - self.assertEqual(new[0]['url'], 'https://new-url.com') + self.assertEqual(crawl_output['type'], TYPE_CRAWL) + self.assertIn('id', crawl_output) + self.assertIn('urls', crawl_output) class TestSnapshotCommand(unittest.TestCase): @@ -336,6 +318,20 @@ class TestSnapshotCommand(unittest.TestCase): self.assertEqual(len(records), 1) self.assertEqual(records[0]['url'], 'https://example.com') + def test_snapshot_accepts_crawl_jsonl(self): + """snapshot should accept Crawl JSONL as input.""" + from archivebox.misc.jsonl 
import read_args_or_stdin, TYPE_CRAWL + + stdin = StringIO('{"type": "Crawl", "id": "abc123", "urls": "https://example.com"}\n') + stdin.isatty = lambda: False + + records = list(read_args_or_stdin((), stream=stdin)) + + self.assertEqual(len(records), 1) + self.assertEqual(records[0]['type'], TYPE_CRAWL) + self.assertEqual(records[0]['id'], 'abc123') + self.assertEqual(records[0]['urls'], 'https://example.com') + def test_snapshot_accepts_jsonl_with_metadata(self): """snapshot should accept JSONL with tags and other metadata.""" from archivebox.misc.jsonl import read_args_or_stdin @@ -350,26 +346,9 @@ class TestSnapshotCommand(unittest.TestCase): self.assertEqual(records[0]['tags'], 'tag1,tag2') self.assertEqual(records[0]['title'], 'Test') - def test_snapshot_output_format(self): - """snapshot output should include id and url.""" - from archivebox.misc.jsonl import snapshot_to_jsonl - - mock_snapshot = MagicMock() - mock_snapshot.id = 'test-id' - mock_snapshot.url = 'https://example.com' - mock_snapshot.title = 'Test' - mock_snapshot.tags_str.return_value = '' - mock_snapshot.bookmarked_at = None - mock_snapshot.created_at = None - mock_snapshot.timestamp = '123' - mock_snapshot.depth = 0 - mock_snapshot.status = 'queued' - - output = snapshot_to_jsonl(mock_snapshot) - - self.assertIn('id', output) - self.assertIn('url', output) - self.assertEqual(output['type'], 'Snapshot') + # Note: Snapshot output format is tested in integration tests + # (TestPipingWorkflowIntegration.test_snapshot_creates_and_outputs_jsonl) + # using real Snapshot instances. class TestExtractCommand(unittest.TestCase): @@ -537,6 +516,86 @@ class TestPipingWorkflowIntegration(unittest.TestCase): """Clean up test database.""" shutil.rmtree(cls.test_dir, ignore_errors=True) + def test_crawl_creates_and_outputs_jsonl(self): + """ + Test: archivebox crawl URL1 URL2 URL3 + Should create a single Crawl with all URLs and output JSONL when piped. + """ + from archivebox.crawls.models import Crawl + from archivebox.misc.jsonl import TYPE_CRAWL + from archivebox.base_models.models import get_or_create_system_user_pk + + created_by_id = get_or_create_system_user_pk() + + # Create crawl with multiple URLs (as newline-separated string) + urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com' + crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id}) + + self.assertIsNotNone(crawl) + self.assertIsNotNone(crawl.id) + self.assertEqual(crawl.urls, urls) + self.assertEqual(crawl.status, 'queued') + + # Verify URLs list + urls_list = crawl.get_urls_list() + self.assertEqual(len(urls_list), 2) + self.assertIn('https://test-crawl-1.example.com', urls_list) + self.assertIn('https://test-crawl-2.example.com', urls_list) + + # Verify output format + output = crawl.to_jsonl() + self.assertEqual(output['type'], TYPE_CRAWL) + self.assertIn('id', output) + self.assertEqual(output['urls'], urls) + self.assertIn('schema_version', output) + + def test_snapshot_accepts_crawl_jsonl(self): + """ + Test: archivebox crawl URL | archivebox snapshot + Snapshot should accept Crawl JSONL and create Snapshots for each URL. 
+ """ + from archivebox.crawls.models import Crawl + from archivebox.core.models import Snapshot + from archivebox.misc.jsonl import ( + read_args_or_stdin, + TYPE_CRAWL, TYPE_SNAPSHOT + ) + from archivebox.base_models.models import get_or_create_system_user_pk + + created_by_id = get_or_create_system_user_pk() + + # Step 1: Create crawl (simulating 'archivebox crawl') + urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com' + crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id}) + crawl_output = crawl.to_jsonl() + + # Step 2: Parse crawl output as snapshot input + stdin = StringIO(json.dumps(crawl_output) + '\n') + stdin.isatty = lambda: False + + records = list(read_args_or_stdin((), stream=stdin)) + + self.assertEqual(len(records), 1) + self.assertEqual(records[0]['type'], TYPE_CRAWL) + + # Step 3: Create snapshots from crawl URLs + created_snapshots = [] + for url in crawl.get_urls_list(): + snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id}) + if snapshot: + created_snapshots.append(snapshot) + + self.assertEqual(len(created_snapshots), 2) + + # Verify snapshot output + for snapshot in created_snapshots: + output = snapshot.to_jsonl() + self.assertEqual(output['type'], TYPE_SNAPSHOT) + self.assertIn(output['url'], [ + 'https://crawl-to-snap-1.example.com', + 'https://crawl-to-snap-2.example.com' + ]) + def test_snapshot_creates_and_outputs_jsonl(self): """ Test: archivebox snapshot URL @@ -544,7 +603,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): """ from archivebox.core.models import Snapshot from archivebox.misc.jsonl import ( - read_args_or_stdin, write_record, snapshot_to_jsonl, + read_args_or_stdin, write_record, TYPE_SNAPSHOT ) from archivebox.base_models.models import get_or_create_system_user_pk @@ -566,7 +625,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): self.assertEqual(snapshot.url, url) # Verify output format - output = snapshot_to_jsonl(snapshot) + output = snapshot.to_jsonl() self.assertEqual(output['type'], TYPE_SNAPSHOT) self.assertIn('id', output) self.assertEqual(output['url'], url) @@ -578,7 +637,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): """ from archivebox.core.models import Snapshot, ArchiveResult from archivebox.misc.jsonl import ( - snapshot_to_jsonl, read_args_or_stdin, + read_args_or_stdin, TYPE_SNAPSHOT ) from archivebox.base_models.models import get_or_create_system_user_pk @@ -589,7 +648,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase): url = 'https://test-extract-1.example.com' overrides = {'created_by_id': created_by_id} snapshot = Snapshot.from_jsonl({'url': url}, overrides=overrides) - snapshot_output = snapshot_to_jsonl(snapshot) + snapshot_output = snapshot.to_jsonl() # Step 2: Parse snapshot output as extract input stdin = StringIO(json.dumps(snapshot_output) + '\n') @@ -609,143 +668,59 @@ class TestPipingWorkflowIntegration(unittest.TestCase): self.assertIn(str(snapshot.id), snapshot_ids) - def test_crawl_outputs_discovered_urls(self): - """ - Test: archivebox crawl URL - Should create snapshot, run plugins, output discovered URLs. 
- """ - from archivebox.hooks import collect_urls_from_plugins - from archivebox.misc.jsonl import TYPE_SNAPSHOT - - # Create a mock snapshot directory with urls.jsonl - test_snapshot_dir = Path(self.test_dir) / 'archive' / 'test-crawl-snapshot' - test_snapshot_dir.mkdir(parents=True, exist_ok=True) - - # Create mock extractor output - (test_snapshot_dir / 'parse_html_urls').mkdir() - (test_snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text( - '{"url": "https://discovered-1.com"}\n' - '{"url": "https://discovered-2.com", "title": "Discovered 2"}\n' - ) - - # Collect URLs (as crawl does) - discovered = collect_urls_from_plugins(test_snapshot_dir) - - self.assertEqual(len(discovered), 2) - - # Add crawl metadata (as crawl does) - for entry in discovered: - entry['type'] = TYPE_SNAPSHOT - entry['depth'] = 1 - entry['via_snapshot'] = 'test-crawl-snapshot' - - # Verify output format - self.assertEqual(discovered[0]['type'], TYPE_SNAPSHOT) - self.assertEqual(discovered[0]['depth'], 1) - self.assertEqual(discovered[0]['url'], 'https://discovered-1.com') - - def test_full_pipeline_snapshot_extract(self): - """ - Test: archivebox snapshot URL | archivebox extract - - This is equivalent to: archivebox add URL - """ - from archivebox.core.models import Snapshot - from archivebox.misc.jsonl import ( - get_or_create_snapshot, snapshot_to_jsonl, read_args_or_stdin, - TYPE_SNAPSHOT - ) - from archivebox.base_models.models import get_or_create_system_user_pk - - created_by_id = get_or_create_system_user_pk() - - # === archivebox snapshot https://example.com === - url = 'https://test-pipeline-1.example.com' - snapshot = get_or_create_snapshot({'url': url}, created_by_id=created_by_id) - snapshot_jsonl = json.dumps(snapshot_to_jsonl(snapshot)) - - # === | archivebox extract === - stdin = StringIO(snapshot_jsonl + '\n') - stdin.isatty = lambda: False - - records = list(read_args_or_stdin((), stream=stdin)) - - # Extract should receive the snapshot ID - self.assertEqual(len(records), 1) - self.assertEqual(records[0]['id'], str(snapshot.id)) - - # Verify snapshot exists in DB - db_snapshot = Snapshot.objects.get(id=snapshot.id) - self.assertEqual(db_snapshot.url, url) - def test_full_pipeline_crawl_snapshot_extract(self): """ Test: archivebox crawl URL | archivebox snapshot | archivebox extract - This is equivalent to: archivebox add --depth=1 URL + This is equivalent to: archivebox add --depth=0 URL """ + from archivebox.crawls.models import Crawl from archivebox.core.models import Snapshot from archivebox.misc.jsonl import ( - get_or_create_snapshot, snapshot_to_jsonl, read_args_or_stdin, - TYPE_SNAPSHOT + read_args_or_stdin, + TYPE_CRAWL, TYPE_SNAPSHOT ) from archivebox.base_models.models import get_or_create_system_user_pk - from archivebox.hooks import collect_urls_from_plugins created_by_id = get_or_create_system_user_pk() # === archivebox crawl https://example.com === - # Step 1: Create snapshot for starting URL - start_url = 'https://test-crawl-pipeline.example.com' - start_snapshot = get_or_create_snapshot({'url': start_url}, created_by_id=created_by_id) - - # Step 2: Simulate extractor output with discovered URLs - snapshot_dir = Path(self.test_dir) / 'archive' / str(start_snapshot.timestamp) - snapshot_dir.mkdir(parents=True, exist_ok=True) - (snapshot_dir / 'parse_html_urls').mkdir(exist_ok=True) - (snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text( - '{"url": "https://outlink-1.example.com"}\n' - '{"url": "https://outlink-2.example.com"}\n' - ) - - # Step 3: Collect discovered 
URLs (crawl output) - discovered = collect_urls_from_plugins(snapshot_dir) - crawl_output = [] - for entry in discovered: - entry['type'] = TYPE_SNAPSHOT - entry['depth'] = 1 - crawl_output.append(json.dumps(entry)) + url = 'https://test-pipeline-full.example.com' + crawl = Crawl.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id}) + crawl_jsonl = json.dumps(crawl.to_jsonl()) # === | archivebox snapshot === - stdin = StringIO('\n'.join(crawl_output) + '\n') + stdin = StringIO(crawl_jsonl + '\n') stdin.isatty = lambda: False records = list(read_args_or_stdin((), stream=stdin)) - self.assertEqual(len(records), 2) + self.assertEqual(len(records), 1) + self.assertEqual(records[0]['type'], TYPE_CRAWL) - # Create snapshots for discovered URLs + # Create snapshots from crawl created_snapshots = [] for record in records: - snap = get_or_create_snapshot(record, created_by_id=created_by_id) - created_snapshots.append(snap) + if record.get('type') == TYPE_CRAWL: + crawl_id = record.get('id') + if crawl_id: + db_crawl = Crawl.objects.get(id=crawl_id) + for crawl_url in db_crawl.get_urls_list(): + snapshot = Snapshot.from_jsonl({'url': crawl_url}, overrides={'created_by_id': created_by_id}) + if snapshot: + created_snapshots.append(snapshot) - self.assertEqual(len(created_snapshots), 2) + self.assertEqual(len(created_snapshots), 1) + self.assertEqual(created_snapshots[0].url, url) # === | archivebox extract === - snapshot_jsonl_lines = [json.dumps(snapshot_to_jsonl(s)) for s in created_snapshots] + snapshot_jsonl_lines = [json.dumps(s.to_jsonl()) for s in created_snapshots] stdin = StringIO('\n'.join(snapshot_jsonl_lines) + '\n') stdin.isatty = lambda: False records = list(read_args_or_stdin((), stream=stdin)) - self.assertEqual(len(records), 2) - - # Verify all snapshots exist in DB - for record in records: - db_snapshot = Snapshot.objects.get(id=record['id']) - self.assertIn(db_snapshot.url, [ - 'https://outlink-1.example.com', - 'https://outlink-2.example.com' - ]) + self.assertEqual(len(records), 1) + self.assertEqual(records[0]['type'], TYPE_SNAPSHOT) + self.assertEqual(records[0]['id'], str(created_snapshots[0].id)) class TestDepthWorkflows(unittest.TestCase): @@ -770,47 +745,44 @@ class TestDepthWorkflows(unittest.TestCase): def test_depth_0_workflow(self): """ - Test: archivebox snapshot URL | archivebox extract + Test: archivebox crawl URL | archivebox snapshot | archivebox extract - Depth 0: Only archive the specified URL, no crawling. + Depth 0: Only archive the specified URL, no recursive crawling. 
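# [editor's note: illustrative sketch, not part of this patch]
# Crawl records in these tests carry their URL set as a single newline-separated "urls"
# string; a standalone version of the split that get_urls_list() and the multi-URL test
# below rely on (behavior assumed from how the tests parse the field):
def split_urls(urls: str) -> list[str]:
    return [u.strip() for u in urls.split("\n") if u.strip()]

assert split_urls("https://url1.com\nhttps://url2.com\n") == ["https://url1.com", "https://url2.com"]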
""" + from archivebox.crawls.models import Crawl from archivebox.core.models import Snapshot - from archivebox.misc.jsonl import get_or_create_snapshot from archivebox.base_models.models import get_or_create_system_user_pk created_by_id = get_or_create_system_user_pk() - # Create snapshot + # Create crawl with depth 0 url = 'https://depth0-test.example.com' - snapshot = get_or_create_snapshot({'url': url}, created_by_id=created_by_id) + crawl = Crawl.from_jsonl({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id}) - # Verify only one snapshot created - self.assertEqual(Snapshot.objects.filter(url=url).count(), 1) + self.assertEqual(crawl.max_depth, 0) + + # Create snapshot + snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id}) self.assertEqual(snapshot.url, url) - def test_depth_1_workflow(self): - """ - Test: archivebox crawl URL | archivebox snapshot | archivebox extract + def test_depth_metadata_in_crawl(self): + """Test that depth metadata is stored in Crawl.""" + from archivebox.crawls.models import Crawl + from archivebox.base_models.models import get_or_create_system_user_pk - Depth 1: Archive URL + all outlinks from that URL. - """ - # This is tested in test_full_pipeline_crawl_snapshot_extract - pass + created_by_id = get_or_create_system_user_pk() - def test_depth_metadata_propagation(self): - """Test that depth metadata propagates through the pipeline.""" - from archivebox.misc.jsonl import TYPE_SNAPSHOT + # Create crawl with depth + crawl = Crawl.from_jsonl( + {'url': 'https://depth-meta-test.example.com', 'max_depth': 2}, + overrides={'created_by_id': created_by_id} + ) - # Simulate crawl output with depth metadata - crawl_output = [ - {'type': TYPE_SNAPSHOT, 'url': 'https://hop1.com', 'depth': 1, 'via_snapshot': 'root'}, - {'type': TYPE_SNAPSHOT, 'url': 'https://hop2.com', 'depth': 2, 'via_snapshot': 'hop1'}, - ] + self.assertEqual(crawl.max_depth, 2) - # Verify depth is preserved - for entry in crawl_output: - self.assertIn('depth', entry) - self.assertIn('via_snapshot', entry) + # Verify in JSONL output + output = crawl.to_jsonl() + self.assertEqual(output['max_depth'], 2) class TestParserPluginWorkflows(unittest.TestCase): @@ -963,6 +935,26 @@ class TestEdgeCases(unittest.TestCase): # UUID self.assertEqual(records[2]['id'], '01234567-89ab-cdef-0123-456789abcdef') + def test_crawl_with_multiple_urls(self): + """Crawl should handle multiple URLs in a single crawl.""" + from archivebox.misc.jsonl import TYPE_CRAWL + + # Test crawl JSONL with multiple URLs + crawl_output = { + 'type': TYPE_CRAWL, + 'id': 'test-multi-url-crawl', + 'urls': 'https://url1.com\nhttps://url2.com\nhttps://url3.com', + 'max_depth': 0, + } + + # Parse the URLs + urls = [u.strip() for u in crawl_output['urls'].split('\n') if u.strip()] + + self.assertEqual(len(urls), 3) + self.assertEqual(urls[0], 'https://url1.com') + self.assertEqual(urls[1], 'https://url2.com') + self.assertEqual(urls[2], 'https://url3.com') + if __name__ == '__main__': unittest.main() diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py index a5c29ff4..59e64aeb 100644 --- a/archivebox/config/constants.py +++ b/archivebox/config/constants.py @@ -100,6 +100,7 @@ class ConstantsDict(Mapping): DATABASE_FILE: Path = DATA_DIR / SQL_INDEX_FILENAME JSON_INDEX_FILENAME: str = 'index.json' + JSONL_INDEX_FILENAME: str = 'index.jsonl' HTML_INDEX_FILENAME: str = 'index.html' ROBOTS_TXT_FILENAME: str = 'robots.txt' FAVICON_FILENAME: str = 'favicon.ico' @@ -187,6 
+188,7 @@ class ConstantsDict(Mapping): "queue.sqlite3-wal", "queue.sqlite3-shm", JSON_INDEX_FILENAME, + JSONL_INDEX_FILENAME, HTML_INDEX_FILENAME, ROBOTS_TXT_FILENAME, FAVICON_FILENAME, diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 0a94df61..adfd8c02 100755 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -91,6 +91,19 @@ class Tag(ModelWithSerializers): def api_url(self) -> str: return reverse_lazy('api-1:get_tag', args=[self.id]) + def to_jsonl(self) -> dict: + """ + Convert Tag model instance to a JSONL record. + """ + from archivebox.config import VERSION + return { + 'type': 'Tag', + 'schema_version': VERSION, + 'id': str(self.id), + 'name': self.name, + 'slug': self.slug, + } + @staticmethod def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None): """ @@ -103,19 +116,18 @@ class Tag(ModelWithSerializers): Returns: Tag instance or None """ - from archivebox.misc.jsonl import get_or_create_tag - - try: - tag = get_or_create_tag(record) - - # Auto-attach to snapshot if in overrides - if overrides and 'snapshot' in overrides and tag: - overrides['snapshot'].tags.add(tag) - - return tag - except ValueError: + name = record.get('name') + if not name: return None + tag, _ = Tag.objects.get_or_create(name=name) + + # Auto-attach to snapshot if in overrides + if overrides and 'snapshot' in overrides and tag: + overrides['snapshot'].tags.add(tag) + + return tag + class SnapshotTag(models.Model): id = models.AutoField(primary_key=True) @@ -415,10 +427,11 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea Transaction handling: 1. Copy files INSIDE transaction - 2. Create symlink INSIDE transaction - 3. Update fs_version INSIDE transaction (done by save()) - 4. Exit transaction (DB commit) - 5. Delete old files OUTSIDE transaction (after commit) + 2. Convert index.json to index.jsonl INSIDE transaction + 3. Create symlink INSIDE transaction + 4. Update fs_version INSIDE transaction (done by save()) + 5. Exit transaction (DB commit) + 6. Delete old files OUTSIDE transaction (after commit) """ import shutil from django.db import transaction @@ -427,11 +440,13 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea new_dir = self.get_storage_path_for_version('0.9.0') if not old_dir.exists() or old_dir == new_dir or new_dir.exists(): + # Even if no directory migration needed, still convert index format + self.convert_index_json_to_jsonl() return new_dir.mkdir(parents=True, exist_ok=True) - # Copy all files (idempotent) + # Copy all files (idempotent), skipping index.json (will be converted to jsonl) for old_file in old_dir.rglob('*'): if not old_file.is_file(): continue @@ -456,6 +471,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea missing = old_files.keys() - new_files.keys() raise Exception(f"Migration incomplete: missing {missing}") + # Convert index.json to index.jsonl in the new directory + self.convert_index_json_to_jsonl() + # Create backwards-compat symlink (INSIDE transaction) symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp if symlink_path.is_symlink(): @@ -557,9 +575,9 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea @classmethod def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']: """ - Load existing Snapshot from DB by reading index.json. + Load existing Snapshot from DB by reading index.jsonl or index.json. - Reads index.json, extracts url+timestamp, queries DB. 
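# [editor's note: illustrative sketch, not part of this patch]
# What the new per-snapshot index.jsonl looks like on disk: one JSON object per line, each
# tagged with a "type" field. The field values below are made-up placeholders; the reader
# code that follows only relies on "type", "url", and "timestamp".
import json

EXAMPLE_INDEX_JSONL = "\n".join([
    json.dumps({"type": "Snapshot", "url": "https://example.com", "timestamp": "1234567890"}),
    json.dumps({"type": "ArchiveResult", "plugin": "screenshot", "status": "succeeded"}),
])

snapshot_record = next(
    rec for rec in map(json.loads, EXAMPLE_INDEX_JSONL.splitlines())
    if rec.get("type") == "Snapshot"
)
assert snapshot_record["url"] == "https://example.com"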
+ Reads index file, extracts url+timestamp, queries DB. Returns existing Snapshot or None if not found/invalid. Does NOT create new snapshots. @@ -567,21 +585,38 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea """ import json - index_path = snapshot_dir / 'index.json' - if not index_path.exists(): - return None + # Try index.jsonl first (new format), then index.json (legacy) + jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME + json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME - try: - with open(index_path) as f: - data = json.load(f) - except: + data = None + if jsonl_path.exists(): + try: + with open(jsonl_path) as f: + for line in f: + line = line.strip() + if line.startswith('{'): + record = json.loads(line) + if record.get('type') == 'Snapshot': + data = record + break + except (json.JSONDecodeError, OSError): + pass + elif json_path.exists(): + try: + with open(json_path) as f: + data = json.load(f) + except (json.JSONDecodeError, OSError): + pass + + if not data: return None url = data.get('url') if not url: return None - # Get timestamp - prefer index.json, fallback to folder name + # Get timestamp - prefer index file, fallback to folder name timestamp = cls._select_best_timestamp( index_timestamp=data.get('timestamp'), folder_name=snapshot_dir.name @@ -611,14 +646,31 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea """ import json - index_path = snapshot_dir / 'index.json' - if not index_path.exists(): - return None + # Try index.jsonl first (new format), then index.json (legacy) + jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME + json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME - try: - with open(index_path) as f: - data = json.load(f) - except: + data = None + if jsonl_path.exists(): + try: + with open(jsonl_path) as f: + for line in f: + line = line.strip() + if line.startswith('{'): + record = json.loads(line) + if record.get('type') == 'Snapshot': + data = record + break + except (json.JSONDecodeError, OSError): + pass + elif json_path.exists(): + try: + with open(json_path) as f: + data = json.load(f) + except (json.JSONDecodeError, OSError): + pass + + if not data: return None url = data.get('url') @@ -721,26 +773,40 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea # Index.json Reconciliation # ========================================================================= - def reconcile_with_index_json(self): + def reconcile_with_index(self): """ - Merge index.json with DB. DB is source of truth. + Merge index.json/index.jsonl with DB. DB is source of truth. - Title: longest non-URL - Tags: union - ArchiveResults: keep both (by plugin+start_ts) - Writes back in 0.9.x format. + Converts index.json to index.jsonl if needed, then writes back in JSONL format. 
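# [editor's note: illustrative sketch, not part of this patch]
# The merge rules above say title = longest non-URL candidate and tags = union; the body of
# _merge_title_from_index() is not shown in this hunk, so the helper below is only an
# approximation of the "longest non-URL title" rule:
def pick_title(db_title: str, index_title: str) -> str:
    candidates = [t for t in (db_title, index_title) if t and not t.startswith("http")]
    return max(candidates, key=len, default=db_title or index_title or "")

assert pick_title("Example", "Example Domain - full title") == "Example Domain - full title"
assert pick_title("", "https://example.com") == "https://example.com"  # URL kept only as a last resort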
- Used by: archivebox update (to sync index.json with DB) + Used by: archivebox update (to sync index with DB) """ import json - index_path = Path(self.output_dir) / 'index.json' + # Try to convert index.json to index.jsonl first + self.convert_index_json_to_jsonl() + + # Check for index.jsonl (preferred) or index.json (legacy) + jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME + json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME index_data = {} - if index_path.exists(): + + if jsonl_path.exists(): + # Read from JSONL format + jsonl_data = self.read_index_jsonl() + if jsonl_data['snapshot']: + index_data = jsonl_data['snapshot'] + # Convert archive_results list to expected format + index_data['archive_results'] = jsonl_data['archive_results'] + elif json_path.exists(): + # Fallback to legacy JSON format try: - with open(index_path) as f: + with open(json_path) as f: index_data = json.load(f) except: pass @@ -754,8 +820,12 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea # Merge ArchiveResults self._merge_archive_results_from_index(index_data) - # Write back - self.write_index_json() + # Write back in JSONL format + self.write_index_jsonl() + + def reconcile_with_index_json(self): + """Deprecated: use reconcile_with_index() instead.""" + return self.reconcile_with_index() def _merge_title_from_index(self, index_data: dict): """Merge title - prefer longest non-URL title.""" @@ -831,12 +901,15 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea except: pass + # Support both 'output' (legacy) and 'output_str' (new JSONL) field names + output_str = result_data.get('output_str') or result_data.get('output', '') + ArchiveResult.objects.create( snapshot=self, plugin=plugin, hook_name=result_data.get('hook_name', ''), status=result_data.get('status', 'failed'), - output_str=result_data.get('output', ''), + output_str=output_str, cmd=result_data.get('cmd', []), pwd=result_data.get('pwd', str(self.output_dir)), start_ts=start_ts, @@ -846,7 +919,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea pass def write_index_json(self): - """Write index.json in 0.9.x format.""" + """Write index.json in 0.9.x format (deprecated, use write_index_jsonl).""" import json index_path = Path(self.output_dir) / 'index.json' @@ -877,6 +950,174 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea with open(index_path, 'w') as f: json.dump(data, f, indent=2, sort_keys=True) + def write_index_jsonl(self): + """ + Write index.jsonl in flat JSONL format. + + Each line is a JSON record with a 'type' field: + - Snapshot: snapshot metadata (crawl_id, url, tags, etc.) + - ArchiveResult: extractor results (plugin, status, output, etc.) + - Binary: binary info used for the extraction + - Process: process execution details (cmd, exit_code, timing, etc.) 
+ """ + import json + + index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME + index_path.parent.mkdir(parents=True, exist_ok=True) + + # Collect unique binaries and processes from archive results + binaries_seen = set() + processes_seen = set() + + with open(index_path, 'w') as f: + # Write Snapshot record first + snapshot_record = self.to_jsonl() + snapshot_record['crawl_id'] = str(self.crawl_id) if self.crawl_id else None + snapshot_record['fs_version'] = self.fs_version + f.write(json.dumps(snapshot_record) + '\n') + + # Write ArchiveResult records with their associated Binary and Process + for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts'): + # Write Binary record if not already written + if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen: + binaries_seen.add(ar.process.binary_id) + f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n') + + # Write Process record if not already written + if ar.process and ar.process_id not in processes_seen: + processes_seen.add(ar.process_id) + f.write(json.dumps(ar.process.to_jsonl()) + '\n') + + # Write ArchiveResult record + f.write(json.dumps(ar.to_jsonl()) + '\n') + + def read_index_jsonl(self) -> dict: + """ + Read index.jsonl and return parsed records grouped by type. + + Returns dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes' + """ + import json + from archivebox.misc.jsonl import ( + TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS, + ) + + index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME + result = { + 'snapshot': None, + 'archive_results': [], + 'binaries': [], + 'processes': [], + } + + if not index_path.exists(): + return result + + with open(index_path, 'r') as f: + for line in f: + line = line.strip() + if not line or not line.startswith('{'): + continue + try: + record = json.loads(line) + record_type = record.get('type') + if record_type == TYPE_SNAPSHOT: + result['snapshot'] = record + elif record_type == TYPE_ARCHIVERESULT: + result['archive_results'].append(record) + elif record_type == TYPE_BINARY: + result['binaries'].append(record) + elif record_type == TYPE_PROCESS: + result['processes'].append(record) + except json.JSONDecodeError: + continue + + return result + + def convert_index_json_to_jsonl(self) -> bool: + """ + Convert index.json to index.jsonl format. + + Reads existing index.json, creates index.jsonl, and removes index.json. + Returns True if conversion was performed, False if no conversion needed. 
+ """ + import json + + json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME + jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME + + # Skip if already converted or no json file exists + if jsonl_path.exists() or not json_path.exists(): + return False + + try: + with open(json_path, 'r') as f: + data = json.load(f) + except (json.JSONDecodeError, OSError): + return False + + # Detect format version and extract records + fs_version = data.get('fs_version', '0.7.0') + + jsonl_path.parent.mkdir(parents=True, exist_ok=True) + with open(jsonl_path, 'w') as f: + # Write Snapshot record + snapshot_record = { + 'type': 'Snapshot', + 'id': str(self.id), + 'crawl_id': str(self.crawl_id) if self.crawl_id else None, + 'url': data.get('url', self.url), + 'timestamp': data.get('timestamp', self.timestamp), + 'title': data.get('title', self.title or ''), + 'tags': data.get('tags', ''), + 'fs_version': fs_version, + 'bookmarked_at': data.get('bookmarked_at'), + 'created_at': data.get('created_at'), + } + f.write(json.dumps(snapshot_record) + '\n') + + # Handle 0.8.x/0.9.x format (archive_results list) + for result_data in data.get('archive_results', []): + ar_record = { + 'type': 'ArchiveResult', + 'snapshot_id': str(self.id), + 'plugin': result_data.get('plugin', ''), + 'status': result_data.get('status', ''), + 'output_str': result_data.get('output', ''), + 'start_ts': result_data.get('start_ts'), + 'end_ts': result_data.get('end_ts'), + } + if result_data.get('cmd'): + ar_record['cmd'] = result_data['cmd'] + f.write(json.dumps(ar_record) + '\n') + + # Handle 0.7.x format (history dict) + if 'history' in data and isinstance(data['history'], dict): + for plugin, result_list in data['history'].items(): + if not isinstance(result_list, list): + continue + for result_data in result_list: + ar_record = { + 'type': 'ArchiveResult', + 'snapshot_id': str(self.id), + 'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin, + 'status': result_data.get('status', ''), + 'output_str': result_data.get('output', ''), + 'start_ts': result_data.get('start_ts'), + 'end_ts': result_data.get('end_ts'), + } + if result_data.get('cmd'): + ar_record['cmd'] = result_data['cmd'] + f.write(json.dumps(ar_record) + '\n') + + # Remove old index.json after successful conversion + try: + json_path.unlink() + except OSError: + pass + + return True + # ========================================================================= # Snapshot Utilities # ========================================================================= @@ -1169,6 +1410,25 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea return False + def to_jsonl(self) -> dict: + """ + Convert Snapshot model instance to a JSONL record. 
+ """ + from archivebox.config import VERSION + return { + 'type': 'Snapshot', + 'schema_version': VERSION, + 'id': str(self.id), + 'url': self.url, + 'title': self.title, + 'tags': self.tags_str() if hasattr(self, 'tags_str') else '', + 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'timestamp': self.timestamp, + 'depth': getattr(self, 'depth', 0), + 'status': self.status if hasattr(self, 'status') else None, + } + @staticmethod def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None, queue_for_extraction: bool = True): """ @@ -2001,6 +2261,40 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi """Convenience property to access the user who created this archive result via its snapshot's crawl.""" return self.snapshot.crawl.created_by + def to_jsonl(self) -> dict: + """ + Convert ArchiveResult model instance to a JSONL record. + """ + from archivebox.config import VERSION + record = { + 'type': 'ArchiveResult', + 'schema_version': VERSION, + 'id': str(self.id), + 'snapshot_id': str(self.snapshot_id), + 'plugin': self.plugin, + 'hook_name': self.hook_name, + 'status': self.status, + 'output_str': self.output_str, + 'start_ts': self.start_ts.isoformat() if self.start_ts else None, + 'end_ts': self.end_ts.isoformat() if self.end_ts else None, + } + # Include optional fields if set + if self.output_json: + record['output_json'] = self.output_json + if self.output_files: + record['output_files'] = self.output_files + if self.output_size: + record['output_size'] = self.output_size + if self.output_mimetypes: + record['output_mimetypes'] = self.output_mimetypes + if self.cmd: + record['cmd'] = self.cmd + if self.cmd_version: + record['cmd_version'] = self.cmd_version + if self.process_id: + record['process_id'] = str(self.process_id) + return record + def save(self, *args, **kwargs): is_new = self._state.adding diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 1f0c880f..3e1a53f9 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -134,6 +134,67 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith def api_url(self) -> str: return reverse_lazy('api-1:get_crawl', args=[self.id]) + def to_jsonl(self) -> dict: + """ + Convert Crawl model instance to a JSONL record. + """ + from archivebox.config import VERSION + return { + 'type': 'Crawl', + 'schema_version': VERSION, + 'id': str(self.id), + 'urls': self.urls, + 'status': self.status, + 'max_depth': self.max_depth, + 'tags_str': self.tags_str, + 'label': self.label, + 'created_at': self.created_at.isoformat() if self.created_at else None, + } + + @staticmethod + def from_jsonl(record: dict, overrides: dict = None): + """ + Create or get a Crawl from a JSONL record. 
+ + Args: + record: Dict with 'urls' (required), optional 'max_depth', 'tags_str', 'label' + overrides: Dict of field overrides (e.g., created_by_id) + + Returns: + Crawl instance or None if invalid + """ + from django.utils import timezone + + overrides = overrides or {} + + # Check if crawl already exists by ID + crawl_id = record.get('id') + if crawl_id: + try: + return Crawl.objects.get(id=crawl_id) + except Crawl.DoesNotExist: + pass + + # Get URLs - can be string (newline-separated) or from 'url' field + urls = record.get('urls', '') + if not urls and record.get('url'): + urls = record['url'] + + if not urls: + return None + + # Create new crawl (status stays QUEUED, not started) + crawl = Crawl.objects.create( + urls=urls, + max_depth=record.get('max_depth', record.get('depth', 0)), + tags_str=record.get('tags_str', record.get('tags', '')), + label=record.get('label', ''), + status=Crawl.StatusChoices.QUEUED, + retry_at=timezone.now(), + **overrides, + ) + return crawl + @property def output_dir_parent(self) -> str: """Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}""" diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 708ae68e..4c351efc 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -242,6 +242,24 @@ class Binary(ModelWithHealthStats): 'is_valid': self.is_valid, } + def to_jsonl(self) -> dict: + """ + Convert Binary model instance to a JSONL record. + """ + from archivebox.config import VERSION + return { + 'type': 'Binary', + 'schema_version': VERSION, + 'id': str(self.id), + 'machine_id': str(self.machine_id), + 'name': self.name, + 'binprovider': self.binprovider, + 'abspath': self.abspath, + 'version': self.version, + 'sha256': self.sha256, + 'status': self.status, + } + @staticmethod def from_jsonl(record: dict, overrides: dict = None): """ @@ -606,6 +624,32 @@ class Process(ModelWithHealthStats): return self.archiveresult.hook_name return '' + def to_jsonl(self) -> dict: + """ + Convert Process model instance to a JSONL record. + """ + from archivebox.config import VERSION + record = { + 'type': 'Process', + 'schema_version': VERSION, + 'id': str(self.id), + 'machine_id': str(self.machine_id), + 'cmd': self.cmd, + 'pwd': self.pwd, + 'status': self.status, + 'exit_code': self.exit_code, + 'started_at': self.started_at.isoformat() if self.started_at else None, + 'ended_at': self.ended_at.isoformat() if self.ended_at else None, + } + # Include optional fields if set + if self.binary_id: + record['binary_id'] = str(self.binary_id) + if self.pid: + record['pid'] = self.pid + if self.timeout: + record['timeout'] = self.timeout + return record + def update_and_requeue(self, **kwargs): """ Update process fields and requeue for worker state machine. diff --git a/archivebox/misc/jsonl.py b/archivebox/misc/jsonl.py index 88081ea6..1e555a0a 100644 --- a/archivebox/misc/jsonl.py +++ b/archivebox/misc/jsonl.py @@ -4,9 +4,15 @@ JSONL (JSON Lines) utilities for ArchiveBox. Provides functions for reading, writing, and processing typed JSONL records. All CLI commands that accept stdin can read both plain URLs and typed JSONL. 
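+
+Example (sketch; parse_line is defined below, record shapes abbreviated):
+
+    from archivebox.misc.jsonl import parse_line
+
+    record = parse_line('{"type": "Snapshot", "url": "https://example.com"}')
+    # -> a dict like {'type': 'Snapshot', 'url': 'https://example.com'}; unparseable input yields None
+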
+CLI Pipeline: + archivebox crawl URL -> {"type": "Crawl", "id": "...", "urls": "...", ...} + archivebox snapshot -> {"type": "Snapshot", "id": "...", "url": "...", ...} + archivebox extract -> {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", ...} + Typed JSONL Format: - {"type": "Snapshot", "url": "https://example.com", "title": "...", "tags": "..."} - {"type": "ArchiveResult", "snapshot_id": "...", "extractor": "wget", ...} + {"type": "Crawl", "id": "...", "urls": "...", "max_depth": 0, ...} + {"type": "Snapshot", "id": "...", "url": "https://example.com", "title": "...", ...} + {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", ...} {"type": "Tag", "name": "..."} Plain URLs (also supported): @@ -18,7 +24,7 @@ __package__ = 'archivebox.misc' import sys import json -from typing import Iterator, Dict, Any, Optional, TextIO, Callable, Union, List +from typing import Iterator, Dict, Any, Optional, TextIO, Callable from pathlib import Path @@ -28,8 +34,10 @@ TYPE_ARCHIVERESULT = 'ArchiveResult' TYPE_TAG = 'Tag' TYPE_CRAWL = 'Crawl' TYPE_BINARY = 'Binary' +TYPE_PROCESS = 'Process' +TYPE_MACHINE = 'Machine' -VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY} +VALID_TYPES = {TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_TAG, TYPE_CRAWL, TYPE_BINARY, TYPE_PROCESS, TYPE_MACHINE} def parse_line(line: str) -> Optional[Dict[str, Any]]: @@ -152,81 +160,6 @@ def filter_by_type(records: Iterator[Dict[str, Any]], record_type: str) -> Itera yield record -def snapshot_to_jsonl(snapshot) -> Dict[str, Any]: - """ - Convert a Snapshot model instance to a JSONL record. - """ - return { - 'type': TYPE_SNAPSHOT, - 'id': str(snapshot.id), - 'url': snapshot.url, - 'title': snapshot.title, - 'tags': snapshot.tags_str() if hasattr(snapshot, 'tags_str') else '', - 'bookmarked_at': snapshot.bookmarked_at.isoformat() if snapshot.bookmarked_at else None, - 'created_at': snapshot.created_at.isoformat() if snapshot.created_at else None, - 'timestamp': snapshot.timestamp, - 'depth': getattr(snapshot, 'depth', 0), - 'status': snapshot.status if hasattr(snapshot, 'status') else None, - } - - -def archiveresult_to_jsonl(result) -> Dict[str, Any]: - """ - Convert an ArchiveResult model instance to a JSONL record. - """ - record = { - 'type': TYPE_ARCHIVERESULT, - 'id': str(result.id), - 'snapshot_id': str(result.snapshot_id), - 'plugin': result.plugin, - 'hook_name': result.hook_name, - 'status': result.status, - 'output_str': result.output_str, - 'start_ts': result.start_ts.isoformat() if result.start_ts else None, - 'end_ts': result.end_ts.isoformat() if result.end_ts else None, - } - # Include optional fields if set - if result.output_json: - record['output_json'] = result.output_json - if result.output_files: - record['output_files'] = result.output_files - if result.output_size: - record['output_size'] = result.output_size - if result.output_mimetypes: - record['output_mimetypes'] = result.output_mimetypes - if result.cmd: - record['cmd'] = result.cmd - if result.cmd_version: - record['cmd_version'] = result.cmd_version - return record - - -def tag_to_jsonl(tag) -> Dict[str, Any]: - """ - Convert a Tag model instance to a JSONL record. - """ - return { - 'type': TYPE_TAG, - 'id': str(tag.id), - 'name': tag.name, - 'slug': tag.slug, - } - - -def crawl_to_jsonl(crawl) -> Dict[str, Any]: - """ - Convert a Crawl model instance to a JSONL record. 
- """ - return { - 'type': TYPE_CRAWL, - 'id': str(crawl.id), - 'urls': crawl.urls, - 'status': crawl.status, - 'max_depth': crawl.max_depth, - 'created_at': crawl.created_at.isoformat() if crawl.created_at else None, - } - - def process_records( records: Iterator[Dict[str, Any]], handlers: Dict[str, Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]] @@ -250,60 +183,3 @@ def process_records( yield result -def get_or_create_tag(record: Dict[str, Any]): - """ - Get or create a Tag from a JSONL record. - - Returns the Tag instance. - """ - from archivebox.core.models import Tag - - name = record.get('name') - if not name: - raise ValueError("Record missing required 'name' field") - - tag, _ = Tag.objects.get_or_create(name=name) - return tag - - -def process_jsonl_records(records: Iterator[Dict[str, Any]], created_by_id: Optional[int] = None) -> Dict[str, List]: - """ - Process JSONL records, creating Tags and Snapshots as needed. - - Args: - records: Iterator of JSONL record dicts - created_by_id: User ID for created objects - - Returns: - Dict with 'tags' and 'snapshots' lists of created objects - """ - from archivebox.base_models.models import get_or_create_system_user_pk - - created_by_id = created_by_id or get_or_create_system_user_pk() - - results = { - 'tags': [], - 'snapshots': [], - } - - for record in records: - record_type = record.get('type', TYPE_SNAPSHOT) - - if record_type == TYPE_TAG: - try: - tag = get_or_create_tag(record) - results['tags'].append(tag) - except ValueError: - continue - - elif record_type == TYPE_SNAPSHOT or 'url' in record: - try: - from archivebox.core.models import Snapshot - overrides = {'created_by_id': created_by_id} if created_by_id else {} - snapshot = Snapshot.from_jsonl(record, overrides=overrides) - if snapshot: - results['snapshots'].append(snapshot) - except ValueError: - continue - - return results diff --git a/archivebox/misc/legacy.py b/archivebox/misc/legacy.py index 7328670f..e936151d 100644 --- a/archivebox/misc/legacy.py +++ b/archivebox/misc/legacy.py @@ -58,9 +58,10 @@ def parse_json_main_index(out_dir: Path) -> Iterator[SnapshotDict]: def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]: """ - Parse links from individual snapshot index.json files in archive directories. + Parse links from individual snapshot index.jsonl/index.json files in archive directories. - Walks through archive/*/index.json files to discover orphaned snapshots. + Walks through archive/*/index.jsonl and archive/*/index.json files to discover orphaned snapshots. + Prefers index.jsonl (new format) over index.json (legacy format). 
""" from archivebox.config import CONSTANTS @@ -72,19 +73,36 @@ def parse_json_links_details(out_dir: Path) -> Iterator[SnapshotDict]: if not entry.is_dir(): continue - index_file = Path(entry.path) / 'index.json' - if not index_file.exists(): - continue + # Try index.jsonl first (new format) + jsonl_file = Path(entry.path) / CONSTANTS.JSONL_INDEX_FILENAME + json_file = Path(entry.path) / CONSTANTS.JSON_INDEX_FILENAME - try: - with open(index_file, 'r', encoding='utf-8') as f: - link = json.load(f) + link = None + if jsonl_file.exists(): + try: + with open(jsonl_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line.startswith('{'): + record = json.loads(line) + if record.get('type') == 'Snapshot': + link = record + break + except (json.JSONDecodeError, KeyError, TypeError): + pass + + if link is None and json_file.exists(): + try: + with open(json_file, 'r', encoding='utf-8') as f: + link = json.load(f) + except (json.JSONDecodeError, KeyError, TypeError): + pass + + if link: yield { 'url': link.get('url', ''), 'timestamp': link.get('timestamp', entry.name), 'title': link.get('title'), 'tags': link.get('tags', ''), } - except (json.JSONDecodeError, KeyError, TypeError): - continue