__package__ = 'archivebox.crawls'

from typing import TYPE_CHECKING, Iterable
from datetime import timedelta
from archivebox.uuid_compat import uuid7
from pathlib import Path

from django.db import models
from django.db.models import QuerySet
from django.core.validators import MaxValueValidator, MinValueValidator
from django.conf import settings
from django.urls import reverse_lazy
from django.utils import timezone

from django_stubs_ext.db.models import TypedModelMeta
from statemachine import State, registry
from rich import print

from archivebox.config import CONSTANTS
from archivebox.base_models.models import ModelWithUUID, ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, get_or_create_system_user_pk
from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine

if TYPE_CHECKING:
    from archivebox.core.models import Snapshot, ArchiveResult


class CrawlSchedule(ModelWithUUID, ModelWithNotes):
    """
    A recurring schedule that re-runs a template Crawl on a cron-like cadence.

    The `template` Crawl holds the URLs/config to copy each time the schedule
    fires; `schedule` holds the cadence expression as a string.
    """

    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
    modified_at = models.DateTimeField(auto_now=True)

    # The Crawl whose urls/config are cloned on each scheduled run
    template: 'Crawl' = models.ForeignKey('Crawl', on_delete=models.CASCADE, null=False, blank=False)  # type: ignore
    schedule = models.CharField(max_length=64, blank=False, null=False)
    is_enabled = models.BooleanField(default=True)

    label = models.CharField(max_length=64, blank=True, null=False, default='')
    notes = models.TextField(blank=True, null=False, default='')

    # Reverse accessor for Crawls pointing at this schedule
    crawl_set: models.Manager['Crawl']

    class Meta(TypedModelMeta):
        app_label = 'crawls'
        verbose_name = 'Scheduled Crawl'
        verbose_name_plural = 'Scheduled Crawls'

    def __str__(self) -> str:
        urls_preview = self.template.urls[:64] if self.template and self.template.urls else ""
        return f'[{self.id}] {urls_preview} @ {self.schedule}'

    @property
    def api_url(self) -> str:
        return reverse_lazy('api-1:get_any', args=[self.id])

    def save(self, *args, **kwargs):
        """Save the schedule, defaulting label from the template and keeping the template's back-reference in sync."""
        self.label = self.label or (self.template.label if self.template else '')
        super().save(*args, **kwargs)
        # Keep the template Crawl pointing back at this schedule
        if self.template:
            self.template.schedule = self
            self.template.save()


class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWithStateMachine):
    """
    A single crawl job: a newline-separated set of URLs (plain or JSONL entries)
    to be archived up to `max_depth`, driven by CrawlMachine (QUEUED -> STARTED -> SEALED).
    """

    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
    modified_at = models.DateTimeField(auto_now=True)

    urls = models.TextField(blank=False, null=False, help_text='Newline-separated list of URLs to crawl')
    config = models.JSONField(default=dict, null=True, blank=True)
    max_depth = models.PositiveSmallIntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(4)])
    tags_str = models.CharField(max_length=1024, blank=True, null=False, default='')
    persona_id = models.UUIDField(null=True, blank=True)
    label = models.CharField(max_length=64, blank=True, null=False, default='')
    notes = models.TextField(blank=True, null=False, default='')
    schedule = models.ForeignKey(CrawlSchedule, on_delete=models.SET_NULL, null=True, blank=True, editable=True)
    output_dir = models.CharField(max_length=512, null=False, blank=True, default='')

    status = ModelWithStateMachine.StatusField(choices=ModelWithStateMachine.StatusChoices, default=ModelWithStateMachine.StatusChoices.QUEUED)
    retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)

    # State-machine wiring (see CrawlMachine below)
    state_machine_name = 'archivebox.crawls.models.CrawlMachine'
    retry_at_field_name = 'retry_at'
    state_field_name = 'status'
    StatusChoices = ModelWithStateMachine.StatusChoices
    active_state = StatusChoices.STARTED

    snapshot_set: models.Manager['Snapshot']

    class Meta(TypedModelMeta):
        app_label = 'crawls'
        verbose_name = 'Crawl'
        verbose_name_plural = 'Crawls'

    def __str__(self):
        first_url = self.get_urls_list()[0] if self.get_urls_list() else ''
        # Show last 8 digits of UUID and more of the URL
        short_id = str(self.id)[-8:]
        return f'[...{short_id}] {first_url[:120]}'

    def save(self, *args, **kwargs):
        """Save the crawl; on first insert also log a 'Created Crawl' worker event."""
        is_new = self._state.adding
        super().save(*args, **kwargs)
        if is_new:
            from archivebox.misc.logging_util import log_worker_event
            first_url = self.get_urls_list()[0] if self.get_urls_list() else ''
            log_worker_event(
                worker_type='DB',
                event='Created Crawl',
                indent_level=1,
                metadata={
                    'id': str(self.id),
                    'first_url': first_url[:64],
                    'max_depth': self.max_depth,
                    'status': self.status,
                },
            )

    @property
    def api_url(self) -> str:
        return reverse_lazy('api-1:get_crawl', args=[self.id])

    def to_json(self) -> dict:
        """
        Convert Crawl model instance to a JSON-serializable dict.
        """
        from archivebox.config import VERSION
        return {
            'type': 'Crawl',
            'schema_version': VERSION,
            'id': str(self.id),
            'urls': self.urls,
            'status': self.status,
            'max_depth': self.max_depth,
            'tags_str': self.tags_str,
            'label': self.label,
            'created_at': self.created_at.isoformat() if self.created_at else None,
        }

    @staticmethod
    def from_json(record: dict, overrides: dict | None = None):
        """
        Create or get a Crawl from a JSON dict.

        Args:
            record: Dict with 'urls' (required), optional 'max_depth', 'tags_str', 'label'
            overrides: Dict of field overrides (e.g., created_by_id)

        Returns:
            Crawl instance or None if invalid
        """
        overrides = overrides or {}

        # Check if crawl already exists by ID
        crawl_id = record.get('id')
        if crawl_id:
            try:
                return Crawl.objects.get(id=crawl_id)
            except Crawl.DoesNotExist:
                pass

        # Get URLs - can be string (newline-separated) or from 'url' field
        urls = record.get('urls', '')
        if not urls and record.get('url'):
            urls = record['url']
        if not urls:
            return None

        # Create new crawl (status stays QUEUED, not started)
        crawl = Crawl.objects.create(
            urls=urls,
            max_depth=record.get('max_depth', record.get('depth', 0)),
            tags_str=record.get('tags_str', record.get('tags', '')),
            label=record.get('label', ''),
            status=Crawl.StatusChoices.QUEUED,
            retry_at=timezone.now(),
            **overrides,
        )
        return crawl

    @property
    def OUTPUT_DIR(self) -> Path:
        """
        Construct output directory: users/{username}/crawls/{YYYYMMDD}/{domain}/{crawl-id}
        Domain is extracted from the first URL in the crawl.
        """
        from archivebox import DATA_DIR
        from archivebox.core.models import Snapshot
        date_str = self.created_at.strftime('%Y%m%d')
        urls = self.get_urls_list()
        domain = Snapshot.extract_domain_from_url(urls[0]) if urls else 'unknown'
        return DATA_DIR / 'users' / self.created_by.username / 'crawls' / date_str / domain / str(self.id)

    def get_urls_list(self) -> list[str]:
        """Get list of URLs from urls field, filtering out comments and empty lines."""
        if not self.urls:
            return []
        return [
            url.strip()
            for url in self.urls.split('\n')
            if url.strip() and not url.strip().startswith('#')
        ]

    def add_url(self, entry: dict) -> bool:
        """
        Add a URL to the crawl queue if not already present.

        Args:
            entry: dict with 'url', optional 'depth', 'title', 'timestamp', 'tags', 'via_snapshot', 'plugin'

        Returns:
            True if URL was added, False if skipped (duplicate or depth exceeded)
        """
        import json

        url = entry.get('url', '')
        if not url:
            return False

        depth = entry.get('depth', 1)

        # Skip if depth exceeds max_depth
        if depth > self.max_depth:
            return False

        # Skip if already a Snapshot for this crawl
        if self.snapshot_set.filter(url=url).exists():
            return False

        # Check if already in urls (parse existing JSONL entries)
        existing_urls = set()
        for line in self.urls.splitlines():
            if not line.strip():
                continue
            try:
                existing_entry = json.loads(line)
                existing_urls.add(existing_entry.get('url', ''))
            except json.JSONDecodeError:
                # Plain-URL line (not JSONL)
                existing_urls.add(line.strip())

        if url in existing_urls:
            return False

        # Append as JSONL
        jsonl_entry = json.dumps(entry)
        self.urls = (self.urls.rstrip() + '\n' + jsonl_entry).lstrip('\n')
        self.save(update_fields=['urls', 'modified_at'])
        return True

    def create_snapshots_from_urls(self) -> list['Snapshot']:
        """
        Create Snapshot objects for each URL in self.urls that doesn't already exist.

        Returns:
            List of newly created Snapshot objects
        """
        import sys
        import json
        from archivebox.core.models import Snapshot

        created_snapshots = []

        print(f'[cyan]DEBUG create_snapshots_from_urls: self.urls={repr(self.urls)}[/cyan]', file=sys.stderr)
        print(f'[cyan]DEBUG create_snapshots_from_urls: lines={self.urls.splitlines()}[/cyan]', file=sys.stderr)

        for line in self.urls.splitlines():
            if not line.strip():
                continue

            # Parse JSONL or plain URL
            try:
                entry = json.loads(line)
                url = entry.get('url', '')
                depth = entry.get('depth', 0)
                title = entry.get('title')
                timestamp = entry.get('timestamp')
                tags = entry.get('tags', '')
            except json.JSONDecodeError:
                url = line.strip()
                depth = 0
                title = None
                timestamp = None
                tags = ''

            if not url:
                continue

            # Skip if depth exceeds max_depth
            if depth > self.max_depth:
                continue

            # Create snapshot if doesn't exist
            snapshot, created = Snapshot.objects.get_or_create(
                url=url,
                crawl=self,
                defaults={
                    'depth': depth,
                    'title': title,
                    'timestamp': timestamp or str(timezone.now().timestamp()),
                    'status': Snapshot.INITIAL_STATE,
                    'retry_at': timezone.now(),
                    # Note: created_by removed in 0.9.0 - Snapshot inherits from Crawl
                }
            )
            if created:
                created_snapshots.append(snapshot)
                # Save tags if present
                if tags:
                    snapshot.save_tags(tags.split(','))

        return created_snapshots

    def run(self) -> 'Snapshot | None':
        """
        Execute this Crawl: run hooks, process JSONL, create snapshots.
        Called by the state machine when entering the 'started' state.

        Returns:
            The root Snapshot for this crawl, or None for system crawls that don't create snapshots
        """
        import time
        import json
        from pathlib import Path
        from archivebox.hooks import run_hook, discover_hooks, process_hook_records
        from archivebox.config.configset import get_config

        # Debug logging to file (since stdout/stderr redirected to /dev/null in progress mode)
        debug_log = Path('/tmp/archivebox_crawl_debug.log')
        with open(debug_log, 'a') as f:
            f.write(f'\n=== Crawl.run() starting for {self.id} at {time.time()} ===\n')
            f.flush()

        # Get merged config with crawl context
        config = get_config(crawl=self)

        # Load all binaries.jsonl files from plugins
        # This replaces individual on_Crawl install hooks with declarative configuration
        from archivebox.hooks import BUILTIN_PLUGINS_DIR
        from archivebox.machine.models import Machine

        machine_id = str(Machine.current().id)
        binaries_records = []
        for binaries_file in BUILTIN_PLUGINS_DIR.glob('*/binaries.jsonl'):
            try:
                with open(binaries_file, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith('#'):
                            try:
                                record = json.loads(line)
                                if record.get('type') == 'Binary':
                                    record['machine_id'] = machine_id
                                    binaries_records.append(record)
                            except json.JSONDecodeError:
                                pass
            except Exception:
                # Best-effort: a malformed/unreadable binaries.jsonl must not abort the crawl
                pass

        # Process binary declarations before running hooks
        if binaries_records:
            overrides = {'crawl': self}
            process_hook_records(binaries_records, overrides=overrides)

        # Discover and run on_Crawl hooks
        with open(debug_log, 'a') as f:
            f.write(f'Discovering Crawl hooks...\n')
            f.flush()
        hooks = discover_hooks('Crawl', config=config)
        with open(debug_log, 'a') as f:
            f.write(f'Found {len(hooks)} hooks\n')
            f.flush()

        for hook in hooks:
            with open(debug_log, 'a') as f:
                f.write(f'Running hook: {hook.name}\n')
                f.flush()
            hook_start = time.time()
            plugin_name = hook.parent.name
            output_dir = self.OUTPUT_DIR / plugin_name
            output_dir.mkdir(parents=True, exist_ok=True)

            # Run hook using Process.launch() - returns Process model
            process = run_hook(
                hook,
                output_dir=output_dir,
                config=config,
                crawl_id=str(self.id),
                source_url=self.urls,  # Pass full newline-separated URLs
            )
            with open(debug_log, 'a') as f:
                f.write(f'Hook {hook.name} completed with status={process.status}\n')
                f.flush()
            hook_elapsed = time.time() - hook_start
            if hook_elapsed > 0.5:  # Log slow hooks
                print(f'[yellow]⏱️ Hook {hook.name} took {hook_elapsed:.2f}s[/yellow]')

            # Background hook - still running
            if process.status == process.StatusChoices.RUNNING:
                continue

            # Foreground hook - process JSONL records
            from archivebox.hooks import extract_records_from_process
            records = extract_records_from_process(process)
            if records:
                print(f'[cyan]📝 Processing {len(records)} records from {hook.name}[/cyan]')
                for record in records[:3]:  # Show first 3
                    print(f' Record: type={record.get("type")}, keys={list(record.keys())[:5]}')
                overrides = {'crawl': self}
                stats = process_hook_records(records, overrides=overrides)
                if stats:
                    print(f'[green]✓ Created: {stats}[/green]')

        # Create snapshots from all URLs in self.urls
        with open(debug_log, 'a') as f:
            f.write(f'Creating snapshots from URLs...\n')
            f.flush()
        created_snapshots = self.create_snapshots_from_urls()
        with open(debug_log, 'a') as f:
            f.write(f'Created {len(created_snapshots)} snapshots\n')
            f.write(f'=== Crawl.run() complete ===\n\n')
            f.flush()

        return created_snapshots[0] if created_snapshots else None

    def is_finished(self) -> bool:
        """Check if crawl is finished (all snapshots sealed or no snapshots exist)."""
        from archivebox.core.models import Snapshot

        # Check if any snapshots exist for this crawl
        snapshots = Snapshot.objects.filter(crawl=self)

        # If no snapshots exist, allow finishing (e.g., archivebox://install crawls that only run hooks)
        if not snapshots.exists():
            return True

        # If snapshots exist, check if all are sealed
        if snapshots.filter(status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED]).exists():
            return False
        return True

    def cleanup(self):
        """Clean up background hooks and run on_CrawlEnd hooks."""
        from archivebox.hooks import run_hook, discover_hooks
        from archivebox.misc.process_utils import safe_kill_process

        # Kill any background processes by scanning for all .pid files
        if self.OUTPUT_DIR.exists():
            for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
                cmd_file = pid_file.parent / 'cmd.sh'
                # safe_kill_process now waits for termination and escalates to SIGKILL
                # Returns True only if process is confirmed dead
                killed = safe_kill_process(pid_file, cmd_file)
                if killed:
                    pid_file.unlink(missing_ok=True)

        # Run on_CrawlEnd hooks
        from archivebox.config.configset import get_config
        config = get_config(crawl=self)
        hooks = discover_hooks('CrawlEnd', config=config)
        for hook in hooks:
            plugin_name = hook.parent.name
            output_dir = self.OUTPUT_DIR / plugin_name
            output_dir.mkdir(parents=True, exist_ok=True)
            result = run_hook(
                hook,
                output_dir=output_dir,
                config=config,
                crawl_id=str(self.id),
                source_url=self.urls,  # Pass full newline-separated URLs
            )
            # Log failures but don't block
            # NOTE(review): run() above treats run_hook()'s return as a Process model
            # (`process.status`), but here it is indexed like a dict (`result['returncode']`).
            # Confirm run_hook's return type — one of the two call sites looks inconsistent.
            if result and result['returncode'] != 0:
                print(f'[yellow]⚠️ CrawlEnd hook failed: {hook.name}[/yellow]')


# =============================================================================
# State Machines
# =============================================================================

class CrawlMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing Crawl lifecycle.

    Hook Lifecycle:

        QUEUED:
            Waiting for crawl to be ready (has URLs).

        -> tick() when can_start()

        STARTED (enter_started):
            1. crawl.run()
               - discover_hooks('Crawl') -> finds all crawl hooks
               - for each hook: run_hook(script, output_dir, ...),
                 parse JSONL from hook output,
                 process_hook_records() -> creates Snapshots
               - create_snapshots_from_urls() -> from self.urls field
            2. Snapshots process independently with their own
               state machines (see SnapshotMachine).

        -> tick() when is_finished()

        SEALED (enter_sealed):
            - cleanup() -> runs on_CrawlEnd hooks, kills background procs
            - set retry_at=None (no more processing)
    """

    model_attr_name = 'crawl'

    # States
    queued = State(value=Crawl.StatusChoices.QUEUED, initial=True)
    started = State(value=Crawl.StatusChoices.STARTED)
    sealed = State(value=Crawl.StatusChoices.SEALED, final=True)

    # Tick Event (polled by workers)
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to(sealed, cond='is_finished')
    )

    # Manual event (triggered by last Snapshot sealing)
    seal = started.to(sealed)

    def can_start(self) -> bool:
        """A crawl may start only when it has at least one non-comment URL."""
        if not self.crawl.urls:
            print(f'[red]⚠️ Crawl {self.crawl.id} cannot start: no URLs[/red]')
            return False
        urls_list = self.crawl.get_urls_list()
        if not urls_list:
            print(f'[red]⚠️ Crawl {self.crawl.id} cannot start: no valid URLs in urls field[/red]')
            return False
        return True

    def is_finished(self) -> bool:
        """Check if all Snapshots for this crawl are finished."""
        return self.crawl.is_finished()

    @started.enter
    def enter_started(self):
        import sys
        from archivebox.core.models import Snapshot

        print(f'[cyan]🔄 CrawlMachine.enter_started() - creating snapshots for {self.crawl.id}[/cyan]', file=sys.stderr)
        try:
            # Run the crawl - runs hooks, processes JSONL, creates snapshots
            first_snapshot = self.crawl.run()
            if first_snapshot:
                print(f'[cyan]🔄 Created {self.crawl.snapshot_set.count()} snapshot(s), first: {first_snapshot.url}[/cyan]', file=sys.stderr)
                # Update status to STARTED
                # Set retry_at to near future so tick() can poll and check is_finished()
                self.crawl.update_and_requeue(
                    retry_at=timezone.now() + timedelta(seconds=2),
                    status=Crawl.StatusChoices.STARTED,
                )
            else:
                # No snapshots (system crawl like archivebox://install)
                print(f'[cyan]🔄 No snapshots created, sealing crawl immediately[/cyan]', file=sys.stderr)
                # Seal immediately since there's no work to do
                self.seal()
        except Exception as e:
            print(f'[red]⚠️ Crawl {self.crawl.id} failed to start: {e}[/red]')
            import traceback
            traceback.print_exc()
            raise

    @sealed.enter
    def enter_sealed(self):
        # Clean up background hooks and run on_CrawlEnd hooks
        self.crawl.cleanup()
        self.crawl.update_and_requeue(
            retry_at=None,
            status=Crawl.StatusChoices.SEALED,
        )


# =============================================================================
# Register State Machines
# =============================================================================

# Manually register state machines with python-statemachine registry
# (normally auto-discovered from statemachines.py, but we define them here for clarity)
registry.register(CrawlMachine)