ArchiveBox/archivebox/core/models.py
Commit 3d985fa8c8 by Claude: Implement hook architecture with JSONL output support
Phase 1: Database migration for new ArchiveResult fields
- Add output_str (TextField) for human-readable summary
- Add output_json (JSONField) for structured metadata
- Add output_files (JSONField) for dict of {relative_path: {}}
- Add output_size (BigIntegerField) for total bytes
- Add output_mimetypes (CharField) for CSV of mimetypes
- Add binary FK to InstalledBinary (optional)
- Migrate existing 'output' field to new split fields

Phase 3: Update run_hook() for JSONL parsing
- Support new JSONL format (any line with {type: 'ModelName', ...})
- Maintain backwards compatibility with RESULT_JSON= format
- Add plugin metadata to each parsed record
- Detect background hooks with .bg. suffix in filename
- Add find_binary_for_cmd() helper function
- Add create_model_record() for processing side-effect records

Phase 6: Update ArchiveResult.run()
- Handle background hooks (return immediately when result is None)
- Process 'records' from HookResult for side-effect models
- Use new output fields (output_str, output_json, output_files, etc.)
- Call create_model_record() for InstalledBinary, Machine updates

Phase 7: Add background hook support
- Add is_background_hook() method to ArchiveResult
- Add check_background_completed() to check if process exited
- Add finalize_background_hook() to collect results from completed hooks
- Update SnapshotMachine.is_finished() to check/finalize background hooks
- Update _populate_output_fields() to walk directory and populate stats

Also updated references to old 'output' field in:
- admin_archiveresults.py
- statemachines.py
- templatetags/core_tags.py
2025-12-27 08:38:49 +00:00
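
For illustration, a hook emitting the new JSONL format writes one JSON object per line to stdout, each tagged with a 'type' field naming the model it describes (field names follow the parsing code below; values are hypothetical):

{"type": "ArchiveResult", "status": "succeeded", "output_str": "saved 1 screenshot", "cmd": ["chromium", "--headless", "--screenshot"], "cmd_version": "120.0"}
{"type": "InstalledBinary", "name": "chromium", "abspath": "/usr/bin/chromium"}

run_hook() collects such lines as records; 'ArchiveResult' records update the calling ArchiveResult, and all other record types are handed to create_model_record() as side effects.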


__package__ = 'archivebox.core'
from typing import Optional, Dict, Iterable, Any, List, TYPE_CHECKING
from archivebox.uuid_compat import uuid7
from datetime import datetime, timedelta
from django_stubs_ext.db.models import TypedModelMeta
import os
import json
from pathlib import Path
from django.db import models
from django.db.models import QuerySet, Value, Case, When, IntegerField
from django.utils.functional import cached_property
from django.utils.text import slugify
from django.utils import timezone
from django.core.cache import cache
from django.urls import reverse, reverse_lazy
from django.contrib import admin
from django.conf import settings
from archivebox.config import CONSTANTS
from archivebox.misc.system import get_dir_size, atomic_write
from archivebox.misc.util import parse_date, base_url, domain as url_domain, to_json, ts_to_date_str, urlencode, htmlencode, urldecode
from archivebox.misc.hashing import get_dir_info
from archivebox.hooks import (
ARCHIVE_METHODS_INDEXING_PRECEDENCE,
get_extractors, get_extractor_name, get_extractor_icon,
DEFAULT_EXTRACTOR_ICONS,
)
from archivebox.base_models.models import (
ModelWithUUID, ModelWithSerializers, ModelWithOutputDir,
ModelWithConfig, ModelWithNotes, ModelWithHealthStats,
get_or_create_system_user_pk,
)
from workers.models import ModelWithStateMachine
from workers.tasks import bg_archive_snapshot
from crawls.models import Crawl
from machine.models import NetworkInterface, InstalledBinary
class Tag(ModelWithSerializers):
# Keep AutoField for compatibility with main branch migrations
# Don't use UUIDField here - requires complex FK transformation
id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False, related_name='tag_set')
created_at = models.DateTimeField(default=timezone.now, db_index=True, null=True)
modified_at = models.DateTimeField(auto_now=True)
name = models.CharField(unique=True, blank=False, max_length=100)
slug = models.SlugField(unique=True, blank=False, max_length=100, editable=False)
snapshot_set: models.Manager['Snapshot']
class Meta(TypedModelMeta):
verbose_name = "Tag"
verbose_name_plural = "Tags"
def __str__(self):
return self.name
def save(self, *args, **kwargs):
is_new = self._state.adding
if is_new:
self.slug = slugify(self.name)
existing = set(Tag.objects.filter(slug__startswith=self.slug).values_list("slug", flat=True))
i = None
while True:
slug = f"{slugify(self.name)}_{i}" if i else slugify(self.name)
if slug not in existing:
self.slug = slug
break
i = (i or 0) + 1
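# e.g. duplicate Tag names yield slugs 'python', 'python_1', 'python_2', ... (illustrative)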
super().save(*args, **kwargs)
if is_new:
from archivebox.misc.logging_util import log_worker_event
log_worker_event(
worker_type='DB',
event='Created Tag',
indent_level=0,
metadata={
'id': self.id,
'name': self.name,
'slug': self.slug,
},
)
@property
def api_url(self) -> str:
return reverse_lazy('api-1:get_tag', args=[self.id])
class SnapshotTag(models.Model):
id = models.AutoField(primary_key=True)
snapshot = models.ForeignKey('Snapshot', db_column='snapshot_id', on_delete=models.CASCADE, to_field='id')
tag = models.ForeignKey(Tag, db_column='tag_id', on_delete=models.CASCADE, to_field='id')
class Meta:
db_table = 'core_snapshot_tags'
unique_together = [('snapshot', 'tag')]
class SnapshotQuerySet(models.QuerySet):
"""Custom QuerySet for Snapshot model with export methods that persist through .filter() etc."""
# =========================================================================
# Filtering Methods
# =========================================================================
FILTER_TYPES = {
'exact': lambda pattern: models.Q(url=pattern),
'substring': lambda pattern: models.Q(url__icontains=pattern),
'regex': lambda pattern: models.Q(url__iregex=pattern),
'domain': lambda pattern: models.Q(url__istartswith=f"http://{pattern}") | models.Q(url__istartswith=f"https://{pattern}") | models.Q(url__istartswith=f"ftp://{pattern}"),
'tag': lambda pattern: models.Q(tags__name=pattern),
'timestamp': lambda pattern: models.Q(timestamp=pattern),
}
def filter_by_patterns(self, patterns: List[str], filter_type: str = 'exact') -> 'SnapshotQuerySet':
"""Filter snapshots by URL patterns using specified filter type"""
from archivebox.misc.logging import stderr
q_filter = models.Q()
for pattern in patterns:
try:
q_filter = q_filter | self.FILTER_TYPES[filter_type](pattern)
except KeyError:
stderr()
stderr(f'[X] Got invalid pattern for --filter-type={filter_type}:', color='red')
stderr(f' {pattern}')
raise SystemExit(2)
return self.filter(q_filter)
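# Usage sketch (hypothetical patterns):
#   Snapshot.objects.filter_by_patterns(['example.com'], filter_type='domain')
#   Snapshot.objects.filter_by_patterns([r'\.pdf$'], filter_type='regex')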
def search(self, patterns: List[str]) -> 'SnapshotQuerySet':
"""Search snapshots using the configured search backend"""
from archivebox.config.common import SEARCH_BACKEND_CONFIG
from archivebox.search import query_search_index
from archivebox.misc.logging import stderr
if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
stderr()
stderr('[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True', color='red')
raise SystemExit(2)
qsearch = self.none()
for pattern in patterns:
try:
qsearch |= query_search_index(pattern)
except Exception as err:
stderr(f'[X] Search failed for pattern {pattern!r} using the configured search backend: {err}', color='red')
raise SystemExit(2)
return self.all() & qsearch
# =========================================================================
# Export Methods
# =========================================================================
def to_json(self, with_headers: bool = False) -> str:
"""Generate JSON index from snapshots"""
import sys
from datetime import datetime, timezone as tz
from archivebox.config import VERSION
from archivebox.config.common import SERVER_CONFIG
MAIN_INDEX_HEADER = {
'info': 'This is an index of site data archived by ArchiveBox: The self-hosted web archive.',
'schema': 'archivebox.index.json',
'copyright_info': SERVER_CONFIG.FOOTER_INFO,
'meta': {
'project': 'ArchiveBox',
'version': VERSION,
'git_sha': VERSION,
'website': 'https://ArchiveBox.io',
'docs': 'https://github.com/ArchiveBox/ArchiveBox/wiki',
'source': 'https://github.com/ArchiveBox/ArchiveBox',
'issues': 'https://github.com/ArchiveBox/ArchiveBox/issues',
'dependencies': {},
},
} if with_headers else {}
snapshot_dicts = [s.to_dict(extended=True) for s in self.iterator(chunk_size=500)]
if with_headers:
output = {
**MAIN_INDEX_HEADER,
'num_links': len(snapshot_dicts),
'updated': datetime.now(tz.utc),
'last_run_cmd': sys.argv,
'links': snapshot_dicts,
}
else:
output = snapshot_dicts
return to_json(output, indent=4, sort_keys=True)
def to_csv(self, cols: Optional[List[str]] = None, header: bool = True, separator: str = ',', ljust: int = 0) -> str:
"""Generate CSV output from snapshots"""
cols = cols or ['timestamp', 'is_archived', 'url']
header_str = separator.join(col.ljust(ljust) for col in cols) if header else ''
row_strs = (s.to_csv(cols=cols, ljust=ljust, separator=separator) for s in self.iterator(chunk_size=500))
return '\n'.join((header_str, *row_strs))
def to_html(self, with_headers: bool = True) -> str:
"""Generate main index HTML from snapshots"""
from datetime import datetime, timezone as tz
from django.template.loader import render_to_string
from archivebox.config import VERSION
from archivebox.config.common import SERVER_CONFIG
from archivebox.config.version import get_COMMIT_HASH
template = 'static_index.html' if with_headers else 'minimal_index.html'
snapshot_list = list(self.iterator(chunk_size=500))
return render_to_string(template, {
'version': VERSION,
'git_sha': get_COMMIT_HASH() or VERSION,
'num_links': str(len(snapshot_list)),
'date_updated': datetime.now(tz.utc).strftime('%Y-%m-%d'),
'time_updated': datetime.now(tz.utc).strftime('%Y-%m-%d %H:%M'),
'links': snapshot_list,
'FOOTER_INFO': SERVER_CONFIG.FOOTER_INFO,
})
class SnapshotManager(models.Manager.from_queryset(SnapshotQuerySet)):
"""Manager for Snapshot model - uses SnapshotQuerySet for chainable methods"""
def filter(self, *args, **kwargs):
domain = kwargs.pop('domain', None)
qs = super().filter(*args, **kwargs)
if domain:
qs = qs.filter(url__icontains=f'://{domain}')
return qs
def get_queryset(self):
return super().get_queryset().prefetch_related('tags', 'archiveresult_set')
# =========================================================================
# Import Methods
# =========================================================================
def create_or_update_from_dict(self, link_dict: Dict[str, Any], created_by_id: Optional[int] = None) -> 'Snapshot':
"""Create or update a Snapshot from a SnapshotDict (parser output)"""
import re
from archivebox.config.common import GENERAL_CONFIG
url = link_dict['url']
timestamp = link_dict.get('timestamp')
title = link_dict.get('title')
tags_str = link_dict.get('tags')
tag_list = []
if tags_str:
tag_list = list(dict.fromkeys(
tag.strip() for tag in re.split(GENERAL_CONFIG.TAG_SEPARATOR_PATTERN, tags_str)
if tag.strip()
))
# Get most recent snapshot with this URL (URLs can exist in multiple crawls)
snapshot = self.filter(url=url).order_by('-created_at').first()
if snapshot:
if title and (not snapshot.title or len(title) > len(snapshot.title or '')):
snapshot.title = title
snapshot.save(update_fields=['title', 'modified_at'])
else:
if timestamp:
while self.filter(timestamp=timestamp).exists():
timestamp = str(float(timestamp) + 1.0)
snapshot = self.create(
url=url,
timestamp=timestamp,
title=title,
created_by_id=created_by_id or get_or_create_system_user_pk(),
)
if tag_list:
existing_tags = set(snapshot.tags.values_list('name', flat=True))
new_tags = set(tag_list) | existing_tags
snapshot.save_tags(new_tags)
return snapshot
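# Illustrative input dict (values hypothetical), matching the keys read above:
#   Snapshot.objects.create_or_update_from_dict({
#       'url': 'https://example.com/page',
#       'timestamp': '1700000000.0',
#       'title': 'Example Page',
#       'tags': 'news,tech',
#   })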
def create_from_dicts(self, link_dicts: List[Dict[str, Any]], created_by_id: Optional[int] = None) -> List['Snapshot']:
"""Create or update multiple Snapshots from a list of SnapshotDicts"""
return [self.create_or_update_from_dict(d, created_by_id=created_by_id) for d in link_dicts]
def remove(self, atomic: bool = False) -> tuple:
"""Remove snapshots from the database"""
from django.db import transaction
if atomic:
with transaction.atomic():
return self.delete()
return self.delete()
class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False, related_name='snapshot_set', db_index=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
url = models.URLField(unique=False, db_index=True) # URLs can appear in multiple crawls
timestamp = models.CharField(max_length=32, unique=True, db_index=True, editable=False)
bookmarked_at = models.DateTimeField(default=timezone.now, db_index=True)
crawl: Crawl = models.ForeignKey(Crawl, on_delete=models.CASCADE, default=None, null=True, blank=True, related_name='snapshot_set', db_index=True) # type: ignore
title = models.CharField(max_length=512, null=True, blank=True, db_index=True)
downloaded_at = models.DateTimeField(default=None, null=True, editable=False, db_index=True, blank=True)
depth = models.PositiveSmallIntegerField(default=0, db_index=True) # 0 for root snapshot, 1+ for discovered URLs
fs_version = models.CharField(max_length=10, default='0.9.0', help_text='Filesystem version of this snapshot (e.g., "0.7.0", "0.8.0", "0.9.0"). Used to trigger lazy migration on save().')
retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
status = ModelWithStateMachine.StatusField(choices=ModelWithStateMachine.StatusChoices, default=ModelWithStateMachine.StatusChoices.QUEUED)
config = models.JSONField(default=dict, null=False, blank=False, editable=True)
notes = models.TextField(blank=True, null=False, default='')
output_dir = models.FilePathField(path=CONSTANTS.ARCHIVE_DIR, recursive=True, match='.*', default=None, null=True, blank=True, editable=True)
tags = models.ManyToManyField(Tag, blank=True, through=SnapshotTag, related_name='snapshot_set', through_fields=('snapshot', 'tag'))
state_machine_name = 'core.statemachines.SnapshotMachine'
state_field_name = 'status'
retry_at_field_name = 'retry_at'
StatusChoices = ModelWithStateMachine.StatusChoices
active_state = StatusChoices.STARTED
objects = SnapshotManager()
archiveresult_set: models.Manager['ArchiveResult']
class Meta(TypedModelMeta):
verbose_name = "Snapshot"
verbose_name_plural = "Snapshots"
constraints = [
# Allow same URL in different crawls, but not duplicates within same crawl
models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
]
def __str__(self):
return f'[{self.id}] {self.url[:64]}'
def save(self, *args, **kwargs):
is_new = self._state.adding
if not self.bookmarked_at:
self.bookmarked_at = self.created_at or timezone.now()
if not self.timestamp:
self.timestamp = str(self.bookmarked_at.timestamp())
# Migrate filesystem if needed (happens automatically on save)
if self.pk and self.fs_migration_needed:
from django.db import transaction
with transaction.atomic():
# Walk through migration chain automatically
current = self.fs_version
target = self._fs_current_version()
while current != target:
next_ver = self._fs_next_version(current)
method = f'_fs_migrate_from_{current.replace(".", "_")}_to_{next_ver.replace(".", "_")}'
# Only run if method exists (most are no-ops)
if hasattr(self, method):
getattr(self, method)()
current = next_ver
# Update version (still in transaction)
self.fs_version = target
super().save(*args, **kwargs)
if self.crawl and self.url not in self.crawl.urls:
self.crawl.urls += f'\n{self.url}'
self.crawl.save()
if is_new:
from archivebox.misc.logging_util import log_worker_event
log_worker_event(
worker_type='DB',
event='Created Snapshot',
indent_level=2,
url=self.url,
metadata={
'id': str(self.id),
'crawl_id': str(self.crawl_id) if self.crawl_id else None,
'depth': self.depth,
'status': self.status,
},
)
# =========================================================================
# Filesystem Migration Methods
# =========================================================================
@staticmethod
def _fs_current_version() -> str:
"""Get current ArchiveBox filesystem version (normalized to x.x.0 format)"""
from archivebox.config import VERSION
# Normalize version to x.x.0 format (e.g., "0.9.0rc1" -> "0.9.0")
parts = VERSION.split('.')
if len(parts) >= 2:
major, minor = parts[0], parts[1]
# Strip any non-numeric suffix from minor version
minor = ''.join(c for c in minor if c.isdigit())
return f'{major}.{minor}.0'
return '0.9.0' # Fallback if version parsing fails
@property
def fs_migration_needed(self) -> bool:
"""Check if snapshot needs filesystem migration"""
return self.fs_version != self._fs_current_version()
def _fs_next_version(self, version: str) -> str:
"""Get next version in migration chain"""
chain = ['0.7.0', '0.8.0', '0.9.0']
try:
idx = chain.index(version)
return chain[idx + 1] if idx + 1 < len(chain) else self._fs_current_version()
except ValueError:
# Unknown version - skip to current
return self._fs_current_version()
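# e.g. _fs_next_version('0.7.0') -> '0.8.0'; unknown versions skip straight to the current version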
def _fs_migrate_from_0_7_0_to_0_8_0(self):
"""Migration from 0.7.0 to 0.8.0 layout (no-op)"""
# 0.7 and 0.8 both used archive/<timestamp>
# Nothing to do!
pass
def _fs_migrate_from_0_8_0_to_0_9_0(self):
"""
Migrate from flat file structure to organized extractor subdirectories.
0.8.x layout (flat):
archive/1234567890/
index.json
index.html
screenshot.png
warc/archive.warc.gz
media/video.mp4
0.9.x layout (organized):
archive/{timestamp}/
index.json
screenshot/
screenshot.png
singlefile/
index.html
warc/
archive.warc.gz
media/
video.mp4
Note: For now this is a no-op. The actual file reorganization will be
implemented when we're ready to do the migration. This placeholder ensures
the migration chain is set up correctly.
"""
# TODO: Implement actual file reorganization when ready
pass
# =========================================================================
# Output Directory Properties
# =========================================================================
@property
def output_dir_parent(self) -> str:
return 'archive'
@property
def output_dir_name(self) -> str:
return str(self.timestamp)
def archive(self, overwrite=False, methods=None):
return bg_archive_snapshot(self, overwrite=overwrite, methods=methods)
@admin.display(description='Tags')
def tags_str(self, nocache=True) -> str | None:
calc_tags_str = lambda: ','.join(sorted(tag.name for tag in self.tags.all()))
if hasattr(self, '_prefetched_objects_cache') and 'tags' in self._prefetched_objects_cache:
return calc_tags_str()
cache_key = f'{self.pk}-tags'
return cache.get_or_set(cache_key, calc_tags_str) if not nocache else calc_tags_str()
def icons(self) -> str:
"""Generate HTML icons showing which extractors have succeeded for this snapshot"""
from django.utils.html import format_html, mark_safe
cache_key = f'result_icons:{self.pk}:{(self.downloaded_at or self.modified_at or self.created_at or self.bookmarked_at).timestamp()}'
def calc_icons():
if hasattr(self, '_prefetched_objects_cache') and 'archiveresult_set' in self._prefetched_objects_cache:
archive_results = {r.extractor: r for r in self.archiveresult_set.all() if r.status == "succeeded" and (r.output_files or r.output_str)}
else:
# Filter for results that have either output_files or output_str
from django.db.models import Q
archive_results = {r.extractor: r for r in self.archiveresult_set.filter(
Q(status="succeeded") & (Q(output_files__isnull=False) | ~Q(output_str=''))
)}
path = self.archive_path
canon = self.canonical_outputs()
output = ""
output_template = '<a href="/{}/{}" class="exists-{}" title="{}">{}</a> &nbsp;'
# Get all extractors from hooks system (sorted by numeric prefix)
all_extractors = [get_extractor_name(e) for e in get_extractors()]
for extractor in all_extractors:
result = archive_results.get(extractor)
existing = result and result.status == 'succeeded' and (result.output_files or result.output_str)
icon = get_extractor_icon(extractor)
output += format_html(
output_template,
path,
canon.get(extractor, extractor + '/'),
str(bool(existing)),
extractor,
icon
)
return format_html('<span class="files-icons" style="font-size: 1.1em; opacity: 0.8; min-width: 240px; display: inline-block">{}</span>', mark_safe(output))
cache_result = cache.get(cache_key)
if cache_result:
return cache_result
fresh_result = calc_icons()
cache.set(cache_key, fresh_result, timeout=60 * 60 * 24)
return fresh_result
@property
def api_url(self) -> str:
return reverse_lazy('api-1:get_snapshot', args=[self.id])
def get_absolute_url(self):
return f'/{self.archive_path}'
@cached_property
def domain(self) -> str:
return url_domain(self.url)
@cached_property
def output_dir(self):
"""The filesystem path to the snapshot's output directory."""
return str(CONSTANTS.ARCHIVE_DIR / self.timestamp)
@cached_property
def archive_path(self):
return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'
@cached_property
def archive_size(self):
try:
return get_dir_size(self.output_dir)[0]
except Exception:
return 0
def save_tags(self, tags: Iterable[str] = ()) -> None:
tags_id = [Tag.objects.get_or_create(name=tag)[0].pk for tag in tags if tag.strip()]
self.tags.clear()
self.tags.add(*tags_id)
def pending_archiveresults(self) -> QuerySet['ArchiveResult']:
return self.archiveresult_set.exclude(status__in=ArchiveResult.FINAL_OR_ACTIVE_STATES)
def run(self) -> list['ArchiveResult']:
"""
Execute this Snapshot by creating ArchiveResults for all enabled extractors.
Called by the state machine when entering the 'started' state.
"""
return self.create_pending_archiveresults()
def create_pending_archiveresults(self) -> list['ArchiveResult']:
"""
Create ArchiveResult records for all enabled extractors.
Uses the hooks system to discover available extractors from:
- archivebox/plugins/*/on_Snapshot__*.{py,sh,js}
- data/plugins/*/on_Snapshot__*.{py,sh,js}
"""
from archivebox.hooks import get_enabled_extractors
extractors = get_enabled_extractors()
archiveresults = []
for extractor in extractors:
if ArchiveResult.objects.filter(snapshot=self, extractor=extractor).exists():
continue
archiveresult, _ = ArchiveResult.objects.get_or_create(
snapshot=self, extractor=extractor,
defaults={
'status': ArchiveResult.INITIAL_STATE,
'retry_at': timezone.now(),
'created_by_id': self.created_by_id,
},
)
if archiveresult.status == ArchiveResult.INITIAL_STATE:
archiveresults.append(archiveresult)
return archiveresults
def retry_failed_archiveresults(self, retry_at: Optional['timezone.datetime'] = None) -> int:
"""
Reset failed/skipped ArchiveResults to queued for retry.
This enables seamless retry of the entire extraction pipeline:
- Resets FAILED and SKIPPED results to QUEUED
- Sets retry_at so workers pick them up
- Extractors run in order (numeric prefix)
- Each extractor checks its dependencies at runtime
Dependency handling (e.g., chrome_session → screenshot):
- Extractors check if required outputs exist before running
- If dependency output missing → extractor returns 'skipped'
- On retry, if dependency now succeeds → dependent can run
Returns count of ArchiveResults reset.
"""
retry_at = retry_at or timezone.now()
count = self.archiveresult_set.filter(
status__in=[
ArchiveResult.StatusChoices.FAILED,
ArchiveResult.StatusChoices.SKIPPED,
]
).update(
status=ArchiveResult.StatusChoices.QUEUED,
retry_at=retry_at,
output_str='',
output_json=None,
output_files={},
output_size=0,
output_mimetypes='',
start_ts=None,
end_ts=None,
)
# Also reset the snapshot so it gets re-checked
if count > 0:
self.status = self.StatusChoices.STARTED
self.retry_at = retry_at
self.save(update_fields=['status', 'retry_at', 'modified_at'])
return count
# =========================================================================
# URL Helper Properties (migrated from Link schema)
# =========================================================================
@cached_property
def url_hash(self) -> str:
from hashlib import sha256
return sha256(self.url.encode()).hexdigest()[:8]
@cached_property
def scheme(self) -> str:
return self.url.split('://')[0]
@cached_property
def path(self) -> str:
parts = self.url.split('://', 1)
return '/' + parts[1].split('/', 1)[1] if len(parts) > 1 and '/' in parts[1] else '/'
@cached_property
def basename(self) -> str:
return self.path.split('/')[-1]
@cached_property
def extension(self) -> str:
basename = self.basename
return basename.split('.')[-1] if '.' in basename else ''
@cached_property
def base_url(self) -> str:
return f'{self.scheme}://{self.domain}'
@cached_property
def is_static(self) -> bool:
static_extensions = {'.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.mp4', '.mp3', '.wav', '.webm'}
return any(self.url.lower().endswith(ext) for ext in static_extensions)
@cached_property
def is_archived(self) -> bool:
output_paths = (
self.domain,
'output.html',
'output.pdf',
'screenshot.png',
'singlefile.html',
'readability/content.html',
'mercury/content.html',
'htmltotext.txt',
'media',
'git',
)
return any((Path(self.output_dir) / path).exists() for path in output_paths)
# =========================================================================
# Date/Time Properties (migrated from Link schema)
# =========================================================================
@cached_property
def bookmarked_date(self) -> Optional[str]:
max_ts = (timezone.now() + timedelta(days=30)).timestamp()
if self.timestamp and self.timestamp.replace('.', '').isdigit():
if 0 < float(self.timestamp) < max_ts:
return self._ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
return str(self.timestamp)
return None
@cached_property
def downloaded_datestr(self) -> Optional[str]:
return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None
@cached_property
def archive_dates(self) -> List[datetime]:
return [
result.start_ts
for result in self.archiveresult_set.all()
if result.start_ts
]
@cached_property
def oldest_archive_date(self) -> Optional[datetime]:
dates = self.archive_dates
return min(dates) if dates else None
@cached_property
def newest_archive_date(self) -> Optional[datetime]:
dates = self.archive_dates
return max(dates) if dates else None
@cached_property
def num_outputs(self) -> int:
return self.archiveresult_set.filter(status='succeeded').count()
@cached_property
def num_failures(self) -> int:
return self.archiveresult_set.filter(status='failed').count()
# =========================================================================
# Output Path Methods (migrated from Link schema)
# =========================================================================
def canonical_outputs(self) -> Dict[str, Optional[str]]:
"""
Intelligently discover the best output file for each extractor.
Uses actual ArchiveResult data and filesystem scanning with smart heuristics.
"""
FAVICON_PROVIDER = 'https://www.google.com/s2/favicons?domain={}'
# Mimetypes that can be embedded/previewed in an iframe
IFRAME_EMBEDDABLE_EXTENSIONS = {
'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl',
'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
}
MIN_DISPLAY_SIZE = 15_000 # 15KB - filter out tiny files
MAX_SCAN_FILES = 50 # Don't scan massive directories
def find_best_output_in_dir(dir_path: Path, extractor_name: str) -> Optional[str]:
"""Find the best representative file in an extractor's output directory"""
if not dir_path.exists() or not dir_path.is_dir():
return None
candidates = []
file_count = 0
# Special handling for media extractor - look for thumbnails
is_media_dir = extractor_name == 'media'
# Scan for suitable files
for file_path in dir_path.rglob('*'):
file_count += 1
if file_count > MAX_SCAN_FILES:
break
if file_path.is_dir() or file_path.name.startswith('.'):
continue
ext = file_path.suffix.lstrip('.').lower()
if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
continue
try:
size = file_path.stat().st_size
except OSError:
continue
# For media dir, allow smaller image files (thumbnails are often < 15KB)
min_size = 5_000 if (is_media_dir and ext in ('png', 'jpg', 'jpeg', 'webp', 'gif')) else MIN_DISPLAY_SIZE
if size < min_size:
continue
# Prefer main files: index.html, output.*, content.*, etc.
priority = 0
name_lower = file_path.name.lower()
if is_media_dir:
# Special prioritization for media directories
if any(keyword in name_lower for keyword in ('thumb', 'thumbnail', 'cover', 'poster')):
priority = 200 # Highest priority for thumbnails
elif ext in ('png', 'jpg', 'jpeg', 'webp', 'gif'):
priority = 150 # High priority for any image
elif ext in ('mp4', 'webm', 'mp3', 'opus', 'ogg'):
priority = 100 # Lower priority for actual media files
else:
priority = 50
elif 'index' in name_lower:
priority = 100
elif name_lower.startswith(('output', 'content', extractor_name)):
priority = 50
elif ext in ('html', 'htm', 'pdf'):
priority = 30
elif ext in ('png', 'jpg', 'jpeg', 'webp'):
priority = 20
else:
priority = 10
candidates.append((priority, size, file_path))
if not candidates:
return None
# Sort by priority (desc), then size (desc)
candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
best_file = candidates[0][2]
return str(best_file.relative_to(Path(self.output_dir)))
canonical = {
'index_path': 'index.html',
'google_favicon_path': FAVICON_PROVIDER.format(self.domain),
'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
}
# Scan each ArchiveResult's output directory for the best file
snap_dir = Path(self.output_dir)
for result in self.archiveresult_set.filter(status='succeeded'):
if not result.output_files and not result.output_str:
continue
# Try to find the best output file for this extractor
extractor_dir = snap_dir / result.extractor
best_output = None
# Check output_files first (new field)
if result.output_files:
first_file = next(iter(result.output_files.keys()), None)
if first_file and (extractor_dir / first_file).exists():
best_output = f'{result.extractor}/{first_file}'
# Fallback to output_str if it looks like a path
if not best_output and result.output_str and (snap_dir / result.output_str).exists():
best_output = result.output_str
if not best_output and extractor_dir.exists():
# Intelligently find the best file in the extractor's directory
best_output = find_best_output_in_dir(extractor_dir, result.extractor)
if best_output:
canonical[f'{result.extractor}_path'] = best_output
# Also scan top-level for legacy outputs (backwards compatibility)
for file_path in snap_dir.glob('*'):
if file_path.is_dir() or file_path.name in ('index.html', 'index.json'):
continue
ext = file_path.suffix.lstrip('.').lower()
if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
continue
try:
size = file_path.stat().st_size
if size >= MIN_DISPLAY_SIZE:
# Add as generic output with stem as key
key = f'{file_path.stem}_path'
if key not in canonical:
canonical[key] = file_path.name
except OSError:
continue
if self.is_static:
static_path = f'warc/{self.timestamp}'
canonical.update({
'title': self.basename,
'wget_path': static_path,
})
return canonical
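# Illustrative return value (keys depend on which extractors succeeded; paths hypothetical):
#   {'index_path': 'index.html',
#    'google_favicon_path': 'https://www.google.com/s2/favicons?domain=example.com',
#    'archive_org_path': 'https://web.archive.org/web/https://example.com',
#    'screenshot_path': 'screenshot/screenshot.png'}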
def latest_outputs(self, status: Optional[str] = None) -> Dict[str, Any]:
"""Get the latest output that each archive method produced"""
from archivebox.hooks import get_extractors
from django.db.models import Q
latest: Dict[str, Any] = {}
for archive_method in get_extractors():
results = self.archiveresult_set.filter(extractor=archive_method)
if status is not None:
results = results.filter(status=status)
# Filter for results with output_files or output_str
results = results.filter(Q(output_files__isnull=False) | ~Q(output_str='')).order_by('-start_ts')
result = results.first()
# Return embed_path() for backwards compatibility
latest[archive_method] = result.embed_path() if result else None
return latest
# =========================================================================
# Serialization Methods
# =========================================================================
def to_dict(self, extended: bool = False) -> Dict[str, Any]:
"""Convert Snapshot to a dictionary (replacement for Link._asdict())"""
from archivebox.misc.util import ts_to_date_str
result = {
'TYPE': 'core.models.Snapshot',
'id': str(self.id),
'url': self.url,
'timestamp': self.timestamp,
'title': self.title,
'tags': self.tags_str(),
'downloaded_at': self.downloaded_at.isoformat() if self.downloaded_at else None,
'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
# Computed properties
'domain': self.domain,
'scheme': self.scheme,
'base_url': self.base_url,
'path': self.path,
'basename': self.basename,
'extension': self.extension,
'is_static': self.is_static,
'is_archived': self.is_archived,
'archive_path': self.archive_path,
'output_dir': self.output_dir,
'link_dir': self.output_dir, # backwards compatibility alias
'archive_size': self.archive_size,
'bookmarked_date': self.bookmarked_date,
'downloaded_datestr': self.downloaded_datestr,
'num_outputs': self.num_outputs,
'num_failures': self.num_failures,
}
if extended:
result['canonical'] = self.canonical_outputs()
return result
def to_json(self, indent: int = 4) -> str:
"""Convert to JSON string"""
return to_json(self.to_dict(extended=True), indent=indent)
def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
"""Convert to CSV string"""
data = self.to_dict()
cols = cols or ['timestamp', 'is_archived', 'url']
return separator.join(to_json(data.get(col, ''), indent=None).ljust(ljust) for col in cols)
def write_json_details(self, out_dir: Optional[str] = None) -> None:
"""Write JSON index file for this snapshot to its output directory"""
out_dir = out_dir or self.output_dir
path = Path(out_dir) / CONSTANTS.JSON_INDEX_FILENAME
atomic_write(str(path), self.to_dict(extended=True))
def write_html_details(self, out_dir: Optional[str] = None) -> None:
"""Write HTML detail page for this snapshot to its output directory"""
from django.template.loader import render_to_string
from archivebox.config.common import SERVER_CONFIG
from archivebox.config.configset import get_config
from archivebox.misc.logging_util import printable_filesize
out_dir = out_dir or self.output_dir
config = get_config()
SAVE_ARCHIVE_DOT_ORG = config.get('SAVE_ARCHIVE_DOT_ORG', True)
TITLE_LOADING_MSG = 'Not yet archived...'
canonical = self.canonical_outputs()
context = {
**self.to_dict(extended=True),
**{f'{k}_path': v for k, v in canonical.items()},
'canonical': {f'{k}_path': v for k, v in canonical.items()},
'title': htmlencode(self.title or (self.base_url if self.is_archived else TITLE_LOADING_MSG)),
'url_str': htmlencode(urldecode(self.base_url)),
'archive_url': urlencode(f'warc/{self.timestamp}' or (self.domain if self.is_archived else '')) or 'about:blank',
'extension': self.extension or 'html',
'tags': self.tags_str() or 'untagged',
'size': printable_filesize(self.archive_size) if self.archive_size else 'pending',
'status': 'archived' if self.is_archived else 'not yet archived',
'status_color': 'success' if self.is_archived else 'danger',
'oldest_archive_date': ts_to_date_str(self.oldest_archive_date),
'SAVE_ARCHIVE_DOT_ORG': SAVE_ARCHIVE_DOT_ORG,
'PREVIEW_ORIGINALS': SERVER_CONFIG.PREVIEW_ORIGINALS,
}
rendered_html = render_to_string('snapshot.html', context)
atomic_write(str(Path(out_dir) / CONSTANTS.HTML_INDEX_FILENAME), rendered_html)
# =========================================================================
# Helper Methods
# =========================================================================
@staticmethod
def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None
class ArchiveResultManager(models.Manager):
def indexable(self, sorted: bool = True):
INDEXABLE_METHODS = [r[0] for r in ARCHIVE_METHODS_INDEXING_PRECEDENCE]
qs = self.get_queryset().filter(extractor__in=INDEXABLE_METHODS, status='succeeded')
if sorted:
precedence = [When(extractor=method, then=Value(p)) for method, p in ARCHIVE_METHODS_INDEXING_PRECEDENCE]
qs = qs.annotate(indexing_precedence=Case(*precedence, default=Value(1000), output_field=IntegerField())).order_by('indexing_precedence')
return qs
class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
class StatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
STARTED = 'started', 'Started'
BACKOFF = 'backoff', 'Waiting to retry'
SUCCEEDED = 'succeeded', 'Succeeded'
FAILED = 'failed', 'Failed'
SKIPPED = 'skipped', 'Skipped'
@classmethod
def get_extractor_choices(cls):
"""Get extractor choices from discovered hooks (for forms/admin)."""
extractors = [get_extractor_name(e) for e in get_extractors()]
return tuple((e, e) for e in extractors)
# Keep AutoField for backward compatibility with 0.7.x databases
# UUID field is added separately by migration for new records
id = models.AutoField(primary_key=True, editable=False)
# Note: unique constraint is added by migration 0027 - don't set unique=True here
# or SQLite table recreation in earlier migrations will fail
uuid = models.UUIDField(default=uuid7, null=True, blank=True, db_index=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False, related_name='archiveresult_set', db_index=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
snapshot: Snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE) # type: ignore
# No choices= constraint - extractor names come from plugin system and can be any string
extractor = models.CharField(max_length=32, blank=False, null=False, db_index=True)
pwd = models.CharField(max_length=256, default=None, null=True, blank=True)
cmd = models.JSONField(default=None, null=True, blank=True)
cmd_version = models.CharField(max_length=128, default=None, null=True, blank=True)
# New output fields (replacing old 'output' field)
output_str = models.TextField(blank=True, default='', help_text='Human-readable output summary')
output_json = models.JSONField(null=True, blank=True, default=None, help_text='Structured metadata (headers, redirects, etc.)')
output_files = models.JSONField(default=dict, help_text='Dict of {relative_path: {metadata}}')
output_size = models.BigIntegerField(default=0, help_text='Total bytes of all output files')
output_mimetypes = models.CharField(max_length=512, blank=True, default='', help_text='CSV of mimetypes sorted by size')
# Binary FK (optional - set when hook reports cmd)
binary = models.ForeignKey(
'machine.InstalledBinary',
on_delete=models.SET_NULL,
null=True, blank=True,
related_name='archiveresults',
help_text='Primary binary used by this hook'
)
start_ts = models.DateTimeField(default=None, null=True, blank=True)
end_ts = models.DateTimeField(default=None, null=True, blank=True)
status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
notes = models.TextField(blank=True, null=False, default='')
output_dir = models.CharField(max_length=256, default=None, null=True, blank=True)
iface = models.ForeignKey(NetworkInterface, on_delete=models.SET_NULL, null=True, blank=True)
state_machine_name = 'core.statemachines.ArchiveResultMachine'
retry_at_field_name = 'retry_at'
state_field_name = 'status'
active_state = StatusChoices.STARTED
objects = ArchiveResultManager()
class Meta(TypedModelMeta):
verbose_name = 'Archive Result'
verbose_name_plural = 'Archive Results Log'
def __str__(self):
return f'[{self.id}] {self.snapshot.url[:64]} -> {self.extractor}'
def save(self, *args, **kwargs):
is_new = self._state.adding
# Skip ModelWithOutputDir.save() to avoid creating index.json in plugin directories
# Call the Django Model.save() directly instead
models.Model.save(self, *args, **kwargs)
if is_new:
from archivebox.misc.logging_util import log_worker_event
log_worker_event(
worker_type='DB',
event='Created ArchiveResult',
indent_level=3,
extractor=self.extractor,
metadata={
'id': str(self.id),
'snapshot_id': str(self.snapshot_id),
'snapshot_url': str(self.snapshot.url)[:64],
'status': self.status,
},
)
@cached_property
def snapshot_dir(self):
return Path(self.snapshot.output_dir)
@cached_property
def url(self):
return self.snapshot.url
@property
def api_url(self) -> str:
return reverse_lazy('api-1:get_archiveresult', args=[self.id])
def get_absolute_url(self):
return f'/{self.snapshot.archive_path}/{self.extractor}'
@property
def extractor_module(self) -> Any | None:
# Hook scripts are now used instead of Python extractor modules
# The extractor name maps to hooks in archivebox/plugins/{extractor}/
return None
def output_exists(self) -> bool:
return os.path.exists(Path(self.snapshot_dir) / self.extractor)
def embed_path(self) -> Optional[str]:
"""
Get the relative path to the embeddable output file for this result.
Returns the first file from output_files if set, otherwise tries to
find a reasonable default based on the extractor type.
"""
# Check output_files dict for primary output
if self.output_files:
# Return first file from output_files (dict preserves insertion order)
first_file = next(iter(self.output_files.keys()), None)
if first_file:
return f'{self.extractor}/{first_file}'
# Fallback: check output_str if it looks like a file path
if self.output_str and ('/' in self.output_str or '.' in self.output_str):
return self.output_str
# Try to find output file based on extractor's canonical output path
canonical = self.snapshot.canonical_outputs()
extractor_key = f'{self.extractor}_path'
if extractor_key in canonical:
return canonical[extractor_key]
# Fallback to extractor directory
return f'{self.extractor}/'
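# Illustrative return values: 'screenshot/screenshot.png' (first key of output_files) or 'screenshot/' (directory fallback)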
def create_output_dir(self):
output_dir = Path(self.snapshot_dir) / self.extractor
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
@property
def output_dir_name(self) -> str:
return self.extractor
@property
def output_dir_parent(self) -> str:
return str(self.snapshot.OUTPUT_DIR.relative_to(CONSTANTS.DATA_DIR))
def save_search_index(self):
pass
def run(self):
"""
Execute this ArchiveResult's extractor and update status.
Discovers and runs the hook script for self.extractor,
updates status/output fields, queues discovered URLs, and triggers indexing.
"""
from django.utils import timezone
from archivebox.hooks import BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR, run_hook
config_objects = [self.snapshot.crawl, self.snapshot] if self.snapshot.crawl else [self.snapshot]
# Find hook for this extractor
hook = None
for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
if not base_dir.exists():
continue
matches = list(base_dir.glob(f'*/on_Snapshot__{self.extractor}.*'))
if matches:
hook = matches[0]
break
if not hook:
self.status = self.StatusChoices.FAILED
self.output_str = f'No hook found for: {self.extractor}'
self.retry_at = None
self.save()
return
# Use plugin directory name instead of extractor name (removes numeric prefix)
plugin_name = hook.parent.name
extractor_dir = Path(self.snapshot.output_dir) / plugin_name
# Run the hook
start_ts = timezone.now()
result = run_hook(
hook,
output_dir=extractor_dir,
config_objects=config_objects,
url=self.snapshot.url,
snapshot_id=str(self.snapshot.id),
)
# BACKGROUND HOOK - still running, return immediately
if result is None:
self.status = self.StatusChoices.STARTED
self.start_ts = start_ts
self.pwd = str(extractor_dir)
self.save()
return
end_ts = timezone.now()
# Get records from hook output (new JSONL format)
records = result.get('records', [])
# Clean up empty output directory if no files were created
output_files = result.get('output_files', [])
if not output_files and extractor_dir.exists():
try:
# Only remove if directory is completely empty
if not any(extractor_dir.iterdir()):
extractor_dir.rmdir()
except (OSError, RuntimeError):
pass # Directory not empty or can't be removed, that's fine
# Find the ArchiveResult record from hook output (if any)
ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
output_json = result.get('output_json') or {}
# Determine status from records, output_json, or return code
if ar_records:
# Use status from first ArchiveResult record
hook_data = ar_records[0]
status = hook_data.get('status', 'failed')
elif output_json.get('status'):
status = output_json['status']
elif result['returncode'] == 0:
status = 'succeeded'
else:
status = 'failed'
# Update self from result
status_map = {
'succeeded': self.StatusChoices.SUCCEEDED,
'failed': self.StatusChoices.FAILED,
'skipped': self.StatusChoices.SKIPPED,
}
self.status = status_map.get(status, self.StatusChoices.FAILED)
# Set output fields from records or output_json
if ar_records:
hook_data = ar_records[0]
self.output_str = hook_data.get('output_str') or hook_data.get('output') or ''
self.output_json = hook_data.get('output_json')
# Set cmd from JSONL record
if hook_data.get('cmd'):
self.cmd = hook_data['cmd']
self._set_binary_from_cmd(hook_data['cmd'])
if hook_data.get('cmd_version'):
self.cmd_version = hook_data['cmd_version'][:128]
else:
# Fallback to legacy output_json format
self.output_str = output_json.get('output_str') or output_json.get('output') or result['stdout'][:1024] or result['stderr'][:1024] or ''
self.output_json = output_json.get('output_json') if output_json.get('output_json') else None
if output_json.get('cmd_version'):
self.cmd_version = output_json['cmd_version'][:128]
if output_json.get('cmd'):
self.cmd = output_json['cmd']
self._set_binary_from_cmd(output_json['cmd'])
self.start_ts = start_ts
self.end_ts = end_ts
self.retry_at = None
self.pwd = str(extractor_dir)
# Populate output_files, output_size, output_mimetypes from filesystem
if extractor_dir.exists():
self._populate_output_fields(extractor_dir)
self.save()
# Process side-effect records (InstalledBinary, Machine config, etc.)
from archivebox.hooks import create_model_record
for record in records:
if record.get('type') != 'ArchiveResult':
create_model_record(record.copy()) # Copy to avoid mutating original
# Queue any discovered URLs for crawling (parser extractors write urls.jsonl)
self._queue_urls_for_crawl(extractor_dir)
# Update snapshot title if this is the title extractor
# Check both old numeric name and new plugin name for compatibility
extractor_name = get_extractor_name(self.extractor)
if self.status == self.StatusChoices.SUCCEEDED and extractor_name == 'title':
self._update_snapshot_title(extractor_dir)
# Trigger search indexing if succeeded
if self.status == self.StatusChoices.SUCCEEDED:
self.trigger_search_indexing()
def _populate_output_fields(self, output_dir: Path) -> None:
"""
Walk output directory and populate output_files, output_size, output_mimetypes.
"""
import mimetypes
from collections import defaultdict
exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid'}
# Track mimetypes and sizes for aggregation
mime_sizes = defaultdict(int)
total_size = 0
output_files = {} # Dict keyed by relative path
for file_path in output_dir.rglob('*'):
# Skip non-files and infrastructure files
if not file_path.is_file():
continue
if file_path.name in exclude_names:
continue
# Get file stats
try:
stat = file_path.stat()
mime_type, _ = mimetypes.guess_type(str(file_path))
mime_type = mime_type or 'application/octet-stream'
# Track for ArchiveResult fields
relative_path = str(file_path.relative_to(output_dir))
output_files[relative_path] = {} # Empty dict, extensible for future metadata
mime_sizes[mime_type] += stat.st_size
total_size += stat.st_size
except (OSError, IOError):
continue
# Populate ArchiveResult fields
self.output_files = output_files
self.output_size = total_size
# Build output_mimetypes CSV (sorted by size descending)
sorted_mimes = sorted(mime_sizes.items(), key=lambda x: x[1], reverse=True)
self.output_mimetypes = ','.join(mime for mime, _ in sorted_mimes)
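# Illustrative result for a screenshot hook (values hypothetical):
#   output_files = {'screenshot.png': {}}
#   output_size = 482133
#   output_mimetypes = 'image/png'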
def _set_binary_from_cmd(self, cmd: list) -> None:
"""
Find InstalledBinary for command and set binary FK.
Tries matching by absolute path first, then by binary name.
Only matches binaries on the current machine.
"""
if not cmd:
return
from machine.models import Machine
bin_path_or_name = cmd[0] if isinstance(cmd, list) else cmd
machine = Machine.current()
# Try matching by absolute path first
binary = InstalledBinary.objects.filter(
abspath=bin_path_or_name,
machine=machine
).first()
if binary:
self.binary = binary
return
# Fallback: match by binary name
bin_name = Path(bin_path_or_name).name
binary = InstalledBinary.objects.filter(
name=bin_name,
machine=machine
).first()
if binary:
self.binary = binary
def _update_snapshot_title(self, extractor_dir: Path):
"""
Update snapshot title from title extractor output.
The title extractor writes title.txt with the extracted page title.
This updates the Snapshot.title field if the file exists and has content.
"""
title_file = extractor_dir / 'title.txt'
if title_file.exists():
try:
title = title_file.read_text(encoding='utf-8').strip()
if title and (not self.snapshot.title or len(title) > len(self.snapshot.title)):
self.snapshot.title = title[:512] # Max length from model
self.snapshot.save(update_fields=['title', 'modified_at'])
except Exception:
pass # Failed to read title, that's okay
def _queue_urls_for_crawl(self, extractor_dir: Path):
"""
Read urls.jsonl and queue discovered URLs for crawling.
Parser extractors output urls.jsonl with discovered URLs and Tags.
- Tag records: {"type": "Tag", "name": "..."}
- Snapshot records: {"type": "Snapshot", "url": "...", ...}
Tags are created in the database.
URLs get added to the parent Crawl's queue with metadata
(depth, via_snapshot, via_extractor) for recursive crawling.
Used at all depths:
- depth=0: Initial source file (e.g., bookmarks.html) parsed for URLs
- depth>0: Crawled pages parsed for outbound links
"""
import json
if not self.snapshot.crawl:
return
urls_file = extractor_dir / 'urls.jsonl'
if not urls_file.exists():
return
urls_added = 0
tags_created = 0
with open(urls_file, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
record_type = entry.get('type', 'Snapshot')
# Handle Tag records
if record_type == 'Tag':
tag_name = entry.get('name')
if tag_name:
Tag.objects.get_or_create(name=tag_name)
tags_created += 1
continue
# Handle Snapshot records (or records without type)
if not entry.get('url'):
continue
# Add crawl metadata
entry['depth'] = self.snapshot.depth + 1
entry['via_snapshot'] = str(self.snapshot.id)
entry['via_extractor'] = self.extractor
if self.snapshot.crawl.add_url(entry):
urls_added += 1
except json.JSONDecodeError:
continue
if urls_added > 0:
self.snapshot.crawl.create_snapshots_from_urls()
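# Example urls.jsonl lines a parser hook might write (values hypothetical):
#   {"type": "Tag", "name": "news"}
#   {"type": "Snapshot", "url": "https://example.com/article", "title": "Example Article"}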
def trigger_search_indexing(self):
"""Run any ArchiveResult__index hooks to update search indexes."""
from archivebox.hooks import discover_hooks, run_hook
# Pass config objects in priority order (later overrides earlier)
config_objects = [self.snapshot.crawl, self.snapshot] if self.snapshot.crawl else [self.snapshot]
for hook in discover_hooks('ArchiveResult__index'):
run_hook(
hook,
output_dir=self.output_dir,
config_objects=config_objects,
url=self.snapshot.url,
snapshot_id=str(self.snapshot.id),
extractor=self.extractor,
)
@property
def output_dir(self) -> Path:
"""Get the output directory for this extractor's results."""
return Path(self.snapshot.output_dir) / self.extractor
def is_background_hook(self) -> bool:
"""Check if this ArchiveResult is for a background hook."""
extractor_dir = Path(self.pwd) if self.pwd else None
if not extractor_dir:
return False
pid_file = extractor_dir / 'hook.pid'
return pid_file.exists()
def check_background_completed(self) -> bool:
"""
Check if background hook process has exited.
Returns:
True if completed (process exited), False if still running
"""
extractor_dir = Path(self.pwd) if self.pwd else None
if not extractor_dir:
return True # No pwd = completed or failed to start
pid_file = extractor_dir / 'hook.pid'
if not pid_file.exists():
return True # No PID file = completed or failed to start
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, 0) # Signal 0 = check if process exists
return False # Still running
except (OSError, ValueError):
return True # Process exited or invalid PID
def finalize_background_hook(self) -> None:
"""
Collect final results from completed background hook.
Same logic as run() but for background hooks that already started.
"""
from archivebox.hooks import create_model_record
extractor_dir = Path(self.pwd) if self.pwd else None
if not extractor_dir or not extractor_dir.exists():
self.status = self.StatusChoices.FAILED
self.output_str = 'Background hook output directory not found'
self.end_ts = timezone.now()
self.retry_at = None
self.save()
return
stdout_file = extractor_dir / 'stdout.log'
stderr_file = extractor_dir / 'stderr.log'
# Read logs
stdout = stdout_file.read_text() if stdout_file.exists() else ''
# Parse JSONL output
records = []
for line in stdout.splitlines():
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
if 'type' in data:
records.append(data)
except json.JSONDecodeError:
continue
# Find the ArchiveResult record
ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
if ar_records:
hook_data = ar_records[0]
# Apply hook's data
status_str = hook_data.get('status', 'failed')
status_map = {
'succeeded': self.StatusChoices.SUCCEEDED,
'failed': self.StatusChoices.FAILED,
'skipped': self.StatusChoices.SKIPPED,
}
self.status = status_map.get(status_str, self.StatusChoices.FAILED)
self.output_str = hook_data.get('output_str') or hook_data.get('output') or ''
self.output_json = hook_data.get('output_json')
# Determine binary FK from cmd
if hook_data.get('cmd'):
self.cmd = hook_data['cmd']
self._set_binary_from_cmd(hook_data['cmd'])
if hook_data.get('cmd_version'):
self.cmd_version = hook_data['cmd_version'][:128]
else:
# No output = failed
self.status = self.StatusChoices.FAILED
self.output_str = 'Background hook did not output ArchiveResult'
self.end_ts = timezone.now()
self.retry_at = None
# Populate output fields from filesystem
if extractor_dir.exists():
self._populate_output_fields(extractor_dir)
self.save()
# Create any side-effect records
for record in records:
if record.get('type') != 'ArchiveResult':
create_model_record(record.copy())
# Cleanup PID files and empty logs
pid_file = extractor_dir / 'hook.pid'
pid_file.unlink(missing_ok=True)
if stdout_file.exists() and stdout_file.stat().st_size == 0:
stdout_file.unlink()
if stderr_file.exists() and stderr_file.stat().st_size == 0:
stderr_file.unlink()