mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
Implement hook step-based concurrency system
This implements the hook concurrency plan from TODO_hook_concurrency.md: ## Schema Changes - Add Snapshot.current_step (IntegerField 0-9, default=0) - Create migration 0034_snapshot_current_step.py - Fix uuid_compat imports in migrations 0032 and 0003 ## Core Logic - Add extract_step(hook_name) utility - extracts step from __XX_ pattern - Add is_background_hook(hook_name) utility - checks for .bg. suffix - Update Snapshot.create_pending_archiveresults() to create one AR per hook - Update ArchiveResult.run() to handle hook_name field - Add Snapshot.advance_step_if_ready() method for step advancement - Integrate with SnapshotMachine.is_finished() to call advance_step_if_ready() ## Worker Coordination - Update ArchiveResultWorker.get_queue() for step-based filtering - ARs are only claimable when their step <= snapshot.current_step ## Hook Renumbering - Step 5 (DOM extraction): singlefile→50, screenshot→51, pdf→52, dom→53, title→54, readability→55, headers→55, mercury→56, htmltotext→57 - Step 6 (post-DOM): wget→61, git→62, media→63.bg, gallerydl→64.bg, forumdl→65.bg, papersdl→66.bg - Step 7 (URL extraction): parse_* hooks moved to 70-75 Background hooks (.bg suffix) don't block step advancement, enabling long-running downloads to continue while other hooks proceed.
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
# Generated by Django 6.0 on 2025-12-28 05:12
|
||||
|
||||
import django.db.models.deletion
|
||||
import uuid
|
||||
from archivebox import uuid_compat
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
@@ -49,7 +49,7 @@ class Migration(migrations.Migration):
|
||||
migrations.AlterField(
|
||||
model_name='archiveresult',
|
||||
name='uuid',
|
||||
field=models.UUIDField(blank=True, db_index=True, default=uuid.uuid7, null=True),
|
||||
field=models.UUIDField(blank=True, db_index=True, default=uuid_compat.uuid7, null=True),
|
||||
),
|
||||
migrations.AddConstraint(
|
||||
model_name='snapshot',
|
||||
|
||||
23
archivebox/core/migrations/0034_snapshot_current_step.py
Normal file
23
archivebox/core/migrations/0034_snapshot_current_step.py
Normal file
@@ -0,0 +1,23 @@
|
||||
# Generated by Django 6.0 on 2025-12-28
|
||||
# Add Snapshot.current_step field for hook step-based execution
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('core', '0033_rename_extractor_add_hook_name'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='snapshot',
|
||||
name='current_step',
|
||||
field=models.PositiveSmallIntegerField(
|
||||
default=0,
|
||||
db_index=True,
|
||||
help_text='Current hook step being executed (0-9). Used for sequential hook execution.'
|
||||
),
|
||||
),
|
||||
]
|
||||
@@ -334,6 +334,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
|
||||
downloaded_at = models.DateTimeField(default=None, null=True, editable=False, db_index=True, blank=True)
|
||||
depth = models.PositiveSmallIntegerField(default=0, db_index=True) # 0 for root snapshot, 1+ for discovered URLs
|
||||
fs_version = models.CharField(max_length=10, default='0.9.0', help_text='Filesystem version of this snapshot (e.g., "0.7.0", "0.8.0", "0.9.0"). Used to trigger lazy migration on save().')
|
||||
current_step = models.PositiveSmallIntegerField(default=0, db_index=True, help_text='Current hook step being executed (0-9). Used for sequential hook execution.')
|
||||
|
||||
retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
|
||||
status = ModelWithStateMachine.StatusField(choices=ModelWithStateMachine.StatusChoices, default=ModelWithStateMachine.StatusChoices.QUEUED)
|
||||
@@ -1243,23 +1244,33 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
|
||||
|
||||
def create_pending_archiveresults(self) -> list['ArchiveResult']:
|
||||
"""
|
||||
Create ArchiveResult records for all enabled plugins.
|
||||
Create ArchiveResult records for all enabled hooks.
|
||||
|
||||
Uses the hooks system to discover available plugins from:
|
||||
Uses the hooks system to discover available hooks from:
|
||||
- archivebox/plugins/*/on_Snapshot__*.{py,sh,js}
|
||||
- data/plugins/*/on_Snapshot__*.{py,sh,js}
|
||||
"""
|
||||
from archivebox.hooks import get_enabled_plugins
|
||||
|
||||
plugins = get_enabled_plugins()
|
||||
Creates one ArchiveResult per hook (not per plugin), with hook_name set.
|
||||
This enables step-based execution where all hooks in a step can run in parallel.
|
||||
"""
|
||||
from archivebox.hooks import discover_hooks
|
||||
|
||||
hooks = discover_hooks('Snapshot')
|
||||
archiveresults = []
|
||||
|
||||
for plugin in plugins:
|
||||
if ArchiveResult.objects.filter(snapshot=self, plugin=plugin).exists():
|
||||
for hook_path in hooks:
|
||||
hook_name = hook_path.name # e.g., 'on_Snapshot__50_wget.py'
|
||||
plugin = hook_path.parent.name # e.g., 'wget'
|
||||
|
||||
# Check if AR already exists for this specific hook
|
||||
if ArchiveResult.objects.filter(snapshot=self, hook_name=hook_name).exists():
|
||||
continue
|
||||
archiveresult, _ = ArchiveResult.objects.get_or_create(
|
||||
snapshot=self, plugin=plugin,
|
||||
|
||||
archiveresult, created = ArchiveResult.objects.get_or_create(
|
||||
snapshot=self,
|
||||
hook_name=hook_name,
|
||||
defaults={
|
||||
'plugin': plugin,
|
||||
'status': ArchiveResult.INITIAL_STATE,
|
||||
'retry_at': timezone.now(),
|
||||
'created_by_id': self.created_by_id,
|
||||
@@ -1267,8 +1278,57 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
|
||||
)
|
||||
if archiveresult.status == ArchiveResult.INITIAL_STATE:
|
||||
archiveresults.append(archiveresult)
|
||||
|
||||
return archiveresults
|
||||
|
||||
def advance_step_if_ready(self) -> bool:
|
||||
"""
|
||||
Advance current_step if all foreground hooks in current step are finished.
|
||||
|
||||
Called by the state machine to check if step can advance.
|
||||
Background hooks (.bg) don't block step advancement.
|
||||
|
||||
Step advancement rules:
|
||||
- All foreground ARs in current step must be finished (SUCCEEDED/FAILED/SKIPPED)
|
||||
- Background ARs (hook_name contains '.bg.') are ignored for advancement
|
||||
- When ready, increments current_step by 1 (up to 9)
|
||||
|
||||
Returns:
|
||||
True if step was advanced, False if not ready or already at step 9.
|
||||
"""
|
||||
from archivebox.hooks import extract_step, is_background_hook
|
||||
|
||||
if self.current_step >= 9:
|
||||
return False # Already at final step
|
||||
|
||||
# Get all ARs for current step that are foreground
|
||||
current_step_ars = self.archiveresult_set.filter(
|
||||
hook_name__isnull=False
|
||||
).exclude(hook_name='')
|
||||
|
||||
# Check each AR in current step
|
||||
for ar in current_step_ars:
|
||||
ar_step = extract_step(ar.hook_name)
|
||||
if ar_step != self.current_step:
|
||||
continue # Not in current step
|
||||
|
||||
if is_background_hook(ar.hook_name):
|
||||
continue # Background hooks don't block
|
||||
|
||||
# Foreground hook in current step - check if finished
|
||||
if ar.status not in ArchiveResult.FINAL_OR_ACTIVE_STATES:
|
||||
# Still pending/queued - can't advance
|
||||
return False
|
||||
|
||||
if ar.status == ArchiveResult.StatusChoices.STARTED:
|
||||
# Still running - can't advance
|
||||
return False
|
||||
|
||||
# All foreground hooks in current step are finished - advance!
|
||||
self.current_step += 1
|
||||
self.save(update_fields=['current_step', 'modified_at'])
|
||||
return True
|
||||
|
||||
def retry_failed_archiveresults(self, retry_at: Optional['timezone.datetime'] = None) -> int:
|
||||
"""
|
||||
Reset failed/skipped ArchiveResults to queued for retry.
|
||||
@@ -1301,11 +1361,12 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
|
||||
end_ts=None,
|
||||
)
|
||||
|
||||
# Also reset the snapshot so it gets re-checked
|
||||
# Also reset the snapshot and current_step so it gets re-checked from the beginning
|
||||
if count > 0:
|
||||
self.status = self.StatusChoices.STARTED
|
||||
self.retry_at = retry_at
|
||||
self.save(update_fields=['status', 'retry_at', 'modified_at'])
|
||||
self.current_step = 0 # Reset to step 0 for retry
|
||||
self.save(update_fields=['status', 'retry_at', 'current_step', 'modified_at'])
|
||||
|
||||
return count
|
||||
|
||||
@@ -1841,45 +1902,63 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Execute this ArchiveResult's plugin and update status.
|
||||
Execute this ArchiveResult's hook and update status.
|
||||
|
||||
Discovers and runs the hook script for self.plugin,
|
||||
updates status/output fields, queues discovered URLs, and triggers indexing.
|
||||
If self.hook_name is set, runs only that specific hook.
|
||||
If self.hook_name is empty, discovers and runs all hooks for self.plugin (backwards compat).
|
||||
|
||||
Updates status/output fields, queues discovered URLs, and triggers indexing.
|
||||
"""
|
||||
from django.utils import timezone
|
||||
from archivebox.hooks import BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR, run_hook
|
||||
from archivebox.hooks import BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR, run_hook, is_background_hook
|
||||
|
||||
config_objects = [self.snapshot.crawl, self.snapshot] if self.snapshot.crawl else [self.snapshot]
|
||||
|
||||
# Find ALL hooks for this plugin
|
||||
# plugin = plugin name (e.g., 'chrome')
|
||||
# Each plugin can have multiple hooks that run in sequence
|
||||
# Determine which hook(s) to run
|
||||
hooks = []
|
||||
for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
|
||||
if not base_dir.exists():
|
||||
continue
|
||||
plugin_dir = base_dir / self.plugin
|
||||
if plugin_dir.exists():
|
||||
matches = list(plugin_dir.glob('on_Snapshot__*.*'))
|
||||
if matches:
|
||||
# Sort by name for deterministic order (numeric prefix controls execution order)
|
||||
hooks.extend(sorted(matches))
|
||||
|
||||
if self.hook_name:
|
||||
# SPECIFIC HOOK MODE: Find the specific hook by name
|
||||
for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
|
||||
if not base_dir.exists():
|
||||
continue
|
||||
plugin_dir = base_dir / self.plugin
|
||||
if plugin_dir.exists():
|
||||
hook_path = plugin_dir / self.hook_name
|
||||
if hook_path.exists():
|
||||
hooks.append(hook_path)
|
||||
break
|
||||
else:
|
||||
# LEGACY MODE: Discover all hooks for this plugin (backwards compatibility)
|
||||
for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
|
||||
if not base_dir.exists():
|
||||
continue
|
||||
plugin_dir = base_dir / self.plugin
|
||||
if plugin_dir.exists():
|
||||
matches = list(plugin_dir.glob('on_Snapshot__*.*'))
|
||||
if matches:
|
||||
hooks.extend(sorted(matches))
|
||||
|
||||
if not hooks:
|
||||
self.status = self.StatusChoices.FAILED
|
||||
self.output_str = f'No hooks found for plugin: {self.plugin}'
|
||||
if self.hook_name:
|
||||
self.output_str = f'Hook not found: {self.plugin}/{self.hook_name}'
|
||||
else:
|
||||
self.output_str = f'No hooks found for plugin: {self.plugin}'
|
||||
self.retry_at = None
|
||||
self.save()
|
||||
return
|
||||
|
||||
# plugin field contains plugin name
|
||||
# Output directory is plugin_dir for the hook output
|
||||
plugin_dir = Path(self.snapshot.output_dir) / self.plugin
|
||||
|
||||
# Run ALL hooks in the plugin sequentially
|
||||
start_ts = timezone.now()
|
||||
has_background_hook = False
|
||||
is_bg_hook = False
|
||||
|
||||
for hook in hooks:
|
||||
# Check if this is a background hook
|
||||
is_bg_hook = is_background_hook(hook.name)
|
||||
|
||||
result = run_hook(
|
||||
hook,
|
||||
output_dir=plugin_dir,
|
||||
@@ -1890,20 +1969,21 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
|
||||
depth=self.snapshot.depth,
|
||||
)
|
||||
|
||||
# If any hook is background, mark this ArchiveResult as started
|
||||
# Background hooks return None
|
||||
if result is None:
|
||||
has_background_hook = True
|
||||
is_bg_hook = True
|
||||
|
||||
# Update status based on hook execution
|
||||
if has_background_hook:
|
||||
# BACKGROUND HOOK(S) - still running, return immediately
|
||||
if is_bg_hook:
|
||||
# BACKGROUND HOOK - still running, return immediately
|
||||
# Status stays STARTED, will be finalized by Snapshot.cleanup()
|
||||
self.status = self.StatusChoices.STARTED
|
||||
self.start_ts = start_ts
|
||||
self.pwd = str(plugin_dir)
|
||||
self.save()
|
||||
return
|
||||
|
||||
# ALL FOREGROUND HOOKS - completed, update from filesystem
|
||||
# FOREGROUND HOOK - completed, update from filesystem
|
||||
self.start_ts = start_ts
|
||||
self.pwd = str(plugin_dir)
|
||||
self.update_from_output()
|
||||
@@ -1911,11 +1991,10 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
|
||||
# Clean up empty output directory if no files were created
|
||||
if plugin_dir.exists() and not self.output_files:
|
||||
try:
|
||||
# Only remove if directory is completely empty
|
||||
if not any(plugin_dir.iterdir()):
|
||||
plugin_dir.rmdir()
|
||||
except (OSError, RuntimeError):
|
||||
pass # Directory not empty or can't be removed, that's fine
|
||||
pass
|
||||
|
||||
def update_from_output(self):
|
||||
"""
|
||||
|
||||
@@ -60,6 +60,11 @@ class SnapshotMachine(StateMachine, strict_states=True):
|
||||
if not self.snapshot.archiveresult_set.exists():
|
||||
return False
|
||||
|
||||
# Try to advance step if ready (handles step-based hook execution)
|
||||
# This will increment current_step when all foreground hooks in current step are done
|
||||
while self.snapshot.advance_step_if_ready():
|
||||
pass # Keep advancing until we can't anymore
|
||||
|
||||
# if archiveresults exist but are still pending, it's not finished
|
||||
if self.snapshot.pending_archiveresults().exists():
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user