mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
fix lib bin dir and archivebox add hanging
This commit is contained in:
@@ -131,7 +131,7 @@ def add(urls: str | list[str],
|
||||
else:
|
||||
# Foreground mode: run orchestrator inline until all work is done
|
||||
print(f'[green]\\[*] Starting orchestrator to process crawl...[/green]')
|
||||
orchestrator = Orchestrator(exit_on_idle=True)
|
||||
orchestrator = Orchestrator(exit_on_idle=True, crawl_id=str(crawl.id))
|
||||
orchestrator.runloop() # Block until complete
|
||||
|
||||
# 6. Return the list of Snapshots in this crawl
|
||||
|
||||
@@ -66,6 +66,10 @@ class StorageConfig(BaseConfigSet):
|
||||
# should not be a remote/network/FUSE mount for speed reasons, otherwise extractors will be slow
|
||||
LIB_DIR: Path = Field(default=CONSTANTS.DEFAULT_LIB_DIR)
|
||||
|
||||
# LIB_BIN_DIR is where all installed binaries are symlinked for easy PATH management
|
||||
# Derived from LIB_DIR / 'bin', should be prepended to PATH for all hook executions
|
||||
LIB_BIN_DIR: Path = Field(default=CONSTANTS.DEFAULT_LIB_BIN_DIR)
|
||||
|
||||
# CUSTOM_TEMPLATES_DIR allows users to override default templates
|
||||
# defaults to DATA_DIR / 'user_templates' but can be configured
|
||||
CUSTOM_TEMPLATES_DIR: Path = Field(default=CONSTANTS.CUSTOM_TEMPLATES_DIR)
|
||||
|
||||
@@ -108,9 +108,10 @@ class ConstantsDict(Mapping):
|
||||
# Runtime dirs
|
||||
TMP_DIR_NAME: str = 'tmp'
|
||||
DEFAULT_TMP_DIR: Path = DATA_DIR / TMP_DIR_NAME / MACHINE_ID # ./data/tmp/abc3244323
|
||||
|
||||
|
||||
LIB_DIR_NAME: str = 'lib'
|
||||
DEFAULT_LIB_DIR: Path = DATA_DIR / LIB_DIR_NAME / MACHINE_TYPE # ./data/lib/arm64-linux-docker
|
||||
DEFAULT_LIB_BIN_DIR: Path = DEFAULT_LIB_DIR / 'bin' # ./data/lib/arm64-linux-docker/bin
|
||||
|
||||
# Config constants
|
||||
TIMEZONE: str = 'UTC'
|
||||
|
||||
@@ -288,6 +288,11 @@ def get_code_locations():
|
||||
'enabled': True,
|
||||
'is_valid': os.path.isdir(STORAGE_CONFIG.LIB_DIR) and os.access(STORAGE_CONFIG.LIB_DIR, os.R_OK) and os.access(STORAGE_CONFIG.LIB_DIR, os.W_OK), # read + write
|
||||
},
|
||||
'LIB_BIN_DIR': {
|
||||
'path': STORAGE_CONFIG.LIB_BIN_DIR.resolve(),
|
||||
'enabled': True,
|
||||
'is_valid': os.path.isdir(STORAGE_CONFIG.LIB_BIN_DIR) and os.access(STORAGE_CONFIG.LIB_BIN_DIR, os.R_OK) and os.access(STORAGE_CONFIG.LIB_BIN_DIR, os.W_OK), # read + write
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
|
||||
@@ -328,6 +328,24 @@ def run_hook(
|
||||
env['ARCHIVE_DIR'] = str(getattr(settings, 'ARCHIVE_DIR', Path.cwd() / 'archive'))
|
||||
env.setdefault('MACHINE_ID', getattr(settings, 'MACHINE_ID', '') or os.environ.get('MACHINE_ID', ''))
|
||||
|
||||
# Get LIB_DIR and LIB_BIN_DIR from config
|
||||
lib_dir = config.get('LIB_DIR', getattr(settings, 'LIB_DIR', None))
|
||||
lib_bin_dir = config.get('LIB_BIN_DIR', getattr(settings, 'LIB_BIN_DIR', None))
|
||||
if lib_dir:
|
||||
env['LIB_DIR'] = str(lib_dir)
|
||||
if not lib_bin_dir and lib_dir:
|
||||
# Derive LIB_BIN_DIR from LIB_DIR if not set
|
||||
lib_bin_dir = Path(lib_dir) / 'bin'
|
||||
|
||||
# Prepend LIB_BIN_DIR to PATH so symlinked binaries take priority
|
||||
if lib_bin_dir:
|
||||
lib_bin_dir = str(lib_bin_dir)
|
||||
env['LIB_BIN_DIR'] = lib_bin_dir
|
||||
current_path = env.get('PATH', '')
|
||||
# Only prepend if not already at the beginning
|
||||
if not current_path.startswith(f'{lib_bin_dir}:'):
|
||||
env['PATH'] = f'{lib_bin_dir}:{current_path}' if current_path else lib_bin_dir
|
||||
|
||||
# Use Machine.config.PATH if set (includes pip/npm bin dirs from providers)
|
||||
try:
|
||||
from archivebox.machine.models import Machine
|
||||
@@ -335,7 +353,11 @@ def run_hook(
|
||||
if machine and machine.config:
|
||||
machine_path = machine.config.get('config/PATH')
|
||||
if machine_path:
|
||||
env['PATH'] = machine_path
|
||||
# Prepend LIB_BIN_DIR to machine PATH as well
|
||||
if lib_bin_dir and not machine_path.startswith(f'{lib_bin_dir}:'):
|
||||
env['PATH'] = f'{lib_bin_dir}:{machine_path}'
|
||||
else:
|
||||
env['PATH'] = machine_path
|
||||
# Also set NODE_MODULES_DIR if configured
|
||||
node_modules_dir = machine.config.get('config/NODE_MODULES_DIR')
|
||||
if node_modules_dir:
|
||||
|
||||
@@ -60,13 +60,15 @@ def test_extracts_screenshot_from_example_com():
|
||||
tmpdir = Path(tmpdir)
|
||||
|
||||
# Run screenshot extraction hook
|
||||
env = get_test_env()
|
||||
print(f"\n[DEBUG] NODE_V8_COVERAGE={env.get('NODE_V8_COVERAGE', 'NOT SET')}", file=sys.stderr)
|
||||
result = subprocess.run(
|
||||
['node', str(SCREENSHOT_HOOK), f'--url={TEST_URL}', '--snapshot-id=test789'],
|
||||
cwd=tmpdir,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
env=get_test_env()
|
||||
env=env
|
||||
)
|
||||
|
||||
assert result.returncode == 0, f"Extraction failed: {result.stderr}"
|
||||
|
||||
@@ -182,7 +182,7 @@ class Orchestrator:
|
||||
def spawn_worker(self, WorkerClass: Type[Worker]) -> int | None:
|
||||
"""Spawn a new worker process. Returns PID or None if spawn failed."""
|
||||
try:
|
||||
pid = WorkerClass.start(daemon=False)
|
||||
pid = WorkerClass.start(daemon=False, crawl_id=self.crawl_id)
|
||||
|
||||
# CRITICAL: Block until worker registers itself in Process table
|
||||
# This prevents race condition where orchestrator spawns multiple workers
|
||||
@@ -248,11 +248,11 @@ class Orchestrator:
|
||||
for WorkerClass in self.WORKER_TYPES:
|
||||
# Get queue for this worker type
|
||||
# Need to instantiate worker to get queue (for model access)
|
||||
worker = WorkerClass(worker_id=-1) # temp instance just for queue access
|
||||
worker = WorkerClass(worker_id=-1, crawl_id=self.crawl_id) # temp instance just for queue access
|
||||
queue = worker.get_queue()
|
||||
queue_count = queue.count()
|
||||
queue_sizes[WorkerClass.name] = queue_count
|
||||
|
||||
|
||||
# Spawn worker if needed
|
||||
if self.should_spawn_worker(WorkerClass, queue_count):
|
||||
self.spawn_worker(WorkerClass)
|
||||
@@ -270,15 +270,26 @@ class Orchestrator:
|
||||
def has_future_work(self) -> bool:
|
||||
"""Check if there's work scheduled for the future (retry_at > now)."""
|
||||
for WorkerClass in self.WORKER_TYPES:
|
||||
worker = WorkerClass(worker_id=-1)
|
||||
worker = WorkerClass(worker_id=-1, crawl_id=self.crawl_id)
|
||||
Model = worker.get_model()
|
||||
# Check for items not in final state with future retry_at
|
||||
future_count = Model.objects.filter(
|
||||
|
||||
# Build filter for future work, respecting crawl_id if set
|
||||
qs = Model.objects.filter(
|
||||
retry_at__gt=timezone.now()
|
||||
).exclude(
|
||||
status__in=Model.FINAL_STATES
|
||||
).count()
|
||||
if future_count > 0:
|
||||
)
|
||||
|
||||
# Apply crawl_id filter if set
|
||||
if self.crawl_id:
|
||||
if WorkerClass.name == 'crawl':
|
||||
qs = qs.filter(id=self.crawl_id)
|
||||
elif WorkerClass.name == 'snapshot':
|
||||
qs = qs.filter(crawl_id=self.crawl_id)
|
||||
elif WorkerClass.name == 'archiveresult':
|
||||
qs = qs.filter(snapshot__crawl_id=self.crawl_id)
|
||||
|
||||
if qs.count() > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -404,14 +415,6 @@ class Orchestrator:
|
||||
# Track which snapshots are still active
|
||||
active_ids = set()
|
||||
|
||||
# Debug: check for duplicates
|
||||
snapshot_urls = [s.url for s in active_snapshots]
|
||||
if len(active_snapshots) != len(set(snapshot_urls)):
|
||||
# We have duplicate URLs - let's deduplicate by showing snapshot ID
|
||||
show_id = True
|
||||
else:
|
||||
show_id = False
|
||||
|
||||
for snapshot in active_snapshots:
|
||||
active_ids.add(snapshot.id)
|
||||
|
||||
@@ -442,11 +445,7 @@ class Orchestrator:
|
||||
|
||||
# Build description with URL + current plugin
|
||||
url = snapshot.url[:50] + '...' if len(snapshot.url) > 50 else snapshot.url
|
||||
if show_id:
|
||||
# Show snapshot ID if there are duplicate URLs
|
||||
description = f"[{str(snapshot.id)[:8]}] {url}{current_plugin}"
|
||||
else:
|
||||
description = f"{url}{current_plugin}"
|
||||
description = f"{url}{current_plugin}"
|
||||
|
||||
# Create or update task
|
||||
if snapshot.id not in task_ids:
|
||||
|
||||
@@ -238,7 +238,7 @@ class Worker:
|
||||
|
||||
log_worker_event(
|
||||
worker_type=worker_type_name,
|
||||
event='Starting...',
|
||||
event='Processing',
|
||||
indent_level=indent_level,
|
||||
pid=self.pid,
|
||||
worker_id=str(self.worker_id),
|
||||
@@ -365,6 +365,13 @@ class SnapshotWorker(Worker):
|
||||
from archivebox.core.models import Snapshot
|
||||
return Snapshot
|
||||
|
||||
def get_queue(self) -> QuerySet:
|
||||
"""Get queue of Snapshots ready for processing, optionally filtered by crawl_id."""
|
||||
qs = super().get_queue()
|
||||
if self.crawl_id:
|
||||
qs = qs.filter(crawl_id=self.crawl_id)
|
||||
return qs
|
||||
|
||||
|
||||
class ArchiveResultWorker(Worker):
|
||||
"""Worker for processing ArchiveResult objects."""
|
||||
@@ -392,6 +399,9 @@ class ArchiveResultWorker(Worker):
|
||||
|
||||
qs = super().get_queue()
|
||||
|
||||
if self.crawl_id:
|
||||
qs = qs.filter(snapshot__crawl_id=self.crawl_id)
|
||||
|
||||
if self.plugin:
|
||||
qs = qs.filter(plugin=self.plugin)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user