Refresh worker config from resolved plugin installs

This commit is contained in:
Nick Sweeting
2026-03-15 11:07:55 -07:00
parent 47f540c094
commit 86fdc3be1e
5 changed files with 89 additions and 10 deletions

View File

@@ -255,9 +255,10 @@ def get_config(
if crawl and hasattr(crawl, "config") and crawl.config:
config.update(crawl.config)
# Add CRAWL_OUTPUT_DIR for snapshot hooks to find shared Chrome session
# Add crawl path aliases for hooks that need shared crawl state.
if crawl and hasattr(crawl, "output_dir"):
config['CRAWL_OUTPUT_DIR'] = str(crawl.output_dir)
config['CRAWL_DIR'] = str(crawl.output_dir)
config['CRAWL_ID'] = str(getattr(crawl, "id", "")) if getattr(crawl, "id", None) else config.get('CRAWL_ID')
# Apply snapshot config overrides (highest priority)
@@ -267,6 +268,8 @@ def get_config(
if snapshot:
config['SNAPSHOT_ID'] = str(getattr(snapshot, "id", "")) if getattr(snapshot, "id", None) else config.get('SNAPSHOT_ID')
config['SNAPSHOT_DEPTH'] = int(getattr(snapshot, "depth", 0) or 0)
if hasattr(snapshot, "output_dir"):
config['SNAP_DIR'] = str(snapshot.output_dir)
if getattr(snapshot, "crawl_id", None):
config['CRAWL_ID'] = str(snapshot.crawl_id)

View File

@@ -342,8 +342,8 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
f.write(f'\n=== Crawl.run() starting for {self.id} at {time.time()} ===\n')
f.flush()
# Get merged config with crawl context
config = get_config(crawl=self)
def get_runtime_config():
return get_config(crawl=self)
machine = Machine.current()
declared_binary_names: set[str] = set()
@@ -393,6 +393,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
def run_crawl_hook(hook: Path) -> set[str]:
executed_crawl_hooks.add(str(hook))
primary_url = next(
(line.strip() for line in self.urls.splitlines() if line.strip()),
self.urls.strip(),
)
with open(debug_log, 'a') as f:
f.write(f'Running hook: {hook.name}\n')
@@ -405,9 +409,11 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
process = run_hook(
hook,
output_dir=output_dir,
config=config,
config=get_runtime_config(),
crawl_id=str(self.id),
source_url=self.urls,
url=primary_url,
snapshot_id=str(self.id),
)
with open(debug_log, 'a') as f:
f.write(f'Hook {hook.name} completed with status={process.status}\n')
@@ -476,7 +482,7 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
provider_hooks = [
hook
for hook in discover_hooks('Crawl', filter_disabled=False, config=config)
for hook in discover_hooks('Crawl', filter_disabled=False, config=get_runtime_config())
if hook.parent.name in needed_provider_names and str(hook) not in executed_crawl_hooks
]
if not provider_hooks:
@@ -489,7 +495,7 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
with open(debug_log, 'a') as f:
f.write(f'Discovering Crawl hooks...\n')
f.flush()
hooks = discover_hooks('Crawl', config=config)
hooks = discover_hooks('Crawl', config=get_runtime_config())
with open(debug_log, 'a') as f:
f.write(f'Found {len(hooks)} hooks\n')
f.flush()

View File

@@ -74,13 +74,31 @@ class Machine(ModelWithHealthStats):
global _CURRENT_MACHINE
if _CURRENT_MACHINE:
if timezone.now() < _CURRENT_MACHINE.modified_at + timedelta(seconds=MACHINE_RECHECK_INTERVAL):
return _CURRENT_MACHINE
return cls._hydrate_config_from_sibling(_CURRENT_MACHINE)
_CURRENT_MACHINE = None
_CURRENT_MACHINE, _ = cls.objects.update_or_create(
guid=get_host_guid(),
defaults={'hostname': socket.gethostname(), **get_os_info(), **get_vm_info(), 'stats': get_host_stats()},
)
return _CURRENT_MACHINE
return cls._hydrate_config_from_sibling(_CURRENT_MACHINE)
@classmethod
def _hydrate_config_from_sibling(cls, machine: 'Machine') -> 'Machine':
    """Backfill an empty machine.config from the freshest same-hostname row.

    Returns *machine* untouched when it already carries a config; otherwise
    copies the config of the most recently modified sibling row (same
    hostname, different pk, non-empty config) onto it and persists it.
    """
    # Already hydrated — nothing to do.
    if machine.config:
        return machine

    # Most recently touched row for this host that actually has a config.
    candidates = cls.objects.filter(hostname=machine.hostname)
    candidates = candidates.exclude(pk=machine.pk).exclude(config={})
    donor = candidates.order_by('-modified_at').first()

    if donor is not None and donor.config:
        # Copy defensively so later mutations don't alias the donor row.
        machine.config = dict(donor.config)
        machine.save(update_fields=['config', 'modified_at'])
    return machine
def to_json(self) -> dict:
"""
@@ -326,6 +344,7 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine):
machine = Machine.current()
overrides = overrides or {}
normalized_overrides = Binary._normalize_overrides(record.get('overrides', {}))
# Case 1: Already installed (from on_Crawl hooks) - has abspath AND binproviders
# This happens when on_Crawl hooks detect already-installed binaries
@@ -357,7 +376,7 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine):
name=name,
defaults={
'binproviders': record.get('binproviders', 'env'),
'overrides': record.get('overrides', {}),
'overrides': normalized_overrides,
'status': Binary.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
@@ -394,6 +413,31 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine):
self.modified_at = timezone.now()
self.save()
@staticmethod
def _normalize_overrides(overrides: dict | None) -> dict:
"""Normalize hook-emitted binary overrides to the canonical install_args shape."""
if not isinstance(overrides, dict):
return {}
normalized: dict = {}
reserved_keys = {'custom_cmd', 'cmd', 'command'}
for provider, value in overrides.items():
if provider in reserved_keys:
normalized[provider] = value
continue
if isinstance(value, dict):
normalized[provider] = value
elif isinstance(value, (list, tuple)):
normalized[provider] = {'install_args': list(value)}
elif isinstance(value, str) and value.strip():
normalized[provider] = {'install_args': value.strip()}
else:
normalized[provider] = value
return normalized
def _allowed_binproviders(self) -> set[str] | None:
"""Return the allowed binproviders for this binary, or None for wildcard."""
providers = str(self.binproviders or '').strip()
@@ -441,8 +485,13 @@ class Binary(ModelWithHealthStats, ModelWithStateMachine):
from archivebox.hooks import discover_hooks, run_hook
from archivebox.config.configset import get_config
# Get merged config (Binary doesn't have crawl/snapshot context)
# Get merged config (Binary doesn't have crawl/snapshot context).
# Binary workers can install several dependencies in one process, so
# refresh from the latest persisted machine config before each hook run.
config = get_config()
current_machine = Machine.current()
if current_machine.config:
config.update(current_machine.config)
# Create output directory
output_dir = self.output_dir

View File

@@ -697,6 +697,15 @@ path = hook_process.env.get('PATH')
if lib_bin_dir:
assert lib_bin_dir in path, f"LIB_BIN_DIR not in PATH. LIB_BIN_DIR={{lib_bin_dir}}, PATH={{path[:200]}}..."
# Verify canonical crawl/snapshot directories are exported for plugins
crawl_dir = hook_process.env.get('CRAWL_DIR')
snap_dir = hook_process.env.get('SNAP_DIR')
print(f" CRAWL_DIR: {{crawl_dir}}")
print(f" SNAP_DIR: {{snap_dir}}")
assert crawl_dir is not None, "CRAWL_DIR not set"
assert snap_dir is not None, "SNAP_DIR not set"
assert str(snapshot.id) in snap_dir, f"SNAP_DIR should contain snapshot id, got {{snap_dir}}"
# Verify NODE_PATH is set
node_path = hook_process.env.get('NODE_PATH')
node_modules_dir = hook_process.env.get('NODE_MODULES_DIR')

View File

@@ -299,6 +299,10 @@ class Worker:
from django.conf import settings
import sys
refresh_machine_config = bool(
parent and getattr(parent, 'process_type', None) == Process.TypeChoices.WORKER
)
# Build command and get config for the appropriate scope
if cls.name == 'crawl':
crawl_id = kwargs.get('crawl_id')
@@ -349,6 +353,14 @@ class Worker:
else:
raise ValueError(f"Unknown worker type: {cls.name}")
if refresh_machine_config:
current_machine = Machine.current()
if current_machine.config:
# Worker subprocesses inherit parent Process.env, which can contain
# stale pre-install binary aliases. Refresh resolved machine values
# before serializing the child worker env.
env.update(current_machine.config)
# Ensure output directory exists
pwd.mkdir(parents=True, exist_ok=True)