""" Hook discovery and execution system for ArchiveBox plugins. Hooks are standalone scripts that run as separate processes and communicate with ArchiveBox via CLI arguments and stdout JSON output. This keeps the plugin system simple and language-agnostic. Directory structure: archivebox/plugins//on___. (built-in) data/plugins//on___. (user) Hook contract: Input: --url= (and other --key=value args) Output: JSON to stdout, files to $PWD Exit: 0 = success, non-zero = failure Execution order: - Extractors run sequentially within each Snapshot (ordered by numeric prefix) - Multiple Snapshots can process in parallel - Failed extractors don't block subsequent extractors Dependency handling: Extractors that depend on other extractors' output should check at runtime: ```python # Example: screenshot extractor depends on chrome_session chrome_session_dir = Path(os.environ.get('SNAPSHOT_DIR', '.')) / 'chrome_session' if not (chrome_session_dir / 'session.json').exists(): print('{"status": "skipped", "output": "chrome_session not available"}') sys.exit(1) # Exit non-zero so it gets retried later ``` On retry (Snapshot.retry_failed_archiveresults()): - Only FAILED/SKIPPED extractors reset to queued (SUCCEEDED stays) - Run in order again - If dependencies now succeed, dependents can run API (all hook logic lives here): discover_hooks(event) -> List[Path] Find hook scripts run_hook(script, ...) -> HookResult Execute a hook script run_hooks(event, ...) -> List[HookResult] Run all hooks for an event """ __package__ = 'archivebox' import os import json import time import subprocess from pathlib import Path from typing import List, Dict, Any, Optional, TypedDict from django.conf import settings from django.utils import timezone # Plugin directories BUILTIN_PLUGINS_DIR = Path(__file__).parent / 'plugins' USER_PLUGINS_DIR = Path(getattr(settings, 'DATA_DIR', Path.cwd())) / 'plugins' class HookResult(TypedDict, total=False): """Raw result from run_hook().""" returncode: int stdout: str stderr: str output_json: Optional[Dict[str, Any]] output_files: List[str] duration_ms: int hook: str def discover_hooks(event_name: str) -> List[Path]: """ Find all hook scripts matching on_{event_name}__*.{sh,py,js} pattern. Searches both built-in and user plugin directories. Returns scripts sorted alphabetically by filename for deterministic execution order. Hook naming convention uses numeric prefixes to control order: on_Snapshot__10_title.py # runs first on_Snapshot__15_singlefile.py # runs second on_Snapshot__26_readability.py # runs later (depends on singlefile) Example: discover_hooks('Snapshot') # Returns: [Path('.../on_Snapshot__10_title.py'), Path('.../on_Snapshot__15_singlefile.py'), ...] """ hooks = [] for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue # Search for hook scripts in all subdirectories for ext in ('sh', 'py', 'js'): pattern = f'*/on_{event_name}__*.{ext}' hooks.extend(base_dir.glob(pattern)) # Also check for hooks directly in the plugins directory pattern_direct = f'on_{event_name}__*.{ext}' hooks.extend(base_dir.glob(pattern_direct)) # Sort by filename (not full path) to ensure numeric prefix ordering works # e.g., on_Snapshot__10_title.py sorts before on_Snapshot__26_readability.py return sorted(set(hooks), key=lambda p: p.name) def discover_all_hooks() -> Dict[str, List[Path]]: """ Discover all hooks organized by event name. Returns a dict mapping event names to lists of hook script paths. """ hooks_by_event: Dict[str, List[Path]] = {} for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue for ext in ('sh', 'py', 'js'): for hook_path in base_dir.glob(f'*/on_*__*.{ext}'): # Extract event name from filename: on_EventName__hook_name.ext filename = hook_path.stem # on_EventName__hook_name if filename.startswith('on_') and '__' in filename: event_name = filename[3:].split('__')[0] # EventName if event_name not in hooks_by_event: hooks_by_event[event_name] = [] hooks_by_event[event_name].append(hook_path) # Sort hooks within each event for event_name in hooks_by_event: hooks_by_event[event_name] = sorted(set(hooks_by_event[event_name]), key=lambda p: p.name) return hooks_by_event def run_hook( script: Path, output_dir: Path, timeout: int = 300, config_objects: Optional[List[Any]] = None, **kwargs: Any ) -> HookResult: """ Execute a hook script with the given arguments. This is the low-level hook executor. For running extractors with proper metadata handling, use call_extractor() instead. Config is passed to hooks via environment variables with this priority: 1. Plugin schema defaults (config.json) 2. Config file (ArchiveBox.conf) 3. Environment variables 4. Machine.config (auto-included, lowest override priority) 5. config_objects (in order - later objects override earlier ones) Args: script: Path to the hook script (.sh, .py, or .js) output_dir: Working directory for the script (where output files go) timeout: Maximum execution time in seconds config_objects: Optional list of objects with .config JSON fields (e.g., [crawl, snapshot] - later items have higher priority) **kwargs: Arguments passed to the script as --key=value Returns: HookResult with 'returncode', 'stdout', 'stderr', 'output_json', 'output_files', 'duration_ms' """ import time start_time = time.time() # Auto-include Machine.config at the start (lowest priority among config_objects) from machine.models import Machine machine = Machine.current() all_config_objects = [machine] + list(config_objects or []) if not script.exists(): return HookResult( returncode=1, stdout='', stderr=f'Hook script not found: {script}', output_json=None, output_files=[], duration_ms=0, hook=str(script), ) # Determine the interpreter based on file extension ext = script.suffix.lower() if ext == '.sh': cmd = ['bash', str(script)] elif ext == '.py': cmd = ['python3', str(script)] elif ext == '.js': cmd = ['node', str(script)] else: # Try to execute directly (assumes shebang) cmd = [str(script)] # Build CLI arguments from kwargs for key, value in kwargs.items(): # Skip keys that start with underscore (internal parameters) if key.startswith('_'): continue arg_key = f'--{key.replace("_", "-")}' if isinstance(value, bool): if value: cmd.append(arg_key) elif value is not None and value != '': # JSON-encode complex values, use str for simple ones # Skip empty strings to avoid --key= which breaks argument parsers if isinstance(value, (dict, list)): cmd.append(f'{arg_key}={json.dumps(value)}') else: # Ensure value is converted to string and strip whitespace str_value = str(value).strip() if str_value: # Only add if non-empty after stripping cmd.append(f'{arg_key}={str_value}') # Set up environment with base paths env = os.environ.copy() env['DATA_DIR'] = str(getattr(settings, 'DATA_DIR', Path.cwd())) env['ARCHIVE_DIR'] = str(getattr(settings, 'ARCHIVE_DIR', Path.cwd() / 'archive')) env.setdefault('MACHINE_ID', getattr(settings, 'MACHINE_ID', '') or os.environ.get('MACHINE_ID', '')) # Build overrides from any objects with .config fields (in order, later overrides earlier) # all_config_objects includes Machine at the start, then any passed config_objects overrides = {} for obj in all_config_objects: if obj and hasattr(obj, 'config') and obj.config: overrides.update(obj.config) # Get plugin config from JSON schemas with hierarchy resolution # This merges: schema defaults -> config file -> env vars -> object config overrides plugin_config = get_flat_plugin_config(overrides=overrides if overrides else None) export_plugin_config_to_env(plugin_config, env) # Also pass core config values that aren't in plugin schemas yet # These are legacy values that may still be needed from archivebox import config env.setdefault('CHROME_BINARY', str(getattr(config, 'CHROME_BINARY', ''))) env.setdefault('WGET_BINARY', str(getattr(config, 'WGET_BINARY', ''))) env.setdefault('CURL_BINARY', str(getattr(config, 'CURL_BINARY', ''))) env.setdefault('GIT_BINARY', str(getattr(config, 'GIT_BINARY', ''))) env.setdefault('YOUTUBEDL_BINARY', str(getattr(config, 'YOUTUBEDL_BINARY', ''))) env.setdefault('SINGLEFILE_BINARY', str(getattr(config, 'SINGLEFILE_BINARY', ''))) env.setdefault('READABILITY_BINARY', str(getattr(config, 'READABILITY_BINARY', ''))) env.setdefault('MERCURY_BINARY', str(getattr(config, 'MERCURY_BINARY', ''))) env.setdefault('NODE_BINARY', str(getattr(config, 'NODE_BINARY', ''))) env.setdefault('TIMEOUT', str(getattr(config, 'TIMEOUT', 60))) env.setdefault('CHECK_SSL_VALIDITY', str(getattr(config, 'CHECK_SSL_VALIDITY', True))) env.setdefault('USER_AGENT', str(getattr(config, 'USER_AGENT', ''))) env.setdefault('RESOLUTION', str(getattr(config, 'RESOLUTION', ''))) # Pass SEARCH_BACKEND_ENGINE from new-style config try: from archivebox.config.configset import get_config search_config = get_config() env.setdefault('SEARCH_BACKEND_ENGINE', str(search_config.get('SEARCH_BACKEND_ENGINE', 'ripgrep'))) except Exception: env.setdefault('SEARCH_BACKEND_ENGINE', 'ripgrep') # Create output directory if needed output_dir.mkdir(parents=True, exist_ok=True) # Capture files before execution to detect new output files_before = set(output_dir.rglob('*')) if output_dir.exists() else set() # Detect if this is a background hook (long-running daemon) is_background = '__background' in script.stem # Set up output files for ALL hooks (useful for debugging) stdout_file = output_dir / 'stdout.log' stderr_file = output_dir / 'stderr.log' pid_file = output_dir / 'hook.pid' try: # Open log files for writing with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err: process = subprocess.Popen( cmd, cwd=str(output_dir), stdout=out, stderr=err, env=env, ) # Write PID for all hooks (useful for debugging/cleanup) pid_file.write_text(str(process.pid)) if is_background: # Background hook - return None immediately, don't wait # Process continues running, writing to stdout.log # ArchiveResult will poll for completion later return None # Normal hook - wait for completion with timeout try: returncode = process.wait(timeout=timeout) except subprocess.TimeoutExpired: process.kill() process.wait() # Clean up zombie duration_ms = int((time.time() - start_time) * 1000) return HookResult( returncode=-1, stdout='', stderr=f'Hook timed out after {timeout} seconds', output_json=None, output_files=[], duration_ms=duration_ms, hook=str(script), ) # Read output from files stdout = stdout_file.read_text() if stdout_file.exists() else '' stderr = stderr_file.read_text() if stderr_file.exists() else '' # Detect new files created by the hook files_after = set(output_dir.rglob('*')) if output_dir.exists() else set() new_files = [str(f.relative_to(output_dir)) for f in (files_after - files_before) if f.is_file()] # Exclude the log files themselves from new_files new_files = [f for f in new_files if f not in ('stdout.log', 'stderr.log', 'hook.pid')] # Parse RESULT_JSON from stdout output_json = None for line in stdout.splitlines(): if line.startswith('RESULT_JSON='): try: output_json = json.loads(line[len('RESULT_JSON='):]) break except json.JSONDecodeError: pass duration_ms = int((time.time() - start_time) * 1000) # Clean up log files on success (keep on failure for debugging) if returncode == 0: stdout_file.unlink(missing_ok=True) stderr_file.unlink(missing_ok=True) pid_file.unlink(missing_ok=True) return HookResult( returncode=returncode, stdout=stdout, stderr=stderr, output_json=output_json, output_files=new_files, duration_ms=duration_ms, hook=str(script), ) except Exception as e: duration_ms = int((time.time() - start_time) * 1000) return HookResult( returncode=-1, stdout='', stderr=f'Failed to run hook: {type(e).__name__}: {e}', output_json=None, output_files=[], duration_ms=duration_ms, hook=str(script), ) def collect_urls_from_extractors(snapshot_dir: Path) -> List[Dict[str, Any]]: """ Collect all urls.jsonl entries from extractor output subdirectories. Each parser extractor outputs urls.jsonl to its own subdir: snapshot_dir/parse_rss_urls/urls.jsonl snapshot_dir/parse_html_urls/urls.jsonl etc. This is not special handling - urls.jsonl is just a normal output file. This utility collects them all for the crawl system. """ urls = [] # Look in each immediate subdirectory for urls.jsonl if not snapshot_dir.exists(): return urls for subdir in snapshot_dir.iterdir(): if not subdir.is_dir(): continue urls_file = subdir / 'urls.jsonl' if not urls_file.exists(): continue try: with open(urls_file, 'r') as f: for line in f: line = line.strip() if line: try: entry = json.loads(line) if entry.get('url'): # Track which extractor found this URL entry['via_extractor'] = subdir.name urls.append(entry) except json.JSONDecodeError: continue except Exception: pass return urls def run_hooks( event_name: str, output_dir: Path, timeout: int = 300, stop_on_failure: bool = False, config_objects: Optional[List[Any]] = None, **kwargs: Any ) -> List[HookResult]: """ Run all hooks for a given event. Args: event_name: The event name to trigger (e.g., 'Snapshot__wget') output_dir: Working directory for hook scripts timeout: Maximum execution time per hook stop_on_failure: If True, stop executing hooks after first failure config_objects: Optional list of objects with .config JSON fields (e.g., [crawl, snapshot] - later items have higher priority) **kwargs: Arguments passed to each hook script Returns: List of results from each hook execution """ hooks = discover_hooks(event_name) results = [] for hook in hooks: result = run_hook(hook, output_dir, timeout=timeout, config_objects=config_objects, **kwargs) result['hook'] = str(hook) results.append(result) if stop_on_failure and result['returncode'] != 0: break return results def get_extractors() -> List[str]: """ Get list of available extractors by discovering Snapshot hooks. Returns extractor names (including numeric prefix) from hook filenames: on_Snapshot__10_title.py -> '10_title' on_Snapshot__26_readability.py -> '26_readability' Sorted alphabetically so numeric prefixes control execution order. """ extractors = [] for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue for ext in ('sh', 'py', 'js'): for hook_path in base_dir.glob(f'*/on_Snapshot__*.{ext}'): # Extract extractor name: on_Snapshot__26_readability.py -> 26_readability filename = hook_path.stem # on_Snapshot__26_readability if '__' in filename: extractor = filename.split('__', 1)[1] extractors.append(extractor) return sorted(set(extractors)) def get_parser_extractors() -> List[str]: """ Get list of parser extractors by discovering parse_*_urls hooks. Parser extractors discover URLs from source files and output urls.jsonl. Returns extractor names like: ['50_parse_html_urls', '51_parse_rss_urls', ...] """ return [e for e in get_extractors() if 'parse_' in e and '_urls' in e] def get_extractor_name(extractor: str) -> str: """ Get the base extractor name without numeric prefix. Examples: '10_title' -> 'title' '26_readability' -> 'readability' '50_parse_html_urls' -> 'parse_html_urls' """ # Split on first underscore after any leading digits parts = extractor.split('_', 1) if len(parts) == 2 and parts[0].isdigit(): return parts[1] return extractor def is_parser_extractor(extractor: str) -> bool: """Check if an extractor is a parser extractor (discovers URLs).""" name = get_extractor_name(extractor) return name.startswith('parse_') and name.endswith('_urls') # Precedence order for search indexing (lower number = higher priority) # Used to select which extractor's output to use for full-text search # Extractor names here should match the part after the numeric prefix # e.g., '31_readability' -> 'readability' ARCHIVE_METHODS_INDEXING_PRECEDENCE = [ ('readability', 1), ('mercury', 2), ('htmltotext', 3), ('singlefile', 4), ('dom', 5), ('wget', 6), ] def get_enabled_extractors(config: Optional[Dict] = None) -> List[str]: """ Get the list of enabled extractors based on config and available hooks. Checks for ENABLED_EXTRACTORS in config, falls back to discovering available hooks from the plugins directory. Returns extractor names sorted alphabetically (numeric prefix controls order). """ if config and 'ENABLED_EXTRACTORS' in config: return config['ENABLED_EXTRACTORS'] # Discover from hooks - this is the source of truth return get_extractors() def discover_plugins_that_provide_interface( module_name: str, required_attrs: List[str], plugin_prefix: Optional[str] = None, ) -> Dict[str, Any]: """ Discover plugins that provide a specific Python module with required interface. This enables dynamic plugin discovery for features like search backends, storage backends, etc. without hardcoding imports. Args: module_name: Name of the module to look for (e.g., 'search') required_attrs: List of attributes the module must have (e.g., ['search', 'flush']) plugin_prefix: Optional prefix to filter plugins (e.g., 'search_backend_') Returns: Dict mapping backend names to imported modules. Backend name is derived from plugin directory name minus the prefix. e.g., search_backend_sqlite -> 'sqlite' Example: backends = discover_plugins_that_provide_interface( module_name='search', required_attrs=['search', 'flush'], plugin_prefix='search_backend_', ) # Returns: {'sqlite': , 'sonic': , 'ripgrep': } """ import importlib.util backends = {} for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue for plugin_dir in base_dir.iterdir(): if not plugin_dir.is_dir(): continue plugin_name = plugin_dir.name # Filter by prefix if specified if plugin_prefix and not plugin_name.startswith(plugin_prefix): continue # Look for the module file module_path = plugin_dir / f'{module_name}.py' if not module_path.exists(): continue try: # Import the module dynamically spec = importlib.util.spec_from_file_location( f'archivebox.plugins.{plugin_name}.{module_name}', module_path ) if spec is None or spec.loader is None: continue module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Check for required attributes if not all(hasattr(module, attr) for attr in required_attrs): continue # Derive backend name from plugin directory name if plugin_prefix: backend_name = plugin_name[len(plugin_prefix):] else: backend_name = plugin_name backends[backend_name] = module except Exception: # Skip plugins that fail to import continue return backends def get_search_backends() -> Dict[str, Any]: """ Discover all available search backend plugins. Search backends must provide a search.py module with: - search(query: str) -> List[str] (returns snapshot IDs) - flush(snapshot_ids: Iterable[str]) -> None Returns: Dict mapping backend names to their modules. e.g., {'sqlite': , 'sonic': , 'ripgrep': } """ return discover_plugins_that_provide_interface( module_name='search', required_attrs=['search', 'flush'], plugin_prefix='search_backend_', ) def discover_plugin_configs() -> Dict[str, Dict[str, Any]]: """ Discover all plugin config.json schemas. Each plugin can define a config.json file with JSONSchema defining its configuration options. This function discovers and loads all such schemas. The config.json files use JSONSchema draft-07 with custom extensions: - x-fallback: Global config key to use as fallback - x-aliases: List of old/alternative config key names Returns: Dict mapping plugin names to their parsed JSONSchema configs. e.g., {'wget': {...schema...}, 'chrome_session': {...schema...}} Example config.json: { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": { "SAVE_WGET": {"type": "boolean", "default": true}, "WGET_TIMEOUT": {"type": "integer", "default": 60, "x-fallback": "TIMEOUT"} } } """ configs = {} for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue for plugin_dir in base_dir.iterdir(): if not plugin_dir.is_dir(): continue config_path = plugin_dir / 'config.json' if not config_path.exists(): continue try: with open(config_path, 'r') as f: schema = json.load(f) # Basic validation: must be an object with properties if not isinstance(schema, dict): continue if schema.get('type') != 'object': continue if 'properties' not in schema: continue configs[plugin_dir.name] = schema except (json.JSONDecodeError, OSError) as e: # Log warning but continue - malformed config shouldn't break discovery import sys print(f"Warning: Failed to load config.json from {plugin_dir.name}: {e}", file=sys.stderr) continue return configs def get_merged_config_schema() -> Dict[str, Any]: """ Get a merged JSONSchema combining all plugin config schemas. This creates a single schema that can validate all plugin config keys. Useful for validating the complete configuration at startup. Returns: Combined JSONSchema with all plugin properties merged. """ plugin_configs = discover_plugin_configs() merged_properties = {} for plugin_name, schema in plugin_configs.items(): properties = schema.get('properties', {}) for key, prop_schema in properties.items(): if key in merged_properties: # Key already exists from another plugin - log warning but keep first import sys print(f"Warning: Config key '{key}' defined in multiple plugins, using first definition", file=sys.stderr) continue merged_properties[key] = prop_schema return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, # Allow unknown keys (core config, etc.) "properties": merged_properties, } def get_config_defaults_from_plugins() -> Dict[str, Any]: """ Get default values for all plugin config options. Returns: Dict mapping config keys to their default values. e.g., {'SAVE_WGET': True, 'WGET_TIMEOUT': 60, ...} """ plugin_configs = discover_plugin_configs() defaults = {} for plugin_name, schema in plugin_configs.items(): properties = schema.get('properties', {}) for key, prop_schema in properties.items(): if 'default' in prop_schema: defaults[key] = prop_schema['default'] return defaults def resolve_config_value( key: str, prop_schema: Dict[str, Any], env_vars: Dict[str, str], config_file: Dict[str, str], overrides: Optional[Dict[str, Any]] = None, ) -> Any: """ Resolve a single config value following the hierarchy and schema rules. Resolution order (later overrides earlier): 1. Schema default 2. x-fallback (global config key) 3. Config file (ArchiveBox.conf) 4. Environment variables (including x-aliases) 5. Explicit overrides (User/Crawl/Snapshot config) Args: key: Config key name (e.g., 'WGET_TIMEOUT') prop_schema: JSONSchema property definition for this key env_vars: Environment variables dict config_file: Config file values dict overrides: Optional override values (from User/Crawl/Snapshot) Returns: Resolved value with appropriate type coercion. """ value = None prop_type = prop_schema.get('type', 'string') # 1. Start with schema default if 'default' in prop_schema: value = prop_schema['default'] # 2. Check x-fallback (global config key) fallback_key = prop_schema.get('x-fallback') if fallback_key: if fallback_key in env_vars: value = env_vars[fallback_key] elif fallback_key in config_file: value = config_file[fallback_key] # 3. Check config file for main key if key in config_file: value = config_file[key] # 4. Check environment variables (main key and aliases) keys_to_check = [key] + prop_schema.get('x-aliases', []) for check_key in keys_to_check: if check_key in env_vars: value = env_vars[check_key] break # 5. Apply explicit overrides if overrides and key in overrides: value = overrides[key] # Type coercion for env var strings if value is not None and isinstance(value, str): value = coerce_config_value(value, prop_type, prop_schema) return value def coerce_config_value(value: str, prop_type: str, prop_schema: Dict[str, Any]) -> Any: """ Coerce a string value to the appropriate type based on schema. Args: value: String value to coerce prop_type: JSONSchema type ('boolean', 'integer', 'number', 'array', 'string') prop_schema: Full property schema (for array item types, etc.) Returns: Coerced value of appropriate type. """ if prop_type == 'boolean': return value.lower() in ('true', '1', 'yes', 'on') elif prop_type == 'integer': try: return int(value) except ValueError: return prop_schema.get('default', 0) elif prop_type == 'number': try: return float(value) except ValueError: return prop_schema.get('default', 0.0) elif prop_type == 'array': # Try JSON parse first, fall back to comma-separated try: return json.loads(value) except json.JSONDecodeError: return [v.strip() for v in value.split(',') if v.strip()] else: return value def get_flat_plugin_config( env_vars: Optional[Dict[str, str]] = None, config_file: Optional[Dict[str, str]] = None, overrides: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Get all plugin config values resolved according to hierarchy. This is the main function for getting plugin configuration. It discovers all plugin schemas and resolves each config key. Args: env_vars: Environment variables (defaults to os.environ) config_file: Config file values (from ArchiveBox.conf) overrides: Override values (from User/Crawl/Snapshot config fields) Returns: Flat dict of all resolved config values. e.g., {'SAVE_WGET': True, 'WGET_TIMEOUT': 60, ...} """ if env_vars is None: env_vars = dict(os.environ) if config_file is None: config_file = {} plugin_configs = discover_plugin_configs() flat_config = {} for plugin_name, schema in plugin_configs.items(): properties = schema.get('properties', {}) for key, prop_schema in properties.items(): flat_config[key] = resolve_config_value( key, prop_schema, env_vars, config_file, overrides ) return flat_config def export_plugin_config_to_env( config: Dict[str, Any], env: Optional[Dict[str, str]] = None, ) -> Dict[str, str]: """ Export plugin config values to environment variable format. Converts all values to strings suitable for subprocess environment. Arrays are JSON-encoded. Args: config: Flat config dict from get_flat_plugin_config() env: Optional existing env dict to update (creates new if None) Returns: Environment dict with config values as strings. """ if env is None: env = {} for key, value in config.items(): if value is None: continue elif isinstance(value, bool): env[key] = 'true' if value else 'false' elif isinstance(value, (list, dict)): env[key] = json.dumps(value) else: env[key] = str(value) return env # ============================================================================= # Plugin Template Discovery # ============================================================================= # # Plugins can provide custom templates for rendering their output in the UI. # Templates are discovered by filename convention inside each plugin's templates/ dir: # # archivebox/plugins// # templates/ # icon.html # Icon for admin table view (small inline HTML) # thumbnail.html # Preview thumbnail for snapshot cards # embed.html # Iframe embed content for main preview # fullscreen.html # Fullscreen view template # # Template context variables available: # {{ result }} - ArchiveResult object # {{ snapshot }} - Parent Snapshot object # {{ output_path }} - Path to output file/dir relative to snapshot dir # {{ extractor }} - Extractor name (e.g., 'screenshot', 'singlefile') # # Default templates used when plugin doesn't provide one DEFAULT_TEMPLATES = { 'icon': '''{{ icon }}''', 'thumbnail': ''' {{ extractor }} output ''', 'embed': ''' ''', 'fullscreen': ''' ''', } # Default icons for known extractors (emoji or short HTML) DEFAULT_EXTRACTOR_ICONS = { 'screenshot': '📷', 'pdf': '📄', 'singlefile': '📦', 'dom': '🌐', 'wget': '📥', 'media': '🎬', 'git': '📂', 'readability': '📖', 'mercury': '☿️', 'favicon': '⭐', 'title': '📝', 'headers': '📋', 'archive_org': '🏛️', 'htmltotext': '📃', 'warc': '🗄️', } def get_plugin_template(extractor: str, template_name: str) -> Optional[str]: """ Get a plugin template by extractor name and template type. Args: extractor: Extractor name (e.g., 'screenshot', '15_singlefile') template_name: One of 'icon', 'thumbnail', 'embed', 'fullscreen' Returns: Template content as string, or None if not found. """ base_name = get_extractor_name(extractor) for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue # Look for plugin directory matching extractor name for plugin_dir in base_dir.iterdir(): if not plugin_dir.is_dir(): continue # Match by directory name (exact or partial) if plugin_dir.name == base_name or plugin_dir.name.endswith(f'_{base_name}'): template_path = plugin_dir / 'templates' / f'{template_name}.html' if template_path.exists(): return template_path.read_text() return None def get_extractor_template(extractor: str, template_name: str) -> str: """ Get template for an extractor, falling back to defaults. Args: extractor: Extractor name (e.g., 'screenshot', '15_singlefile') template_name: One of 'icon', 'thumbnail', 'embed', 'fullscreen' Returns: Template content as string (plugin template or default). """ # Try plugin-provided template first template = get_plugin_template(extractor, template_name) if template: return template # Fall back to default template return DEFAULT_TEMPLATES.get(template_name, '') def get_extractor_icon(extractor: str) -> str: """ Get the icon for an extractor. First checks for plugin-provided icon.html template, then falls back to DEFAULT_EXTRACTOR_ICONS. Args: extractor: Extractor name (e.g., 'screenshot', '15_singlefile') Returns: Icon HTML/emoji string. """ base_name = get_extractor_name(extractor) # Try plugin-provided icon template icon_template = get_plugin_template(extractor, 'icon') if icon_template: return icon_template.strip() # Fall back to default icon return DEFAULT_EXTRACTOR_ICONS.get(base_name, '📁') def get_all_extractor_icons() -> Dict[str, str]: """ Get icons for all discovered extractors. Returns: Dict mapping extractor base names to their icons. """ icons = {} for extractor in get_extractors(): base_name = get_extractor_name(extractor) icons[base_name] = get_extractor_icon(extractor) return icons def discover_plugin_templates() -> Dict[str, Dict[str, str]]: """ Discover all plugin templates organized by extractor. Returns: Dict mapping extractor names to dicts of template_name -> template_path. e.g., {'screenshot': {'icon': '/path/to/icon.html', 'thumbnail': '/path/to/thumbnail.html'}} """ templates: Dict[str, Dict[str, str]] = {} for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR): if not base_dir.exists(): continue for plugin_dir in base_dir.iterdir(): if not plugin_dir.is_dir(): continue templates_dir = plugin_dir / 'templates' if not templates_dir.exists(): continue plugin_templates = {} for template_file in templates_dir.glob('*.html'): template_name = template_file.stem # icon, thumbnail, embed, fullscreen plugin_templates[template_name] = str(template_file) if plugin_templates: templates[plugin_dir.name] = plugin_templates return templates