add overrides options to binproviders

This commit is contained in:
Nick Sweeting
2025-12-26 20:39:56 -08:00
parent 9bc5d99488
commit 2f81c0cc76
16 changed files with 546 additions and 599 deletions

350
TODOS.md
View File

@@ -1,3 +1,353 @@
# ArchiveBox Hook Architecture
## Core Design Pattern
**CRITICAL**: All hooks must follow this unified architecture. This pattern applies to ALL models: Crawl, Dependency, Snapshot, ArchiveResult, etc.
### The Flow
```
1. Model.run() discovers and executes hooks
2. Hooks emit JSONL to stdout
3. Model.run() parses JSONL and creates DB records
4. New DB records trigger their own Model.run()
5. Cycle repeats
```
**Example Flow:**
```
Crawl.run()
→ runs on_Crawl__* hooks
→ hooks emit JSONL: {type: 'Dependency', bin_name: 'wget', ...}
→ Crawl.run() creates Dependency record in DB
→ Dependency.run() is called automatically
→ runs on_Dependency__* hooks
→ hooks emit JSONL: {type: 'InstalledBinary', name: 'wget', ...}
→ Dependency.run() creates InstalledBinary record in DB
```
### Golden Rules
1. **Model.run() executes hooks directly** - No helper methods in statemachines. Statemachine just calls Model.run().
2. **Hooks emit JSONL** - Any line starting with `{` that has a `type` field creates/updates that model.
```python
print(json.dumps({'type': 'Dependency', 'bin_name': 'wget', ...}))
print(json.dumps({'type': 'InstalledBinary', 'name': 'wget', ...}))
```
3. **JSONL fields = Model fields** - JSONL keys must match Django model field names exactly. No transformation.
```python
# ✅ CORRECT - matches Dependency model
{'type': 'Dependency', 'bin_name': 'wget', 'bin_providers': 'apt,brew', 'overrides': {...}}
# ❌ WRONG - uses different field names
{'type': 'Dependency', 'name': 'wget', 'providers': 'apt,brew', 'custom_cmds': {...}}
```
4. **No hardcoding** - Never hardcode binary names, provider names, or anything else. Use discovery.
```python
# ✅ CORRECT - discovers all on_Dependency hooks dynamically
run_hooks(event_name='Dependency', ...)
# ❌ WRONG - hardcodes provider list
for provider in ['pip', 'npm', 'apt', 'brew']:
run_hooks(event_name=f'Dependency__install_using_{provider}_provider', ...)
```
5. **Trust abx-pkg** - Never use `shutil.which()`, `subprocess.run([bin, '--version'])`, or manual hash calculation.
```python
# ✅ CORRECT - abx-pkg handles everything
from abx_pkg import Binary, PipProvider, EnvProvider
binary = Binary(name='wget', binproviders=[PipProvider(), EnvProvider()]).load()
# binary.abspath, binary.version, binary.sha256 are all populated automatically
# ❌ WRONG - manual detection
abspath = shutil.which('wget')
version = subprocess.run(['wget', '--version'], ...).stdout
```
6. **Hooks check if they can handle requests** - Each hook decides internally if it can handle the dependency.
```python
# In on_Dependency__install_using_pip_provider.py
if bin_providers != '*' and 'pip' not in bin_providers.split(','):
sys.exit(0) # Can't handle this, exit cleanly
```
7. **Minimal transformation** - Statemachine/Model.run() should do minimal JSONL parsing, just create records.
```python
# ✅ CORRECT - simple JSONL parsing
obj = json.loads(line)
if obj.get('type') == 'Dependency':
Dependency.objects.create(**obj)
# ❌ WRONG - complex transformation logic
if obj.get('type') == 'Dependency':
dep = Dependency.objects.create(name=obj['bin_name']) # renaming fields
dep.custom_commands = transform_overrides(obj['overrides']) # transforming data
```
### Pattern Consistency
Follow the same pattern as `ArchiveResult.run()` (archivebox/core/models.py:1030):
```python
def run(self):
"""Execute this Model by running hooks and processing JSONL output."""
# 1. Discover hooks
hook = discover_hook_for_model(self)
# 2. Run hook
results = run_hook(hook, output_dir=..., ...)
# 3. Parse JSONL and update self
for line in results['stdout'].splitlines():
obj = json.loads(line)
if obj.get('type') == self.__class__.__name__:
self.status = obj.get('status')
self.output = obj.get('output')
# ... apply other fields
# 4. Create side-effect records
for line in results['stdout'].splitlines():
obj = json.loads(line)
if obj.get('type') != self.__class__.__name__:
create_record_from_jsonl(obj) # Creates InstalledBinary, etc.
self.save()
```
### Validation Hook Pattern (on_Crawl__00_validate_*.py)
**Purpose**: Check if binary exists, emit Dependency if not found.
```python
#!/usr/bin/env python3
import sys
import json
def find_wget() -> dict | None:
"""Find wget binary using abx-pkg."""
try:
from abx_pkg import Binary, AptProvider, BrewProvider, EnvProvider
binary = Binary(name='wget', binproviders=[AptProvider(), BrewProvider(), EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
'name': 'wget',
'abspath': str(loaded.abspath),
'version': str(loaded.version) if loaded.version else None,
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except Exception:
pass
return None
def main():
result = find_wget()
if result and result.get('abspath'):
# Binary found - emit InstalledBinary and Machine config
print(json.dumps({
'type': 'InstalledBinary',
'name': result['name'],
'abspath': result['abspath'],
'version': result['version'],
'sha256': result['sha256'],
'binprovider': result['binprovider'],
}))
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/WGET_BINARY',
'value': result['abspath'],
}))
sys.exit(0)
else:
# Binary not found - emit Dependency
print(json.dumps({
'type': 'Dependency',
'bin_name': 'wget',
'bin_providers': 'apt,brew,env',
'overrides': {}, # Empty if no special install requirements
}))
print(f"wget binary not found", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
```
**Rules:**
- ✅ Use `Binary(...).load()` from abx-pkg - handles finding binary, version, hash automatically
- ✅ Emit `InstalledBinary` JSONL if found
- ✅ Emit `Dependency` JSONL if not found
- ✅ Use `overrides` field matching abx-pkg format: `{'pip': {'packages': ['pkg']}, 'apt': {'packages': ['pkg']}}`
- ❌ NEVER use `shutil.which()`, `subprocess.run()`, manual version detection, or hash calculation
- ❌ NEVER call package managers (apt, brew, pip, npm) directly
### Dependency Installation Pattern (on_Dependency__install_*.py)
**Purpose**: Install binary if not already installed.
```python
#!/usr/bin/env python3
import json
import sys
import rich_click as click
from abx_pkg import Binary, PipProvider
@click.command()
@click.option('--dependency-id', required=True)
@click.option('--bin-name', required=True)
@click.option('--bin-providers', default='*')
@click.option('--overrides', default=None, help="JSON-encoded overrides dict")
def main(dependency_id: str, bin_name: str, bin_providers: str, overrides: str | None):
"""Install binary using pip."""
# Check if this hook can handle this dependency
if bin_providers != '*' and 'pip' not in bin_providers.split(','):
click.echo(f"pip provider not allowed for {bin_name}", err=True)
sys.exit(0) # Exit cleanly - not an error, just can't handle
# Parse overrides
overrides_dict = None
if overrides:
try:
full_overrides = json.loads(overrides)
overrides_dict = full_overrides.get('pip', {}) # Extract pip section
except json.JSONDecodeError:
pass
# Install using abx-pkg
provider = PipProvider()
try:
binary = Binary(name=bin_name, binproviders=[provider], overrides=overrides_dict or {}).install()
except Exception as e:
click.echo(f"pip install failed: {e}", err=True)
sys.exit(1)
if not binary.abspath:
sys.exit(1)
# Emit InstalledBinary JSONL
print(json.dumps({
'type': 'InstalledBinary',
'name': bin_name,
'abspath': str(binary.abspath),
'version': str(binary.version) if binary.version else '',
'sha256': binary.sha256 or '',
'binprovider': 'pip',
'dependency_id': dependency_id,
}))
sys.exit(0)
if __name__ == '__main__':
main()
```
**Rules:**
- ✅ Check `bin_providers` parameter - exit cleanly (code 0) if can't handle
- ✅ Parse `overrides` parameter as full dict, extract your provider's section
- ✅ Use `Binary(...).install()` from abx-pkg - handles actual installation
- ✅ Emit `InstalledBinary` JSONL on success
- ❌ NEVER hardcode provider names in Model.run() or anywhere else
- ❌ NEVER skip the bin_providers check
### Model.run() Pattern
```python
class Dependency(models.Model):
def run(self):
"""Execute dependency installation by running all on_Dependency hooks."""
import json
from pathlib import Path
from django.conf import settings
# Check if already installed
if self.is_installed:
return self.installed_binaries.first()
from archivebox.hooks import run_hooks
# Create output directory
DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
output_dir = Path(DATA_DIR) / 'tmp' / f'dependency_{self.id}'
output_dir.mkdir(parents=True, exist_ok=True)
# Build kwargs for hooks
hook_kwargs = {
'dependency_id': str(self.id),
'bin_name': self.bin_name,
'bin_providers': self.bin_providers,
'overrides': json.dumps(self.overrides) if self.overrides else None,
}
# Run ALL on_Dependency hooks - each decides if it can handle this
results = run_hooks(
event_name='Dependency',
output_dir=output_dir,
timeout=600,
**hook_kwargs
)
# Process results - parse JSONL and create InstalledBinary records
for result in results:
if result['returncode'] != 0:
continue
for line in result['stdout'].strip().split('\n'):
if not line.strip():
continue
try:
obj = json.loads(line)
if obj.get('type') == 'InstalledBinary':
# Create InstalledBinary record - fields match JSONL exactly
if not obj.get('name') or not obj.get('abspath') or not obj.get('version'):
continue
machine = Machine.current()
installed_binary, _ = InstalledBinary.objects.update_or_create(
machine=machine,
name=obj['name'],
defaults={
'abspath': obj['abspath'],
'version': obj['version'],
'sha256': obj.get('sha256') or '',
'binprovider': obj.get('binprovider') or 'env',
'dependency': self,
}
)
if self.is_installed:
return installed_binary
except json.JSONDecodeError:
continue
return None
```
**Rules:**
- ✅ Use `run_hooks(event_name='ModelName', ...)` with model name
- ✅ Pass all relevant data as kwargs (will become --cli-args for hooks)
- ✅ Parse JSONL output directly - each line is a potential record
- ✅ Create records using JSONL fields directly - no transformation
- ✅ Let hooks decide if they can handle the request
- ❌ NEVER hardcode hook names or provider lists
- ❌ NEVER create helper methods for hook execution - just call run_hooks()
- ❌ NEVER transform JSONL data - use it as-is
---
# Background Hooks Implementation Plan
## Overview

View File

@@ -185,9 +185,26 @@ class CrawlMachine(StateMachine, strict_states=True):
machine.save(update_fields=['config'])
elif obj_type == 'Dependency':
# Dependency request - could trigger installation
# For now just log it (installation hooks would be separate)
print(f'[yellow]Dependency requested: {obj.get("bin_name")}[/yellow]')
# Create Dependency record from JSONL
from machine.models import Dependency
bin_name = obj.get('bin_name')
if not bin_name:
continue
# Create or get existing dependency
dependency, created = Dependency.objects.get_or_create(
bin_name=bin_name,
defaults={
'bin_providers': obj.get('bin_providers', '*'),
'overrides': obj.get('overrides', {}),
'config': obj.get('config', {}),
}
)
# Run dependency installation if not already installed
if not dependency.is_installed:
dependency.run()
except json.JSONDecodeError:
# Not JSON, skip

View File

@@ -1,65 +0,0 @@
# Generated by Django 6.0 on 2025-12-25 09:34
import django.db.models.deletion
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('machine', '0001_squashed'),
]
operations = [
migrations.AlterField(
model_name='dependency',
name='bin_name',
field=models.CharField(db_index=True, help_text='Binary executable name (e.g., wget, yt-dlp, chromium)', max_length=63, unique=True),
),
migrations.AlterField(
model_name='dependency',
name='bin_providers',
field=models.CharField(default='*', help_text='Comma-separated list of allowed providers: apt,brew,pip,npm,gem,nix,custom or * for any', max_length=127),
),
migrations.AlterField(
model_name='dependency',
name='config',
field=models.JSONField(blank=True, default=dict, help_text='JSON map of env var config to use during install'),
),
migrations.AlterField(
model_name='dependency',
name='custom_cmds',
field=models.JSONField(blank=True, default=dict, help_text="JSON map of provider -> custom install command (e.g., {'apt': 'apt install -y wget'})"),
),
migrations.AlterField(
model_name='dependency',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
migrations.AlterField(
model_name='installedbinary',
name='dependency',
field=models.ForeignKey(blank=True, help_text='The Dependency this binary satisfies', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='installedbinary_set', to='machine.dependency'),
),
migrations.AlterField(
model_name='installedbinary',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
migrations.AlterField(
model_name='machine',
name='config',
field=models.JSONField(blank=True, default=dict, help_text='Machine-specific config overrides (e.g., resolved binary paths like WGET_BINARY)'),
),
migrations.AlterField(
model_name='machine',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
migrations.AlterField(
model_name='networkinterface',
name='id',
field=models.UUIDField(default=uuid.uuid7, editable=False, primary_key=True, serialize=False, unique=True),
),
]

View File

@@ -0,0 +1,38 @@
# Generated manually on 2025-12-26
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('machine', '0001_squashed'),
]
operations = [
migrations.RenameField(
model_name='dependency',
old_name='custom_cmds',
new_name='overrides',
),
migrations.AlterField(
model_name='dependency',
name='bin_name',
field=models.CharField(db_index=True, help_text='Binary executable name (e.g., wget, yt-dlp, chromium)', max_length=63, unique=True),
),
migrations.AlterField(
model_name='dependency',
name='bin_providers',
field=models.CharField(default='*', help_text='Comma-separated list of allowed providers: apt,brew,pip,npm,gem,nix,custom or * for any', max_length=127),
),
migrations.AlterField(
model_name='dependency',
name='overrides',
field=models.JSONField(blank=True, default=dict, help_text="JSON map matching abx-pkg Binary.overrides format: {'pip': {'packages': ['pkg']}, 'apt': {'packages': ['pkg']}}"),
),
migrations.AlterField(
model_name='dependency',
name='config',
field=models.JSONField(blank=True, default=dict, help_text='JSON map of env var config to use during install'),
),
]

View File

@@ -109,13 +109,13 @@ class NetworkInterface(ModelWithHealthStats):
class DependencyManager(models.Manager):
def get_or_create_for_extractor(self, bin_name: str, bin_providers: str = '*', custom_cmds: dict = None, config: dict = None) -> 'Dependency':
def get_or_create_for_extractor(self, bin_name: str, bin_providers: str = '*', overrides: dict = None, config: dict = None) -> 'Dependency':
"""Get or create a Dependency for an extractor's binary."""
dependency, created = self.get_or_create(
bin_name=bin_name,
defaults={
'bin_providers': bin_providers,
'custom_cmds': custom_cmds or {},
'overrides': overrides or {},
'config': config or {},
}
)
@@ -132,11 +132,11 @@ class Dependency(models.Model):
Example:
Dependency.objects.get_or_create(
bin_name='wget',
bin_providers='apt,brew,nix,custom',
custom_cmds={
'apt': 'apt install -y --no-install-recommends wget',
'brew': 'brew install wget',
'custom': 'curl https://example.com/get-wget.sh | bash',
bin_providers='apt,brew,pip,env',
overrides={
'apt': {'packages': ['wget']},
'brew': {'packages': ['wget']},
'pip': {'packages': ['wget']},
}
)
"""
@@ -161,8 +161,8 @@ class Dependency(models.Model):
help_text="Binary executable name (e.g., wget, yt-dlp, chromium)")
bin_providers = models.CharField(max_length=127, default='*',
help_text="Comma-separated list of allowed providers: apt,brew,pip,npm,gem,nix,custom or * for any")
custom_cmds = models.JSONField(default=dict, blank=True,
help_text="JSON map of provider -> custom install command (e.g., {'apt': 'apt install -y wget'})")
overrides = models.JSONField(default=dict, blank=True,
help_text="JSON map matching abx-pkg Binary.overrides format: {'pip': {'packages': ['pkg']}, 'apt': {'packages': ['pkg']}}")
config = models.JSONField(default=dict, blank=True,
help_text="JSON map of env var config to use during install")
@@ -181,9 +181,9 @@ class Dependency(models.Model):
return True
return provider in self.bin_providers.split(',')
def get_install_cmd(self, provider: str) -> str | None:
"""Get the install command for a provider, or None for default."""
return self.custom_cmds.get(provider)
def get_overrides_for_provider(self, provider: str) -> dict | None:
"""Get the overrides for a provider, or None if not specified."""
return self.overrides.get(provider)
@property
def installed_binaries(self):
@@ -195,6 +195,85 @@ class Dependency(models.Model):
"""Check if at least one valid InstalledBinary exists for this dependency."""
return self.installed_binaries.filter(abspath__isnull=False).exclude(abspath='').exists()
def run(self):
"""
Execute dependency installation by running all on_Dependency hooks.
Each hook checks if it can handle this dependency and installs if possible.
Returns the InstalledBinary record on success, None on failure.
"""
import json
from pathlib import Path
from django.conf import settings
# Check if already installed
if self.is_installed:
return self.installed_binaries.first()
# Import here to avoid circular dependency
from archivebox.hooks import run_hooks
# Create output directory
DATA_DIR = getattr(settings, 'DATA_DIR', Path.cwd())
output_dir = Path(DATA_DIR) / 'tmp' / f'dependency_{self.id}'
output_dir.mkdir(parents=True, exist_ok=True)
# Build kwargs for hooks - pass overrides as JSON string
hook_kwargs = {
'dependency_id': str(self.id),
'bin_name': self.bin_name,
'bin_providers': self.bin_providers,
'overrides': json.dumps(self.overrides) if self.overrides else None,
}
# Run all on_Dependency hooks - each decides if it can handle this
results = run_hooks(
event_name='Dependency',
output_dir=output_dir,
timeout=600,
**hook_kwargs
)
# Process results - parse JSONL and create InstalledBinary records
for result in results:
if result['returncode'] != 0:
continue
# Parse JSONL output
for line in result['stdout'].strip().split('\n'):
if not line.strip():
continue
try:
obj = json.loads(line)
if obj.get('type') == 'InstalledBinary':
# Create InstalledBinary record
if not obj.get('name') or not obj.get('abspath') or not obj.get('version'):
continue
machine = Machine.current()
installed_binary, _ = InstalledBinary.objects.update_or_create(
machine=machine,
name=obj['name'],
defaults={
'abspath': obj['abspath'],
'version': obj['version'],
'sha256': obj.get('sha256') or '',
'binprovider': obj.get('binprovider') or 'env',
'dependency': self,
}
)
# Success! Return the installed binary
if self.is_installed:
return installed_binary
except json.JSONDecodeError:
continue
# Failed to install with any hook
return None
class InstalledBinaryManager(models.Manager):
def get_from_db_or_cache(self, name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> 'InstalledBinary':

View File

@@ -6,103 +6,29 @@ Runs at crawl start to verify Chrome is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
# Common Chrome/Chromium binary names and paths
CHROME_NAMES = [
'chromium',
'chromium-browser',
'google-chrome',
'google-chrome-stable',
'chrome',
]
CHROME_PATHS = [
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
'/Applications/Chromium.app/Contents/MacOS/Chromium',
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium',
'/usr/bin/chromium-browser',
'/snap/bin/chromium',
'/opt/google/chrome/chrome',
]
def get_binary_version(abspath: str) -> str | None:
"""Get version string from Chrome binary."""
try:
result = subprocess.run(
[abspath, '--version'],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout:
# Chrome version string: "Google Chrome 120.0.6099.109" or "Chromium 120.0.6099.109"
first_line = result.stdout.strip().split('\n')[0]
parts = first_line.split()
# Find version number (looks like 120.0.6099.109)
for part in parts:
if '.' in part and part[0].isdigit():
return part
return first_line[:32]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_chrome() -> dict | None:
"""Find Chrome/Chromium binary."""
# Check env var first
env_path = os.environ.get('CHROME_BINARY', '')
if env_path and Path(env_path).is_file():
return {
'name': 'chrome',
'abspath': env_path,
'version': get_binary_version(env_path),
'sha256': get_binary_hash(env_path),
'binprovider': 'env',
}
try:
from abx_pkg import Binary, AptProvider, BrewProvider, EnvProvider
# Try shutil.which for various names
for name in CHROME_NAMES:
abspath = shutil.which(name)
if abspath:
return {
'name': 'chrome',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
# Check common paths
for path in CHROME_PATHS:
if Path(path).is_file():
return {
'name': 'chrome',
'abspath': path,
'version': get_binary_version(path),
'sha256': get_binary_hash(path),
'binprovider': 'env',
}
# Try common Chrome/Chromium binary names
for name in ['google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable', 'chrome']:
binary = Binary(name=name, binproviders=[AptProvider(), BrewProvider(), EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
'name': 'chrome',
'abspath': str(loaded.abspath),
'version': str(loaded.version) if loaded.version else None,
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except Exception:
pass
return None

View File

@@ -6,13 +6,8 @@ Runs at crawl start to verify forum-dl binary is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def find_forumdl() -> dict | None:
@@ -30,22 +25,9 @@ def find_forumdl() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('forum-dl') or os.environ.get('FORUMDL_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'forum-dl',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
return None

View File

@@ -6,39 +6,8 @@ Runs at crawl start to verify gallery-dl binary is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str, version_flag: str = '--version') -> str | None:
"""Get version string from binary."""
try:
result = subprocess.run(
[abspath, version_flag],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout:
first_line = result.stdout.strip().split('\n')[0]
return first_line[:64]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_gallerydl() -> dict | None:
@@ -46,11 +15,7 @@ def find_gallerydl() -> dict | None:
try:
from abx_pkg import Binary, PipProvider, EnvProvider
class GalleryDlBinary(Binary):
name: str = 'gallery-dl'
binproviders_supported = [PipProvider(), EnvProvider()]
binary = GalleryDlBinary()
binary = Binary(name='gallery-dl', binproviders=[PipProvider(), EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
@@ -60,22 +25,9 @@ def find_gallerydl() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('gallery-dl') or os.environ.get('GALLERYDL_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'gallery-dl',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
return None

View File

@@ -6,43 +6,8 @@ Runs at crawl start to verify git is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str) -> str | None:
"""Get version string from binary."""
try:
result = subprocess.run(
[abspath, '--version'],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout:
# git version string: "git version 2.43.0"
first_line = result.stdout.strip().split('\n')[0]
parts = first_line.split()
if len(parts) >= 3 and parts[0] == 'git':
return parts[2]
return first_line[:32]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_git() -> dict | None:
@@ -50,11 +15,7 @@ def find_git() -> dict | None:
try:
from abx_pkg import Binary, EnvProvider
class GitBinary(Binary):
name: str = 'git'
binproviders_supported = [EnvProvider()]
binary = GitBinary()
binary = Binary(name='git', binproviders=[EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
@@ -64,22 +25,9 @@ def find_git() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('git') or os.environ.get('GIT_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'git',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
return None

View File

@@ -6,13 +6,8 @@ Runs at crawl start to verify yt-dlp and required binaries are available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def find_ytdlp() -> dict | None:
@@ -30,22 +25,9 @@ def find_ytdlp() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('yt-dlp') or os.environ.get('YTDLP_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'yt-dlp',
'abspath': abspath,
'version': None,
'sha256': None,
'binprovider': 'env',
}
return None
@@ -64,22 +46,9 @@ def find_node() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('node') or os.environ.get('NODE_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'node',
'abspath': abspath,
'version': None,
'sha256': None,
'binprovider': 'env',
}
return None
@@ -98,22 +67,9 @@ def find_ffmpeg() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('ffmpeg') or os.environ.get('FFMPEG_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'ffmpeg',
'abspath': abspath,
'version': None,
'sha256': None,
'binprovider': 'env',
}
return None

View File

@@ -6,13 +6,8 @@ Runs at crawl start to verify postlight-parser is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def find_mercury() -> dict | None:
@@ -30,22 +25,9 @@ def find_mercury() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('postlight-parser') or os.environ.get('MERCURY_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'postlight-parser',
'abspath': abspath,
'version': None,
'sha256': None,
'binprovider': 'env',
}
return None

View File

@@ -6,39 +6,8 @@ Runs at crawl start to verify papers-dl binary is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str, version_flag: str = '--version') -> str | None:
"""Get version string from binary."""
try:
result = subprocess.run(
[abspath, version_flag],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout:
first_line = result.stdout.strip().split('\n')[0]
return first_line[:64]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_papersdl() -> dict | None:
@@ -46,11 +15,7 @@ def find_papersdl() -> dict | None:
try:
from abx_pkg import Binary, PipProvider, EnvProvider
class PapersdlBinary(Binary):
name: str = 'papers-dl'
binproviders_supported = [PipProvider(), EnvProvider()]
binary = PapersdlBinary()
binary = Binary(name='papers-dl', binproviders=[PipProvider(), EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
@@ -60,22 +25,9 @@ def find_papersdl() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('papers-dl') or os.environ.get('PAPERSDL_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'papers-dl',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
return None

View File

@@ -6,13 +6,8 @@ Runs at crawl start to verify readability-extractor is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def find_readability() -> dict | None:
@@ -30,22 +25,9 @@ def find_readability() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('readability-extractor') or os.environ.get('READABILITY_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'readability-extractor',
'abspath': abspath,
'version': None,
'sha256': None,
'binprovider': 'env',
}
return None

View File

@@ -9,67 +9,25 @@ Outputs JSONL for InstalledBinary and Machine config updates.
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str) -> str | None:
"""Get version string from ripgrep binary."""
try:
result = subprocess.run(
[abspath, '--version'],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout:
# ripgrep version string: "ripgrep 14.1.0"
first_line = result.stdout.strip().split('\n')[0]
parts = first_line.split()
for i, part in enumerate(parts):
if part.lower() == 'ripgrep' and i + 1 < len(parts):
return parts[i + 1]
# Try to find version number pattern
for part in parts:
if part[0].isdigit() and '.' in part:
return part
return first_line[:32]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_ripgrep() -> dict | None:
"""Find ripgrep binary using shutil.which or env var."""
# Check env var first - if it's an absolute path and exists, use it
ripgrep_env = os.environ.get('RIPGREP_BINARY', '')
if ripgrep_env and '/' in ripgrep_env and Path(ripgrep_env).is_file():
abspath = ripgrep_env
else:
# Otherwise try shutil.which with the env var as the binary name
abspath = shutil.which(ripgrep_env) if ripgrep_env else None
if not abspath:
abspath = shutil.which('rg')
"""Find ripgrep binary."""
try:
from abx_pkg import Binary, AptProvider, BrewProvider, EnvProvider
if abspath and Path(abspath).is_file():
return {
'name': 'rg',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
binary = Binary(name='rg', binproviders=[AptProvider(), BrewProvider(), EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
'name': 'rg',
'abspath': str(loaded.abspath),
'version': str(loaded.version) if loaded.version else None,
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except Exception:
pass
return None

View File

@@ -6,82 +6,27 @@ Runs at crawl start to verify single-file (npm package) is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str) -> str | None:
"""Get version string from single-file binary."""
try:
result = subprocess.run(
[abspath, '--version'],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout:
return result.stdout.strip().split('\n')[0][:32]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
# For scripts, hash the script content
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_singlefile() -> dict | None:
"""Find single-file binary."""
# Check env var first
env_path = os.environ.get('SINGLEFILE_BINARY', '')
if env_path and Path(env_path).is_file():
return {
'name': 'single-file',
'abspath': env_path,
'version': get_binary_version(env_path),
'sha256': get_binary_hash(env_path),
'binprovider': 'env',
}
try:
from abx_pkg import Binary, NpmProvider, EnvProvider
# Try shutil.which
for name in ['single-file', 'singlefile']:
abspath = shutil.which(name)
if abspath:
binary = Binary(name='single-file', binproviders=[NpmProvider(), EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
'name': 'single-file',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'npm',
}
# Check common npm paths
npm_paths = [
Path.home() / '.npm-global/bin/single-file',
Path.home() / 'node_modules/.bin/single-file',
Path('/usr/local/bin/single-file'),
Path('/usr/local/lib/node_modules/.bin/single-file'),
]
for path in npm_paths:
if path.is_file():
return {
'name': 'single-file',
'abspath': str(path),
'version': get_binary_version(str(path)),
'sha256': get_binary_hash(str(path)),
'binprovider': 'npm',
'abspath': str(loaded.abspath),
'version': str(loaded.version) if loaded.version else None,
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except Exception:
pass
return None

View File

@@ -6,58 +6,16 @@ Runs at crawl start to verify wget is available.
Outputs JSONL for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str) -> str | None:
"""Get version string from binary."""
try:
result = subprocess.run(
[abspath, '--version'],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout:
# wget version string: "GNU Wget 1.24.5 built on ..."
first_line = result.stdout.strip().split('\n')[0]
# Extract version number
parts = first_line.split()
for i, part in enumerate(parts):
if part.lower() == 'wget' and i + 1 < len(parts):
return parts[i + 1]
return first_line[:32]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_wget() -> dict | None:
"""Find wget binary using abx-pkg or fallback to shutil.which."""
# Try abx-pkg first
"""Find wget binary using abx-pkg."""
try:
from abx_pkg import Binary, EnvProvider
class WgetBinary(Binary):
name: str = 'wget'
binproviders_supported = [EnvProvider()]
binary = WgetBinary()
binary = Binary(name='wget', binproviders=[EnvProvider()])
loaded = binary.load()
if loaded and loaded.abspath:
return {
@@ -67,22 +25,9 @@ def find_wget() -> dict | None:
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('wget') or os.environ.get('WGET_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'wget',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
return None