From f3e11b61fdfab0d464c9e212f48e5cab1fdae24b Mon Sep 17 00:00:00 2001
From: Claude
Date: Wed, 31 Dec 2025 10:07:14 +0000
Subject: [PATCH] Implement JSONL CLI pipeline architecture (Phases 1-4, 6)

Phase 1: Model Prerequisites
- Add ArchiveResult.from_json() and from_jsonl() methods
- Fix Snapshot.to_json() to use tags_str (consistent with Crawl)

Phase 2: Shared Utilities
- Create archivebox/cli/cli_utils.py with shared apply_filters()
- Update 7 CLI files to import from cli_utils.py instead of duplicating it

Phase 3: Pass-Through Behavior
- Add pass-through to crawl create (non-Crawl records pass unchanged)
- Add pass-through to snapshot create (Crawl records + others pass through)
- Add pass-through to archiveresult create (Snapshot records + others)
- Add create-or-update behavior to run command:
  - Records WITHOUT id: Create via Model.from_json()
  - Records WITH id: Lookup existing, re-queue
  - Outputs JSONL of all processed records for chaining

Phase 4: Test Infrastructure
- Create archivebox/tests/conftest.py with pytest-django fixtures
- Include CLI helpers, output assertions, database assertions

Phase 6: Config Update
- Update supervisord_util.py: orchestrator -> run command

This enables Unix-style piping:

  archivebox crawl create URL | archivebox run
  archivebox archiveresult list --status=failed | archivebox run
  curl API | jq transform | archivebox crawl create | archivebox run
---
 TODO_archivebox_jsonl_cli.md               |  24 +--
 archivebox/cli/archivebox_archiveresult.py |  55 ++++--
 archivebox/cli/archivebox_binary.py        |  16 +-
 archivebox/cli/archivebox_crawl.py         |  53 +++--
 archivebox/cli/archivebox_machine.py       |  16 +-
 archivebox/cli/archivebox_process.py       |  16 +-
 archivebox/cli/archivebox_run.py           |  84 ++++++--
 archivebox/cli/archivebox_snapshot.py      |  36 ++--
 archivebox/cli/archivebox_tag.py           |  16 +-
 archivebox/cli/cli_utils.py                |  46 +++++
 archivebox/core/models.py                  |  92 ++++++++-
 archivebox/tests/conftest.py               | 218 +++++++++++++++++++++
 archivebox/workers/supervisord_util.py     |   2 +-
 13 files changed, 529 insertions(+), 145 deletions(-)
 create mode 100644 archivebox/cli/cli_utils.py
 create mode 100644 archivebox/tests/conftest.py

diff --git a/TODO_archivebox_jsonl_cli.md b/TODO_archivebox_jsonl_cli.md
index fb7bf9fd..065d132e 100644
--- a/TODO_archivebox_jsonl_cli.md
+++ b/TODO_archivebox_jsonl_cli.md
@@ -687,23 +687,23 @@ def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
 ## Task Checklist
 
 ### Phase 1: Model Prerequisites
-- [ ] Implement `ArchiveResult.from_json()` in `archivebox/core/models.py`
-- [ ] Implement `ArchiveResult.from_jsonl()` in `archivebox/core/models.py`
-- [ ] Fix `Snapshot.to_json()` to use `tags_str` instead of `tags`
+- [x] Implement `ArchiveResult.from_json()` in `archivebox/core/models.py`
+- [x] Implement `ArchiveResult.from_jsonl()` in `archivebox/core/models.py`
+- [x] Fix `Snapshot.to_json()` to use `tags_str` instead of `tags`
 
 ### Phase 2: Shared Utilities
-- [ ] Create `archivebox/cli/cli_utils.py` with shared `apply_filters()`
-- [ ] Update 7 CLI files to import from `cli_utils.py`
+- [x] Create `archivebox/cli/cli_utils.py` with shared `apply_filters()`
+- [x] Update 7 CLI files to import from `cli_utils.py`
 
 ### Phase 3: Pass-Through Behavior
-- [ ] Add pass-through to `archivebox_crawl.py` create
-- [ ] Add pass-through to `archivebox_snapshot.py` create
-- [ ] Add pass-through to `archivebox_archiveresult.py` create
-- [ ] Add create-or-update to `archivebox_run.py`
-- [ ] Add pass-through output to `archivebox_run.py`
+- [x] Add pass-through to `archivebox_crawl.py` create
+- [x] Add pass-through to `archivebox_snapshot.py` create
+- [x] Add pass-through to `archivebox_archiveresult.py` create
+- [x] Add create-or-update to `archivebox_run.py`
+- [x] Add pass-through output to `archivebox_run.py`
 
 ### Phase 4: Test Infrastructure
-- [ ] Create `archivebox/tests/conftest.py` with pytest-django fixtures
+- [x] Create `archivebox/tests/conftest.py` with pytest-django fixtures
 
 ### Phase 5: Unit Tests
 - [ ] Create `archivebox/tests/test_cli_crawl.py`
@@ -713,4 +713,4 @@ def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
 
 ### Phase 6: Integration & Config
 - [ ] Extend `archivebox/cli/tests_piping.py` with pass-through tests
-- [ ] Update `archivebox/workers/supervisord_util.py`: orchestrator→run
+- [x] Update `archivebox/workers/supervisord_util.py`: orchestrator→run
diff --git a/archivebox/cli/archivebox_archiveresult.py b/archivebox/cli/archivebox_archiveresult.py
index 1f725a03..aea83413 100644
--- a/archivebox/cli/archivebox_archiveresult.py
+++ b/archivebox/cli/archivebox_archiveresult.py
@@ -39,21 +39,7 @@ from typing import Optional
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
@@ -69,6 +55,7 @@ def create_archiveresults(
     Create ArchiveResults for Snapshots.
 
     Reads Snapshot records from stdin and creates ArchiveResult entries.
+    Pass-through: input records of any type (Snapshots included) are output unchanged.
     If --plugin is specified, only creates results for that plugin.
     Otherwise, creates results for all pending plugins.
 
@@ -78,7 +65,7 @@ def create_archiveresults(
     """
     from django.utils import timezone
 
-    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_SNAPSHOT
+    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
     from archivebox.core.models import Snapshot, ArchiveResult
 
     is_tty = sys.stdout.isatty()
@@ -87,6 +74,7 @@
     if snapshot_id:
         try:
             snapshots = [Snapshot.objects.get(id=snapshot_id)]
+            pass_through_records = []
         except Snapshot.DoesNotExist:
             rprint(f'[red]Snapshot not found: {snapshot_id}[/red]', file=sys.stderr)
             return 1
@@ -97,17 +85,44 @@
             rprint('[yellow]No Snapshot records provided via stdin[/yellow]', file=sys.stderr)
             return 1
 
-        # Filter to only Snapshot records
+        # Separate snapshot records from pass-through records
         snapshot_ids = []
+        pass_through_records = []
+
         for record in records:
-            if record.get('type') == TYPE_SNAPSHOT:
+            record_type = record.get('type', '')
+
+            if record_type == TYPE_SNAPSHOT:
+                # Pass through the Snapshot record itself
+                pass_through_records.append(record)
                 if record.get('id'):
                     snapshot_ids.append(record['id'])
+
+            elif record_type == TYPE_ARCHIVERESULT:
+                # ArchiveResult records: pass through unchanged
+                if record.get('id'):
+                    pass_through_records.append(record)
+                else:
+                    # No id: we could create it here, but for now just pass through
+                    pass_through_records.append(record)
+
+            elif record_type:
+                # Other typed records (Crawl, Tag, etc): pass through
+                pass_through_records.append(record)
+
             elif record.get('id'):
-                # Assume it's a snapshot ID if no type specified
+                # Untyped record with id - assume it's a snapshot ID
                 snapshot_ids.append(record['id'])
 
+    # Output pass-through records first
+    if not is_tty:
+        for record in pass_through_records:
+            write_record(record)
+
     if not snapshot_ids:
+        if pass_through_records:
+            rprint(f'[dim]Passed through {len(pass_through_records)} records, no new snapshots to process[/dim]', file=sys.stderr)
+            return 0
         rprint('[yellow]No valid Snapshot IDs in input[/yellow]', file=sys.stderr)
         return 1
@@ -115,7 +130,7 @@
 
     if not snapshots:
         rprint('[yellow]No matching snapshots found[/yellow]', file=sys.stderr)
-        return 1
+        return 0 if pass_through_records else 1
 
     created_count = 0
     for snapshot in snapshots:
diff --git a/archivebox/cli/archivebox_binary.py b/archivebox/cli/archivebox_binary.py
index 98ab33be..86ce7b4b 100644
--- a/archivebox/cli/archivebox_binary.py
+++ b/archivebox/cli/archivebox_binary.py
@@ -34,21 +34,7 @@ from typing import Optional
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
diff --git a/archivebox/cli/archivebox_crawl.py b/archivebox/cli/archivebox_crawl.py
index d0621fcc..59f176cd 100644
--- a/archivebox/cli/archivebox_crawl.py
+++ b/archivebox/cli/archivebox_crawl.py
@@ -39,21 +39,7 @@ from typing import Optional, Iterable
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
@@ -71,12 +57,13 @@ def create_crawl(
     Create a Crawl job from URLs.
 
     Takes URLs as args or stdin, creates one Crawl with all URLs, outputs JSONL.
+    Pass-through: Records that are not URLs are output unchanged (for piping).
 
     Exit codes:
         0: Success
         1: Failure
     """
-    from archivebox.misc.jsonl import read_args_or_stdin, write_record
+    from archivebox.misc.jsonl import read_args_or_stdin, write_record, TYPE_CRAWL
     from archivebox.base_models.models import get_or_create_system_user_pk
     from archivebox.crawls.models import Crawl
 
@@ -90,14 +77,46 @@ def create_crawl(
         rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
         return 1
 
-    # Collect all URLs into a single newline-separated string
+    # Separate pass-through records from URL records
     url_list = []
+    pass_through_records = []
+
     for record in records:
+        record_type = record.get('type', '')
+
+        # Pass-through: output records that aren't URL/Crawl types
+        if record_type and record_type != TYPE_CRAWL and not record.get('url') and not record.get('urls'):
+            pass_through_records.append(record)
+            continue
+
+        # Handle existing Crawl records (just pass through with id)
+        if record_type == TYPE_CRAWL and record.get('id'):
+            pass_through_records.append(record)
+            continue
+
+        # Collect URLs
         url = record.get('url')
         if url:
             url_list.append(url)
 
+        # Handle 'urls' field (newline-separated)
+        urls_field = record.get('urls')
+        if urls_field:
+            for line in urls_field.split('\n'):
+                line = line.strip()
+                if line and not line.startswith('#'):
+                    url_list.append(line)
+
+    # Output pass-through records first
+    if not is_tty:
+        for record in pass_through_records:
+            write_record(record)
+
     if not url_list:
+        if pass_through_records:
+            # If we had pass-through records but no URLs, that's OK
+            rprint(f'[dim]Passed through {len(pass_through_records)} records, no new URLs[/dim]', file=sys.stderr)
+            return 0
         rprint('[red]No valid URLs found[/red]', file=sys.stderr)
         return 1
diff --git a/archivebox/cli/archivebox_machine.py b/archivebox/cli/archivebox_machine.py
index e63eac41..86d3e219 100644
--- a/archivebox/cli/archivebox_machine.py
+++ b/archivebox/cli/archivebox_machine.py
@@ -28,21 +28,7 @@ from typing import Optional
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
diff --git a/archivebox/cli/archivebox_process.py b/archivebox/cli/archivebox_process.py
index 9784650b..82694064 100644
--- a/archivebox/cli/archivebox_process.py
+++ b/archivebox/cli/archivebox_process.py
@@ -31,21 +31,7 @@ from typing import Optional
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
diff --git a/archivebox/cli/archivebox_run.py b/archivebox/cli/archivebox_run.py
index 6efd9018..9901c684 100644
--- a/archivebox/cli/archivebox_run.py
+++ b/archivebox/cli/archivebox_run.py
@@ -38,58 +38,110 @@ def process_stdin_records() -> int:
     """
     Process JSONL records from stdin.
 
-    Reads records, queues them for processing, then runs orchestrator until complete.
-    Handles any record type: Crawl, Snapshot, ArchiveResult, etc.
+    Create-or-update behavior:
+    - Records WITHOUT id: Create via Model.from_json(), then queue
+    - Records WITH id: Lookup existing, re-queue for processing
+
+    Outputs JSONL of all processed records (for chaining).
+
+    Handles any record type: Crawl, Snapshot, ArchiveResult.
+    Auto-cascades: Crawl → Snapshots → ArchiveResults.
 
     Returns exit code (0 = success, 1 = error).
     """
     from django.utils import timezone
-    from archivebox.misc.jsonl import read_stdin, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
+    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
+    from archivebox.base_models.models import get_or_create_system_user_pk
     from archivebox.core.models import Snapshot, ArchiveResult
     from archivebox.crawls.models import Crawl
     from archivebox.workers.orchestrator import Orchestrator
 
     records = list(read_stdin())
+    is_tty = sys.stdout.isatty()
 
     if not records:
         return 0  # Nothing to process
 
+    created_by_id = get_or_create_system_user_pk()
     queued_count = 0
+    output_records = []
 
     for record in records:
-        record_type = record.get('type')
+        record_type = record.get('type', '')
         record_id = record.get('id')
 
-        if not record_id:
-            continue
-
         try:
             if record_type == TYPE_CRAWL:
-                crawl = Crawl.objects.get(id=record_id)
-                if crawl.status in [Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]:
+                if record_id:
+                    # Existing crawl - re-queue
+                    try:
+                        crawl = Crawl.objects.get(id=record_id)
+                    except Crawl.DoesNotExist:
+                        crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
+                else:
+                    # New crawl - create it
+                    crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
+
+                if crawl:
                     crawl.retry_at = timezone.now()
+                    if crawl.status not in [Crawl.StatusChoices.SEALED]:
+                        crawl.status = Crawl.StatusChoices.QUEUED
                     crawl.save()
+                    output_records.append(crawl.to_json())
                     queued_count += 1
 
-            elif record_type == TYPE_SNAPSHOT:
-                snapshot = Snapshot.objects.get(id=record_id)
-                if snapshot.status in [Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED]:
+            elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
+                if record_id:
+                    # Existing snapshot - re-queue
+                    try:
+                        snapshot = Snapshot.objects.get(id=record_id)
+                    except Snapshot.DoesNotExist:
+                        snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
+                else:
+                    # New snapshot - create it
+                    snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
+
+                if snapshot:
                     snapshot.retry_at = timezone.now()
+                    if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
+                        snapshot.status = Snapshot.StatusChoices.QUEUED
                     snapshot.save()
+                    output_records.append(snapshot.to_json())
                     queued_count += 1
 
             elif record_type == TYPE_ARCHIVERESULT:
-                archiveresult = ArchiveResult.objects.get(id=record_id)
-                if archiveresult.status in [ArchiveResult.StatusChoices.QUEUED, ArchiveResult.StatusChoices.STARTED, ArchiveResult.StatusChoices.BACKOFF]:
+                if record_id:
+                    # Existing archiveresult - re-queue
+                    try:
+                        archiveresult = ArchiveResult.objects.get(id=record_id)
+                    except ArchiveResult.DoesNotExist:
+                        archiveresult = ArchiveResult.from_json(record)
+                else:
+                    # New archiveresult - create it
+                    archiveresult = ArchiveResult.from_json(record)
+
+                if archiveresult:
                     archiveresult.retry_at = timezone.now()
+                    if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
+                        archiveresult.status = ArchiveResult.StatusChoices.QUEUED
                     archiveresult.save()
+                    output_records.append(archiveresult.to_json())
                     queued_count += 1
 
-        except (Crawl.DoesNotExist, Snapshot.DoesNotExist, ArchiveResult.DoesNotExist):
-            rprint(f'[yellow]Record not found: {record_type} {record_id}[/yellow]', file=sys.stderr)
+            else:
+                # Unknown type - pass through
+                output_records.append(record)
+
+        except Exception as e:
+            rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
             continue
 
+    # Output all processed records (for chaining)
+    if not is_tty:
+        for rec in output_records:
+            write_record(rec)
+
     if queued_count == 0:
         rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
         return 0
diff --git a/archivebox/cli/archivebox_snapshot.py b/archivebox/cli/archivebox_snapshot.py
index 87e7482b..46ad2949 100644
--- a/archivebox/cli/archivebox_snapshot.py
+++ b/archivebox/cli/archivebox_snapshot.py
@@ -36,21 +36,7 @@ from typing import Optional, Iterable
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
@@ -66,13 +52,12 @@ def create_snapshots(
 ) -> int:
     """
     Create Snapshots from URLs or stdin JSONL (Crawl or Snapshot records).
+    Pass-through: Records that are not Crawl/Snapshot/URL are output unchanged.
 
     Exit codes:
         0: Success
         1: Failure
     """
-    from django.utils import timezone
-
     from archivebox.misc.jsonl import (
         read_args_or_stdin, write_record,
         TYPE_SNAPSHOT, TYPE_CRAWL
@@ -93,11 +78,17 @@ def create_snapshots(
 
     # Process each record - handle Crawls and plain URLs/Snapshots
     created_snapshots = []
+    pass_through_count = 0
+
     for record in records:
-        record_type = record.get('type')
+        record_type = record.get('type', '')
 
         try:
             if record_type == TYPE_CRAWL:
+                # Pass through the Crawl record itself first
+                if not is_tty:
+                    write_record(record)
+
                 # Input is a Crawl - get or create it, then create Snapshots for its URLs
                 crawl = None
                 crawl_id = record.get('id')
@@ -144,11 +135,20 @@
 
                 if not is_tty:
                     write_record(snapshot.to_json())
 
+            else:
+                # Pass-through: output records we don't handle
+                if not is_tty:
+                    write_record(record)
+                pass_through_count += 1
+
         except Exception as e:
             rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
             continue
 
     if not created_snapshots:
+        if pass_through_count > 0:
+            rprint(f'[dim]Passed through {pass_through_count} records, no new snapshots[/dim]', file=sys.stderr)
+            return 0
         rprint('[red]No snapshots created[/red]', file=sys.stderr)
         return 1
diff --git a/archivebox/cli/archivebox_tag.py b/archivebox/cli/archivebox_tag.py
index c9461396..bf72ef97 100644
--- a/archivebox/cli/archivebox_tag.py
+++ b/archivebox/cli/archivebox_tag.py
@@ -36,21 +36,7 @@ from typing import Optional, Iterable
 
 import rich_click as click
 from rich import print as rprint
 
-
-def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
-    """Apply Django-style filters from CLI kwargs to a QuerySet."""
-    filters = {}
-    for key, value in filter_kwargs.items():
-        if value is not None and key not in ('limit', 'offset'):
-            filters[key] = value
-
-    if filters:
-        queryset = queryset.filter(**filters)
-
-    if limit:
-        queryset = queryset[:limit]
-
-    return queryset
+from archivebox.cli.cli_utils import apply_filters
 
 
 # =============================================================================
diff --git a/archivebox/cli/cli_utils.py b/archivebox/cli/cli_utils.py
new file mode 100644
index 00000000..8bb7f66d
--- /dev/null
+++ b/archivebox/cli/cli_utils.py
@@ -0,0 +1,46 @@
+"""
+Shared CLI utilities for ArchiveBox commands.
+
+This module contains common utilities used across multiple CLI commands,
+extracted to avoid code duplication.
+"""
+
+__package__ = 'archivebox.cli'
+
+from typing import Optional
+
+
+def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
+    """
+    Apply Django-style filters from CLI kwargs to a QuerySet.
+
+    Supports: --status=queued, --url__icontains=example, --id__in=uuid1,uuid2
+
+    Args:
+        queryset: Django QuerySet to filter
+        filter_kwargs: Dict of filter key-value pairs from CLI
+        limit: Optional limit on results
+
+    Returns:
+        Filtered QuerySet
+
+    Example:
+        queryset = Snapshot.objects.all()
+        filter_kwargs = {'status': 'queued', 'url__icontains': 'example.com'}
+        filtered = apply_filters(queryset, filter_kwargs, limit=10)
+    """
+    filters = {}
+    for key, value in filter_kwargs.items():
+        if value is None or key in ('limit', 'offset'):
+            continue
+        # Handle CSV lists for __in filters
+        if key.endswith('__in') and isinstance(value, str):
+            value = [v.strip() for v in value.split(',')]
+        filters[key] = value
+
+    if filters:
+        queryset = queryset.filter(**filters)
+
+    if limit:
+        queryset = queryset[:limit]
+
+    return queryset
diff --git a/archivebox/core/models.py b/archivebox/core/models.py
index 1dca0810..f566f8f0 100755
--- a/archivebox/core/models.py
+++ b/archivebox/core/models.py
@@ -1457,7 +1457,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
             'crawl_id': str(self.crawl_id),
             'url': self.url,
             'title': self.title,
-            'tags': self.tags_str(),
+            'tags_str': self.tags_str(),
             'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
             'created_at': self.created_at.isoformat() if self.created_at else None,
             'timestamp': self.timestamp,
@@ -2415,6 +2415,96 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
         if process and self.process:
             yield from self.process.to_jsonl(seen=seen, **kwargs)
 
+    @classmethod
+    def from_jsonl(cls, records, overrides: Dict[str, Any] = None) -> list['ArchiveResult']:
+        """
+        Create/update ArchiveResults from an iterable of JSONL records.
+        Filters to only records with type='ArchiveResult'.
+
+        Args:
+            records: Iterable of dicts (JSONL records)
+            overrides: Dict of field overrides
+
+        Returns:
+            List of ArchiveResult instances (skips None results)
+        """
+        results = []
+        for record in records:
+            record_type = record.get('type', cls.JSONL_TYPE)
+            if record_type == cls.JSONL_TYPE:
+                instance = cls.from_json(record, overrides=overrides)
+                if instance:
+                    results.append(instance)
+        return results
+
+    @staticmethod
+    def from_json(record: Dict[str, Any], overrides: Dict[str, Any] = None) -> 'ArchiveResult | None':
+        """
+        Create or update a single ArchiveResult from a JSON record dict.
+
+        Args:
+            record: Dict with 'snapshot_id' and 'plugin' (required for create),
+                    or 'id' (for update)
+            overrides: Dict of field overrides (e.g., config overrides)
+
+        Returns:
+            ArchiveResult instance or None if invalid
+        """
+        from django.utils import timezone
+
+        overrides = overrides or {}
+
+        # If 'id' is provided, lookup and update existing
+        result_id = record.get('id')
+        if result_id:
+            try:
+                result = ArchiveResult.objects.get(id=result_id)
+                # Update fields from record
+                if record.get('status'):
+                    result.status = record['status']
+                result.retry_at = timezone.now()
+                result.save()
+                return result
+            except ArchiveResult.DoesNotExist:
+                pass  # Fall through to create
+
+        # Required fields for creation
+        snapshot_id = record.get('snapshot_id')
+        plugin = record.get('plugin')
+
+        if not snapshot_id or not plugin:
+            return None
+
+        try:
+            snapshot = Snapshot.objects.get(id=snapshot_id)
+        except Snapshot.DoesNotExist:
+            return None
+
+        # Check if result already exists for this snapshot+plugin
+        existing = ArchiveResult.objects.filter(
+            snapshot=snapshot,
+            plugin=plugin,
+        ).first()
+
+        if existing:
+            # Update existing result if status provided
+            if record.get('status'):
+                existing.status = record['status']
+                existing.retry_at = timezone.now()
+                existing.save()
+            return existing
+
+        # Create new ArchiveResult
+        result = ArchiveResult(
+            snapshot=snapshot,
+            plugin=plugin,
+            status=record.get('status', ArchiveResult.StatusChoices.QUEUED),
+            retry_at=timezone.now(),
+            hook_name=record.get('hook_name', ''),
+        )
+        result.save()
+        return result
+
     def save(self, *args, **kwargs):
         is_new = self._state.adding
diff --git a/archivebox/tests/conftest.py b/archivebox/tests/conftest.py
new file mode 100644
index 00000000..f1c5175f
--- /dev/null
+++ b/archivebox/tests/conftest.py
@@ -0,0 +1,218 @@
+"""archivebox/tests/conftest.py - Pytest fixtures for CLI tests."""
+
+import os
+import sys
+import json
+import subprocess
+from pathlib import Path
+from typing import List, Dict, Any, Optional, Tuple
+
+import pytest
+
+
+# =============================================================================
+# Fixtures
+# =============================================================================
+
+@pytest.fixture
+def isolated_data_dir(tmp_path, settings):
+    """
+    Create isolated DATA_DIR for each test.
+
+    Uses tmp_path for isolation, configures Django settings.
+    """
+    data_dir = tmp_path / 'archivebox_data'
+    data_dir.mkdir()
+
+    # Set environment for subprocess calls
+    os.environ['DATA_DIR'] = str(data_dir)
+
+    # Update Django settings
+    settings.DATA_DIR = data_dir
+
+    yield data_dir
+
+    # Cleanup handled by tmp_path fixture
+
+
+@pytest.fixture
+def initialized_archive(isolated_data_dir):
+    """
+    Initialize ArchiveBox archive in isolated directory.
+
+    Runs `archivebox init` to set up database and directories.
+    """
+    from archivebox.cli.archivebox_init import init
+    init(setup=True, quick=True)
+    return isolated_data_dir
+
+
+@pytest.fixture
+def cli_env(initialized_archive):
+    """
+    Environment dict for CLI subprocess calls.
+
+    Includes DATA_DIR and disables slow extractors.
+ """ + return { + **os.environ, + 'DATA_DIR': str(initialized_archive), + 'USE_COLOR': 'False', + 'SHOW_PROGRESS': 'False', + 'SAVE_TITLE': 'True', + 'SAVE_FAVICON': 'False', + 'SAVE_WGET': 'False', + 'SAVE_WARC': 'False', + 'SAVE_PDF': 'False', + 'SAVE_SCREENSHOT': 'False', + 'SAVE_DOM': 'False', + 'SAVE_SINGLEFILE': 'False', + 'SAVE_READABILITY': 'False', + 'SAVE_MERCURY': 'False', + 'SAVE_GIT': 'False', + 'SAVE_YTDLP': 'False', + 'SAVE_HEADERS': 'False', + } + + +# ============================================================================= +# CLI Helpers +# ============================================================================= + +def run_archivebox_cmd( + args: List[str], + stdin: Optional[str] = None, + cwd: Optional[Path] = None, + env: Optional[Dict[str, str]] = None, + timeout: int = 60, +) -> Tuple[str, str, int]: + """ + Run archivebox command, return (stdout, stderr, returncode). + + Args: + args: Command arguments (e.g., ['crawl', 'create', 'https://example.com']) + stdin: Optional string to pipe to stdin + cwd: Working directory (defaults to DATA_DIR from env) + env: Environment variables (defaults to os.environ with DATA_DIR) + timeout: Command timeout in seconds + + Returns: + Tuple of (stdout, stderr, returncode) + """ + cmd = [sys.executable, '-m', 'archivebox'] + args + + env = env or {**os.environ} + cwd = cwd or Path(env.get('DATA_DIR', '.')) + + result = subprocess.run( + cmd, + input=stdin, + capture_output=True, + text=True, + cwd=cwd, + env=env, + timeout=timeout, + ) + + return result.stdout, result.stderr, result.returncode + + +# ============================================================================= +# Output Assertions +# ============================================================================= + +def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]: + """Parse JSONL output into list of dicts.""" + records = [] + for line in stdout.strip().split('\n'): + line = line.strip() + if line and line.startswith('{'): + try: + records.append(json.loads(line)) + except json.JSONDecodeError: + pass + return records + + +def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1): + """Assert output contains at least min_count records of type.""" + records = parse_jsonl_output(stdout) + matching = [r for r in records if r.get('type') == record_type] + assert len(matching) >= min_count, \ + f"Expected >= {min_count} {record_type}, got {len(matching)}" + return matching + + +def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]): + """Assert that input records appear in output (pass-through behavior).""" + output_records = parse_jsonl_output(stdout) + output_ids = {r.get('id') for r in output_records if r.get('id')} + + for input_rec in input_records: + input_id = input_rec.get('id') + if input_id: + assert input_id in output_ids, \ + f"Input record {input_id} not found in output (pass-through failed)" + + +def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]): + """Assert record has all required fields with non-None values.""" + for field in required_fields: + assert field in record, f"Record missing field: {field}" + assert record[field] is not None, f"Record field is None: {field}" + + +# ============================================================================= +# Database Assertions +# ============================================================================= + +def assert_db_count(model_class, filters: Dict[str, Any], expected: int): + """Assert database count 
+    actual = model_class.objects.filter(**filters).count()
+    assert actual == expected, \
+        f"Expected {expected} {model_class.__name__}, got {actual}"
+
+
+def assert_db_exists(model_class, **filters):
+    """Assert at least one record exists matching filters."""
+    assert model_class.objects.filter(**filters).exists(), \
+        f"No {model_class.__name__} found matching {filters}"
+
+
+# =============================================================================
+# Test Data Factories
+# =============================================================================
+
+def create_test_url(domain: str = 'example.com', path: str = None) -> str:
+    """Generate unique test URL."""
+    import uuid
+    path = path or uuid.uuid4().hex[:8]
+    return f'https://{domain}/{path}'
+
+
+def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
+    """Create Crawl JSONL record for testing."""
+    from archivebox.misc.jsonl import TYPE_CRAWL
+
+    urls = urls or [create_test_url()]
+    return {
+        'type': TYPE_CRAWL,
+        'urls': '\n'.join(urls),
+        'max_depth': kwargs.get('max_depth', 0),
+        'tags_str': kwargs.get('tags_str', ''),
+        'status': kwargs.get('status', 'queued'),
+        **{k: v for k, v in kwargs.items() if k not in ('max_depth', 'tags_str', 'status')},
+    }
+
+
+def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
+    """Create Snapshot JSONL record for testing."""
+    from archivebox.misc.jsonl import TYPE_SNAPSHOT
+
+    return {
+        'type': TYPE_SNAPSHOT,
+        'url': url or create_test_url(),
+        'tags_str': kwargs.get('tags_str', ''),
+        'status': kwargs.get('status', 'queued'),
+        **{k: v for k, v in kwargs.items() if k not in ('tags_str', 'status')},
+    }
diff --git a/archivebox/workers/supervisord_util.py b/archivebox/workers/supervisord_util.py
index 8ec749ee..14af0afd 100644
--- a/archivebox/workers/supervisord_util.py
+++ b/archivebox/workers/supervisord_util.py
@@ -32,7 +32,7 @@ _supervisord_proc = None
 
 ORCHESTRATOR_WORKER = {
     "name": "worker_orchestrator",
-    "command": "archivebox manage orchestrator",  # runs forever by default
+    "command": "archivebox run",  # runs forever by default
     "autostart": "true",
    "autorestart": "true",
    "stdout_logfile": "logs/worker_orchestrator.log",
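
Reviewer sketch (not part of the patch): the Python script below drives the new
create-or-update and pass-through behavior end-to-end, using only entry points
that appear in this diff (`python -m archivebox`, the `crawl create` and `run`
subcommands, and JSONL records carrying a 'type' field). The example.com URL
and the {'type': 'Tag', ...} record are illustrative stand-ins, and it assumes
an initialized DATA_DIR is active in the environment and that `archivebox run`
drains its queue and exits when fed records on stdin.

    import json
    import subprocess
    import sys

    def archivebox(args, stdin=None):
        """Invoke `python -m archivebox <args>`, mirroring conftest.run_archivebox_cmd()."""
        proc = subprocess.run(
            [sys.executable, '-m', 'archivebox'] + args,
            input=stdin, capture_output=True, text=True, timeout=60,
        )
        return proc.stdout, proc.returncode

    # `crawl create` reads URLs from stdin; stdout is a pipe here (not a tty),
    # so the new Crawl record is emitted as JSONL for chaining.
    stdout, code = archivebox(['crawl', 'create'], stdin='https://example.com\n')
    assert code == 0
    crawl = json.loads(stdout.strip().splitlines()[-1])

    # `run` then exercises both halves of the contract: the Crawl record WITH
    # an id is looked up and re-queued, while the unknown-typed record is
    # passed through unchanged for downstream consumers.
    mixed = json.dumps(crawl) + '\n' + json.dumps({'type': 'Tag', 'name': 'demo'}) + '\n'
    stdout, code = archivebox(['run'], stdin=mixed)
    echoed = [json.loads(line) for line in stdout.splitlines() if line.startswith('{')]
    assert any(r.get('id') == crawl.get('id') for r in echoed)  # re-queued and echoed
    assert any(r.get('type') == 'Tag' for r in echoed)          # passed through unchanged

Because every command echoes the records it consumes, the same output can feed
a further `archivebox snapshot create` or `jq` stage, which is the chaining
property the supervisord change above relies on.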