Delete pid_utils.py and migrate to Process model (#1741)

Nick Sweeting
2025-12-31 03:43:18 -08:00
committed by GitHub
26 changed files with 3893 additions and 1185 deletions

View File

@@ -28,7 +28,7 @@ Process(cmd=['archivebox', 'add', 'https://example.com']) # CLI entry
**File:** `archivebox/machine/models.py`
```python
class Process(ModelWithStateMachine):
class Process(ModelWithHealthStats):
# ... existing fields ...
# NEW: Parent process FK for hierarchy tracking
@@ -621,6 +621,18 @@ class Process(ModelWithHealthStats):
return self
def is_alive(self) -> bool:
"""Check if this process is still running."""
from archivebox.misc.process_utils import validate_pid_file
if self.status == self.StatusChoices.EXITED:
return False
if not self.pid:
return False
return validate_pid_file(self.pid_file, self.cmd_file)
def kill(self, signal_num: int = 15) -> bool:
"""
Kill this process and update status.
@@ -700,7 +712,7 @@ class Process(ModelWithHealthStats):
Wait for process to exit, polling periodically.
Args:
timeout: Max seconds to wait (None = use self.timeout, or config.TIMEOUT * 5 if that's also None)
timeout: Max seconds to wait (None = use self.timeout)
Returns:
exit_code
@@ -709,10 +721,8 @@ class Process(ModelWithHealthStats):
TimeoutError if process doesn't exit in time
"""
import time
from archivebox import config
# Require a timeout - default to config.TIMEOUT * 5 (typically 300s)
timeout = timeout or self.timeout or (config.TIMEOUT * 5)
timeout = timeout or self.timeout
start = time.time()
while True:
@@ -1692,6 +1702,230 @@ class ProcessAdmin(admin.ModelAdmin):
---
## Phase 8: Code Consolidation (Delete Redundant Logic)
The goal is to consolidate all subprocess management into `Process` model methods, eliminating duplicate logic scattered across the codebase.
### 8.1 Files to Simplify/Delete
| File | Current Lines | After Consolidation | Savings |
|------|--------------|---------------------|---------|
| `workers/pid_utils.py` | ~192 lines | DELETE entirely | -192 |
| `misc/process_utils.py` | ~85 lines | Keep as low-level utils | 0 |
| `hooks.py` (run_hook) | ~100 lines | -50 lines (use Process.launch) | -50 |
| `hooks.py` (kill/alive) | ~50 lines | DELETE (use Process.kill/is_running) | -50 |
| `crawls/models.py` (cleanup) | ~100 lines | -70 lines (use Process.kill) | -70 |
| `supervisord_util.py` | ~50 lines process mgmt | -30 lines | -30 |
| **TOTAL** | | | **~-390 lines** |
### 8.2 Detailed Consolidation Map
#### `workers/pid_utils.py` → DELETE ENTIRELY
| Current Function | Replacement |
|------------------|-------------|
| `write_pid_file(worker_type, worker_id)` | `Process.current()` auto-creates |
| `read_pid_file(path)` | `Process.objects.get_by_pid(pid)` |
| `remove_pid_file(path)` | Manual cleanup in `Process.kill()` and legacy hook cleanup code |
| `is_process_alive(pid)` | `Process.is_running` / `Process.proc is not None` |
| `get_all_pid_files()` | `Process.objects.filter(machine=Machine.current(), status=Process.StatusChoices.RUNNING)` |
| `get_all_worker_pids(type)` | `Process.objects.filter(machine=Machine.current(), process_type=type, status=Process.StatusChoices.RUNNING)` |
| `cleanup_stale_pid_files()` | `Process.cleanup_stale_running()` |
| `get_running_worker_count(type)` | `Process.objects.filter(...).count()` |
| `get_next_worker_id(type)` | Use `Max(worker_id)+1` under a transaction or a DB sequence to avoid race conditions (see the sketch below) |
| `stop_worker(pid, graceful)` | `Process.terminate(graceful_timeout)` or `Process.kill_tree()` |
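For the `get_next_worker_id()` row above, a minimal sketch of one race-free allocation strategy, assuming a unique constraint on `(machine, process_type, worker_id)` (the constraint, field names, and helper are illustrative, not part of this commit):
```python
# Hypothetical helper, not the committed API: allocate Max(worker_id)+1 and let a
# unique constraint on (machine, process_type, worker_id) reject concurrent claimers.
from django.db import IntegrityError, transaction
from django.db.models import Max

def claim_next_worker_id(machine, process_type: str, attempts: int = 5) -> int:
    from archivebox.machine.models import Process  # assumed import path

    for _ in range(attempts):
        next_id = (
            Process.objects
            .filter(machine=machine, process_type=process_type)
            .aggregate(max_id=Max('worker_id'))['max_id'] or 0
        ) + 1
        try:
            with transaction.atomic():
                # if another worker grabbed next_id between our read and this write,
                # the unique constraint raises IntegrityError and we retry
                Process.objects.create(
                    machine=machine,
                    process_type=process_type,
                    worker_id=next_id,
                    cmd=['archivebox', 'worker', process_type, str(next_id)],  # placeholder cmd
                )
            return next_id
        except IntegrityError:
            continue  # lost the race; re-read Max() and try again
    raise RuntimeError(f'could not allocate a worker_id for {process_type}')
```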
#### `hooks.py` Changes
**Current `run_hook()` lines 374-398:**
```python
# DELETE these lines - replaced by Process.launch()
stdout_file = output_dir / 'stdout.log'
stderr_file = output_dir / 'stderr.log'
pid_file = output_dir / 'hook.pid'
cmd_file = output_dir / 'cmd.sh'
write_cmd_file(cmd_file, cmd)
with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
process = subprocess.Popen(cmd, ...)
write_pid_file_with_mtime(pid_file, process.pid, time.time())
```
**New `run_hook()` using Process:**
```python
# Only store env delta or allowlist to avoid leaking secrets
env_delta = {k: v for k, v in env.items() if k in ALLOWED_ENV_VARS}
hook_process = Process.objects.create(
parent=parent_process,
process_type=Process.TypeChoices.HOOK,
cmd=cmd, pwd=str(output_dir), env=env_delta, timeout=timeout,
)
hook_process.launch(background=is_background)
# stdout/stderr/pid_file all handled internally by Process.launch()
```
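For context, a rough sketch of the bookkeeping `Process.launch()` is described as handling internally (Popen, PID recording, stdout/stderr redirection). Illustrative only; the committed implementation in `machine/models.py` may differ:
```python
# Illustrative sketch of a launch() method on the Process model (not the real code).
import os
import subprocess
from pathlib import Path
from django.utils import timezone

def launch(self, background: bool = False) -> 'Process':
    pwd = Path(self.pwd)
    pwd.mkdir(parents=True, exist_ok=True)
    stdout_file = pwd / 'stdout.log'
    stderr_file = pwd / 'stderr.log'

    with open(stdout_file, 'w') as out, open(stderr_file, 'w') as err:
        popen = subprocess.Popen(
            self.cmd,
            cwd=str(pwd),
            env={**os.environ, **(self.env or {})},   # merge stored env delta over inherited env
            stdout=out,
            stderr=err,
            start_new_session=background,             # detach background hooks into their own group
        )

    # persist OS-level identity so is_running / kill() can validate the PID later
    self.pid = popen.pid
    self.started_at = timezone.now()
    self.status = self.StatusChoices.RUNNING
    self.save()

    if not background:
        self.exit_code = popen.wait()
        self.status = self.StatusChoices.EXITED
        self.ended_at = timezone.now()
        self.save()
    return self
```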
**DELETE these functions entirely:**
```python
def process_is_alive(pid_file: Path) -> bool: # lines 1238-1256
def kill_process(pid_file: Path, sig, validate): # lines 1259-1282
```
**Replace with:**
```python
# Use Process methods directly:
process.is_running # replaces process_is_alive()
process.kill() # replaces kill_process()
```
#### `crawls/models.py` Changes
**Current `Crawl.cleanup()` lines 418-493:**
```python
# DELETE all this inline process logic:
def is_process_alive(pid):
try:
os.kill(pid, 0)
return True
except (OSError, ProcessLookupError):
return False
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
if not validate_pid_file(pid_file, cmd_file):
pid_file.unlink(missing_ok=True)
continue
pid = int(pid_file.read_text().strip())
os.killpg(pid, signal.SIGTERM)
time.sleep(2)
if not is_process_alive(pid):
pid_file.unlink(missing_ok=True)
continue
os.killpg(pid, signal.SIGKILL)
# ... more cleanup logic
```
**New `Crawl.cleanup()` using Process:**
```python
def cleanup(self):
# Kill all running child processes for this crawl
for snapshot in self.snapshot_set.all():
for ar in snapshot.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED):
if ar.process_id:
# Kill hook process and all its children
ar.process.kill()
for child in ar.process.children.filter(status='running'):
child.kill()
# Run on_CrawlEnd hooks (foreground)
# ... existing hook running logic ...
```
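The loop above only descends one level of children; if deeper process trees are possible, a recursive helper along the lines of the `Process.kill_tree()` mentioned in 8.2 could cover them. A sketch, not the committed code:
```python
# Sketch of a depth-first subtree kill as a Process model method (assumed helper, see 8.2).
def kill_tree(self, signal_num: int = 15) -> None:
    for child in self.children.filter(status=self.StatusChoices.RUNNING):
        child.kill_tree(signal_num)   # recurse into grandchildren first
    self.kill(signal_num)             # then kill this process itself
```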
#### `supervisord_util.py` Changes
**Current global tracking:**
```python
_supervisord_proc = None # subprocess.Popen reference
def stop_existing_supervisord_process():
global _supervisord_proc
if _supervisord_proc and _supervisord_proc.poll() is None:
_supervisord_proc.terminate()
_supervisord_proc.wait(timeout=5)
# ... fallback to PID file ...
```
**New tracking using the Process model:**
```python
_supervisord_db_process = None # Process model instance
def start_new_supervisord_process():
# ... existing subprocess.Popen ...
global _supervisord_db_process
_supervisord_db_process = Process.objects.create(
parent=Process.current(),
process_type=Process.TypeChoices.SUPERVISORD,
pid=proc.pid,
cmd=['supervisord', f'--configuration={CONFIG_FILE}'],
started_at=timezone.now(),
status=Process.StatusChoices.RUNNING,
)
def stop_existing_supervisord_process():
global _supervisord_db_process
if _supervisord_db_process:
_supervisord_db_process.kill() # Handles children, PID validation, etc.
_supervisord_db_process = None
```
#### `workers/worker.py` Changes
**Current:**
```python
from .pid_utils import write_pid_file, remove_pid_file, ...
def on_startup(self):
self.pid = os.getpid()
self.pid_file = write_pid_file(self.name, self.worker_id)
def on_shutdown(self, error=None):
if self.pid_file:
remove_pid_file(self.pid_file)
```
**New:**
```python
# No import needed - Process.current() handles everything
def on_startup(self):
self.db_process = Process.current()
# Process.current() auto-detects type, finds parent via PPID, creates record
def on_shutdown(self, error=None):
if self.db_process:
self.db_process.exit_code = 0 if error is None else 1
self.db_process.status = Process.StatusChoices.EXITED
self.db_process.ended_at = timezone.now()
self.db_process.save()
```
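A rough sketch of what `Process.current()` is described as doing here (register the current PID and link the parent via the OS PPID). Illustrative only; the real classmethod may differ:
```python
# Illustrative sketch of Process.current() as a classmethod on the Process model.
import os
import sys
from django.utils import timezone

@classmethod
def current(cls) -> 'Process':
    from archivebox.machine.models import Machine

    machine = Machine.current()
    proc, created = cls.objects.get_or_create(
        machine=machine,
        pid=os.getpid(),
        status=cls.StatusChoices.RUNNING,
        defaults={'cmd': sys.argv, 'started_at': timezone.now()},
    )
    if created:
        # the parent is whichever RUNNING Process row matches our OS-level PPID
        proc.parent = cls.objects.filter(
            machine=machine,
            pid=os.getppid(),
            status=cls.StatusChoices.RUNNING,
        ).first()
        proc.save()
    return proc
```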
### 8.3 New Process Model Methods Summary
All process operations now go through `Process`:
```python
# Getting current process
Process.current() # Creates/retrieves Process for os.getpid()
# Spawning new process
proc = Process.objects.create(parent=Process.current(), cmd=[...], ...)
proc.launch(background=False) # Handles Popen, PID file, stdout/stderr
# Checking process status
proc.is_running # True if OS process exists and matches
proc.proc # psutil.Process or None (validated)
proc.poll() # Returns exit_code or None
# Terminating process
proc.kill() # Safe kill with PID validation
proc.kill(SIGKILL) # Force kill
# Waiting for completion
proc.wait(timeout=30) # Blocks until exit or timeout
# Cleanup
Process.cleanup_stale_running() # Mark orphaned processes as EXITED
```
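Putting the summary together, a typical spawn-and-wait flow might look like the following (usage example only; the command and working directory are placeholders):
```python
from archivebox.machine.models import Process

parent = Process.current()                       # register/fetch the current process

child = Process.objects.create(
    parent=parent,
    cmd=['wget', '--mirror', 'https://example.com'],  # placeholder command
    pwd='/tmp/example',                               # placeholder working dir
)
child.launch(background=True)                    # Popen + PID/log bookkeeping

try:
    exit_code = child.wait(timeout=60)           # blocks until exit, raises TimeoutError
except TimeoutError:
    child.kill()                                 # PID-validated kill
    exit_code = child.poll()

Process.cleanup_stale_running()                  # mark any orphaned rows as EXITED
print(exit_code)
```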
### 8.4 Benefits
1. **Single Source of Truth**: All process state in database, queryable
2. **PID Reuse Protection**: `Process.proc` validates the PID against `psutil.Process.create_time()` (see the sketch below)
3. **Hierarchy Tracking**: `Process.parent` / `Process.children` for tree traversal
4. **Machine-Scoped**: All queries filter by `machine=Machine.current()`
5. **Audit Trail**: Every subprocess is logged with timestamps, exit codes
6. **No Stale PID Files**: Process records update status automatically
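For benefit 2, one common way a PID-reuse guard is written (not necessarily how `Process.proc` implements it) is to compare the live process's `create_time()` against the launch timestamp recorded in the database:
```python
# Illustrative PID-reuse check; names and tolerance are assumptions, not the real API.
import psutil

def validated_proc(pid: int, started_at_ts: float, tolerance: float = 5.0):
    """Return a psutil.Process only if it is plausibly the process we launched."""
    try:
        proc = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return None
    # a recycled PID belongs to a process started at a very different time
    if abs(proc.create_time() - started_at_ts) > tolerance:
        return None
    return proc
```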
---
## Open Questions
1. **Performance**: Deep hierarchies with many children could slow queries. Consider:

View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""
archivebox extract [snapshot_ids...] [--plugins=NAMES]
Run plugins on Snapshots. Accepts snapshot IDs as arguments, from stdin, or via JSONL.
Input formats:
- Snapshot UUIDs (one per line)
- JSONL: {"type": "Snapshot", "id": "...", "url": "..."}
- JSONL: {"type": "ArchiveResult", "snapshot_id": "...", "plugin": "..."}
Output (JSONL):
{"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", "status": "..."}
Examples:
# Extract specific snapshot
archivebox extract 01234567-89ab-cdef-0123-456789abcdef
# Pipe from snapshot command
archivebox snapshot https://example.com | archivebox extract
# Run specific plugins only
archivebox extract --plugins=screenshot,singlefile 01234567-89ab-cdef-0123-456789abcdef
# Chain commands
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox extract'
import sys
from typing import Optional, List
import rich_click as click
def process_archiveresult_by_id(archiveresult_id: str) -> int:
"""
Run extraction for a single ArchiveResult by ID (used by workers).
Triggers the ArchiveResult's state machine tick() to run the extractor plugin.
"""
from rich import print as rprint
from archivebox.core.models import ArchiveResult
try:
archiveresult = ArchiveResult.objects.get(id=archiveresult_id)
except ArchiveResult.DoesNotExist:
rprint(f'[red]ArchiveResult {archiveresult_id} not found[/red]', file=sys.stderr)
return 1
rprint(f'[blue]Extracting {archiveresult.plugin} for {archiveresult.snapshot.url}[/blue]', file=sys.stderr)
try:
# Trigger state machine tick - this runs the actual extraction
archiveresult.sm.tick()
archiveresult.refresh_from_db()
if archiveresult.status == ArchiveResult.StatusChoices.SUCCEEDED:
rprint(f'[green]Extraction succeeded: {archiveresult.output_str}[/green]', file=sys.stderr)
return 0
elif archiveresult.status == ArchiveResult.StatusChoices.FAILED:
rprint(f'[red]Extraction failed: {archiveresult.output_str}[/red]', file=sys.stderr)
return 1
else:
# Still in progress or backoff - not a failure
rprint(f'[yellow]Extraction status: {archiveresult.status}[/yellow]', file=sys.stderr)
return 0
except Exception as e:
rprint(f'[red]Extraction error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
def run_plugins(
args: tuple,
plugins: str = '',
wait: bool = True,
) -> int:
"""
Run plugins on Snapshots from input.
Reads Snapshot IDs or JSONL from args/stdin, runs plugins, outputs JSONL.
Exit codes:
0: Success
1: Failure
"""
from rich import print as rprint
from django.utils import timezone
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record,
TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
)
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.workers.orchestrator import Orchestrator
is_tty = sys.stdout.isatty()
# Parse comma-separated plugins list once (reused in creation and filtering)
plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] if plugins else []
# Collect all input records
records = list(read_args_or_stdin(args))
if not records:
rprint('[yellow]No snapshots provided. Pass snapshot IDs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# Gather snapshot IDs to process
snapshot_ids = set()
for record in records:
record_type = record.get('type')
if record_type == TYPE_SNAPSHOT:
snapshot_id = record.get('id')
if snapshot_id:
snapshot_ids.add(snapshot_id)
elif record.get('url'):
# Look up by URL (get most recent if multiple exist)
snap = Snapshot.objects.filter(url=record['url']).order_by('-created_at').first()
if snap:
snapshot_ids.add(str(snap.id))
else:
rprint(f'[yellow]Snapshot not found for URL: {record["url"]}[/yellow]', file=sys.stderr)
elif record_type == TYPE_ARCHIVERESULT:
snapshot_id = record.get('snapshot_id')
if snapshot_id:
snapshot_ids.add(snapshot_id)
elif 'id' in record:
# Assume it's a snapshot ID
snapshot_ids.add(record['id'])
if not snapshot_ids:
rprint('[red]No valid snapshot IDs found in input[/red]', file=sys.stderr)
return 1
# Get snapshots and ensure they have pending ArchiveResults
processed_count = 0
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
except Snapshot.DoesNotExist:
rprint(f'[yellow]Snapshot {snapshot_id} not found[/yellow]', file=sys.stderr)
continue
# Create pending ArchiveResults if needed
if plugins_list:
# Only create for specific plugins
for plugin_name in plugins_list:
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin_name,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
# Reset for retry
result.status = ArchiveResult.StatusChoices.QUEUED
result.retry_at = timezone.now()
result.save()
else:
# Create all pending plugins
snapshot.create_pending_archiveresults()
# Reset snapshot status to allow processing
if snapshot.status == Snapshot.StatusChoices.SEALED:
snapshot.status = Snapshot.StatusChoices.STARTED
snapshot.retry_at = timezone.now()
snapshot.save()
processed_count += 1
if processed_count == 0:
rprint('[red]No snapshots to process[/red]', file=sys.stderr)
return 1
rprint(f'[blue]Queued {processed_count} snapshots for extraction[/blue]', file=sys.stderr)
# Run orchestrator if --wait (default)
if wait:
rprint('[blue]Running plugins...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
# Output results as JSONL (when piped) or human-readable (when TTY)
for snapshot_id in snapshot_ids:
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
results = snapshot.archiveresult_set.all()
if plugins_list:
results = results.filter(plugin__in=plugins_list)
for result in results:
if is_tty:
status_color = {
'succeeded': 'green',
'failed': 'red',
'skipped': 'yellow',
}.get(result.status, 'dim')
rprint(f' [{status_color}]{result.status}[/{status_color}] {result.plugin}{result.output_str or ""}', file=sys.stderr)
else:
write_record(result.to_jsonl())
except Snapshot.DoesNotExist:
continue
return 0
def is_archiveresult_id(value: str) -> bool:
"""Check if value looks like an ArchiveResult UUID."""
import re
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
if not uuid_pattern.match(value):
return False
# Verify it's actually an ArchiveResult (not a Snapshot or other object)
from archivebox.core.models import ArchiveResult
return ArchiveResult.objects.filter(id=value).exists()
@click.command()
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run (e.g., screenshot,singlefile)')
@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
@click.argument('args', nargs=-1)
def main(plugins: str, wait: bool, args: tuple):
"""Run plugins on Snapshots, or process existing ArchiveResults by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# Read all input
records = list(read_args_or_stdin(args))
if not records:
from rich import print as rprint
rprint('[yellow]No Snapshot IDs or ArchiveResult IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
# Check if input looks like existing ArchiveResult IDs to process
all_are_archiveresult_ids = all(
is_archiveresult_id(r.get('id') or r.get('url', ''))
for r in records
)
if all_are_archiveresult_ids:
# Process existing ArchiveResults by ID
exit_code = 0
for record in records:
archiveresult_id = record.get('id') or record.get('url')
result = process_archiveresult_by_id(archiveresult_id)
if result != 0:
exit_code = result
sys.exit(exit_code)
else:
# Default behavior: run plugins on Snapshots from input
sys.exit(run_plugins(args, plugins=plugins, wait=wait))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
archivebox orchestrator [--daemon]
Start the orchestrator process that manages workers.
The orchestrator polls queues for each model type (Crawl, Snapshot, ArchiveResult)
and lazily spawns worker processes when there is work to be done.
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox orchestrator'
import sys
import rich_click as click
from archivebox.misc.util import docstring
def orchestrator(daemon: bool = False, watch: bool = False) -> int:
"""
Start the orchestrator process.
The orchestrator:
1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
2. Spawns worker processes when there is work to do
3. Monitors worker health and restarts failed workers
4. Exits when all queues are empty (unless --daemon)
Args:
daemon: Run forever (don't exit when idle)
watch: Just watch the queues without spawning workers (for debugging)
Exit codes:
0: All work completed successfully
1: Error occurred
"""
from archivebox.workers.orchestrator import Orchestrator
from rich import print as rprint
if Orchestrator.is_running():
rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
return 0
try:
orchestrator_instance = Orchestrator(exit_on_idle=not daemon)
orchestrator_instance.runloop()
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--watch', '-w', is_flag=True, help="Watch queues without spawning workers")
@docstring(orchestrator.__doc__)
def main(daemon: bool, watch: bool):
"""Start the ArchiveBox orchestrator process"""
sys.exit(orchestrator(daemon=daemon, watch=watch))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
__command__ = 'archivebox remove'
import shutil
from pathlib import Path
from typing import Iterable
import rich_click as click
from django.db.models import QuerySet
from archivebox.config import DATA_DIR
from archivebox.config.django import setup_django
from archivebox.misc.util import enforce_types, docstring
from archivebox.misc.checks import check_data_folder
from archivebox.misc.logging_util import (
log_list_started,
log_list_finished,
log_removal_started,
log_removal_finished,
TimedProgress,
)
@enforce_types
def remove(filter_patterns: Iterable[str]=(),
filter_type: str='exact',
snapshots: QuerySet | None=None,
after: float | None=None,
before: float | None=None,
yes: bool=False,
delete: bool=False,
out_dir: Path=DATA_DIR) -> QuerySet:
"""Remove the specified URLs from the archive"""
setup_django()
check_data_folder()
from archivebox.cli.archivebox_search import get_snapshots
log_list_started(filter_patterns, filter_type)
timer = TimedProgress(360, prefix=' ')
try:
snapshots = get_snapshots(
snapshots=snapshots,
filter_patterns=list(filter_patterns) if filter_patterns else None,
filter_type=filter_type,
after=after,
before=before,
)
finally:
timer.end()
if not snapshots.exists():
log_removal_finished(0, 0)
raise SystemExit(1)
log_list_finished(snapshots)
log_removal_started(snapshots, yes=yes, delete=delete)
timer = TimedProgress(360, prefix=' ')
try:
for snapshot in snapshots:
if delete:
shutil.rmtree(snapshot.output_dir, ignore_errors=True)
finally:
timer.end()
to_remove = snapshots.count()
from archivebox.search import flush_search_index
from archivebox.core.models import Snapshot
flush_search_index(snapshots=snapshots)
snapshots.delete()
all_snapshots = Snapshot.objects.all()
log_removal_finished(all_snapshots.count(), to_remove)
return all_snapshots
@click.command()
@click.option('--yes', is_flag=True, help='Remove links instantly without prompting to confirm')
@click.option('--delete', is_flag=True, help='Delete the archived content and metadata folder in addition to removing from index')
@click.option('--before', type=float, help='Remove only URLs bookmarked before timestamp')
@click.option('--after', type=float, help='Remove only URLs bookmarked after timestamp')
@click.option('--filter-type', '-f', type=click.Choice(('exact', 'substring', 'domain', 'regex', 'tag')), default='exact', help='Type of pattern matching to use when filtering URLs')
@click.argument('filter_patterns', nargs=-1)
@docstring(remove.__doc__)
def main(**kwargs):
"""Remove the specified URLs from the archive"""
remove(**kwargs)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
__command__ = 'archivebox search'
from pathlib import Path
from typing import Optional, List, Any
import rich_click as click
from rich import print
from django.db.models import QuerySet
from archivebox.config import DATA_DIR
from archivebox.misc.logging import stderr
from archivebox.misc.util import enforce_types, docstring
# Filter types for URL matching
LINK_FILTERS = {
'exact': lambda pattern: {'url': pattern},
'substring': lambda pattern: {'url__icontains': pattern},
'regex': lambda pattern: {'url__iregex': pattern},
'domain': lambda pattern: {'url__istartswith': f'http://{pattern}'},
'tag': lambda pattern: {'tags__name': pattern},
'timestamp': lambda pattern: {'timestamp': pattern},
}
STATUS_CHOICES = ['indexed', 'archived', 'unarchived']
def get_snapshots(snapshots: Optional[QuerySet]=None,
filter_patterns: Optional[List[str]]=None,
filter_type: str='substring',
after: Optional[float]=None,
before: Optional[float]=None,
out_dir: Path=DATA_DIR) -> QuerySet:
"""Filter and return Snapshots matching the given criteria."""
from archivebox.core.models import Snapshot
if snapshots is not None:
result = snapshots
else:
result = Snapshot.objects.all()
if after is not None:
result = result.filter(timestamp__gte=after)
if before is not None:
result = result.filter(timestamp__lt=before)
if filter_patterns:
result = result.filter_by_patterns(filter_patterns, filter_type)
if not result:
stderr('[!] No Snapshots matched your filters:', filter_patterns, f'({filter_type})', color='lightyellow')
return result
@enforce_types
def search(filter_patterns: list[str] | None=None,
filter_type: str='substring',
status: str='indexed',
before: float | None=None,
after: float | None=None,
sort: str | None=None,
json: bool=False,
html: bool=False,
csv: str | None=None,
with_headers: bool=False):
"""List, filter, and export information about archive entries"""
from archivebox.core.models import Snapshot
if with_headers and not (json or html or csv):
stderr('[X] --with-headers requires --json, --html or --csv\n', color='red')
raise SystemExit(2)
# Query DB directly - no filesystem scanning
snapshots = get_snapshots(
filter_patterns=list(filter_patterns) if filter_patterns else None,
filter_type=filter_type,
before=before,
after=after,
)
# Apply status filter
if status == 'archived':
snapshots = snapshots.filter(downloaded_at__isnull=False)
elif status == 'unarchived':
snapshots = snapshots.filter(downloaded_at__isnull=True)
# 'indexed' = all snapshots (no filter)
if sort:
snapshots = snapshots.order_by(sort)
# Export to requested format
if json:
output = snapshots.to_json(with_headers=with_headers)
elif html:
output = snapshots.to_html(with_headers=with_headers)
elif csv:
output = snapshots.to_csv(cols=csv.split(','), header=with_headers)
else:
from archivebox.misc.logging_util import printable_folders
# Convert to dict for printable_folders
folders = {s.output_dir: s for s in snapshots}
output = printable_folders(folders, with_headers)
print(output)
return output
@click.command()
@click.option('--filter-type', '-f', type=click.Choice(['search', *LINK_FILTERS.keys()]), default='substring', help='Pattern matching type for filtering URLs')
@click.option('--status', '-s', type=click.Choice(STATUS_CHOICES), default='indexed', help='List snapshots with the given status')
@click.option('--before', '-b', type=float, help='List snapshots bookmarked before the given UNIX timestamp')
@click.option('--after', '-a', type=float, help='List snapshots bookmarked after the given UNIX timestamp')
@click.option('--sort', '-o', type=str, help='Field to sort by, e.g. url, created_at, bookmarked_at, downloaded_at')
@click.option('--json', '-J', is_flag=True, help='Print output in JSON format')
@click.option('--html', '-M', is_flag=True, help='Print output in HTML format (suitable for viewing statically without a server)')
@click.option('--csv', '-C', type=str, help='Print output as CSV with the provided fields, e.g.: created_at,url,title')
@click.option('--with-headers', '-H', is_flag=True, help='Include extra CSV/HTML headers in the output')
@click.help_option('--help', '-h')
@click.argument('filter_patterns', nargs=-1)
@docstring(search.__doc__)
def main(**kwargs):
return search(**kwargs)
if __name__ == '__main__':
main()

View File

@@ -1,6 +1,6 @@
__package__ = 'archivebox.core'
from typing import Optional, Dict, Iterable, Any, List, TYPE_CHECKING, Iterator, Set
from typing import Optional, Dict, Iterable, Any, List, TYPE_CHECKING
from archivebox.uuid_compat import uuid7
from datetime import datetime, timedelta
from django_stubs_ext.db.models import TypedModelMeta
@@ -41,8 +41,6 @@ from archivebox.machine.models import NetworkInterface, Binary
class Tag(ModelWithSerializers):
JSONL_TYPE = 'Tag'
# Keep AutoField for compatibility with main branch migrations
# Don't use UUIDField here - requires complex FK transformation
id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
@@ -93,66 +91,26 @@ class Tag(ModelWithSerializers):
def api_url(self) -> str:
return reverse_lazy('api-1:get_tag', args=[self.id])
def to_json(self) -> dict:
def to_jsonl(self) -> dict:
"""
Convert Tag model instance to a JSON-serializable dict.
Convert Tag model instance to a JSONL record.
"""
from archivebox.config import VERSION
return {
'type': self.JSONL_TYPE,
'type': 'Tag',
'schema_version': VERSION,
'id': str(self.id),
'name': self.name,
'slug': self.slug,
}
def to_jsonl(self, seen: Set[tuple] = None, **kwargs) -> Iterator[dict]:
"""
Yield this Tag as a JSON record.
Args:
seen: Set of (type, id) tuples already emitted (for deduplication)
**kwargs: Passed to children (none for Tag, leaf node)
Yields:
dict: JSON-serializable record for this tag
"""
if seen is not None:
key = (self.JSONL_TYPE, str(self.id))
if key in seen:
return
seen.add(key)
yield self.to_json()
@classmethod
def from_jsonl(cls, records, overrides: Dict[str, Any] = None) -> list['Tag']:
"""
Create/update Tags from an iterable of JSONL records.
Filters to only records with type='Tag'.
Args:
records: Iterable of dicts (JSONL records)
overrides: Optional dict with 'snapshot' to auto-attach tags
Returns:
List of Tag instances (skips None results)
"""
results = []
for record in records:
record_type = record.get('type', cls.JSONL_TYPE)
if record_type == cls.JSONL_TYPE:
instance = cls.from_json(record, overrides=overrides)
if instance:
results.append(instance)
return results
@staticmethod
def from_json(record: Dict[str, Any], overrides: Dict[str, Any] = None) -> 'Tag | None':
def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None):
"""
Create/update a single Tag from a JSON record dict.
Create/update Tag from JSONL record.
Args:
record: Dict with 'name' field
record: JSONL record with 'name' field
overrides: Optional dict with 'snapshot' to auto-attach tag
Returns:
@@ -331,8 +289,6 @@ class SnapshotManager(models.Manager.from_queryset(SnapshotQuerySet)):
class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
JSONL_TYPE = 'Snapshot'
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
@@ -469,7 +425,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
def _fs_next_version(self, version: str) -> str:
"""Get next version in migration chain (0.7/0.8 had same layout, only 0.8→0.9 migration needed)"""
# Treat 0.7.0 and 0.8.0 as equivalent (both used data/archive/{timestamp})
# Treat 0.7.0 and 0.8.0 as equivalent (both used archive/{timestamp})
if version in ('0.7.0', '0.8.0'):
return '0.9.0'
return self._fs_current_version()
@@ -478,8 +434,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
"""
Migrate from flat to nested structure.
0.8.x: data/archive/{timestamp}/{extractor}/
0.9.x: data/users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/{plugin}/
0.8.x: archive/{timestamp}/
0.9.x: users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/
Transaction handling:
1. Copy files INSIDE transaction
@@ -597,8 +553,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Calculate storage path for specific filesystem version.
Centralizes path logic so it's reusable.
0.7.x/0.8.x: data/archive/{timestamp}
0.9.x: data/users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
0.7.x/0.8.x: archive/{timestamp}
0.9.x: users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
"""
from datetime import datetime
@@ -1012,18 +968,38 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Each line is a JSON record with a 'type' field:
- Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
- ArchiveResult: extractor results (plugin, status, output, etc.)
- Binary: binary info used for the extraction
- Process: process execution details (cmd, exit_code, timing, etc.)
- ArchiveResult: extractor results (plugin, status, output, etc.)
"""
import json
index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
index_path.parent.mkdir(parents=True, exist_ok=True)
# Track unique binaries and processes to avoid duplicates
binaries_seen = set()
processes_seen = set()
with open(index_path, 'w') as f:
for record in self.to_jsonl():
f.write(json.dumps(record) + '\n')
# Write Snapshot record first (to_jsonl includes crawl_id, fs_version)
f.write(json.dumps(self.to_jsonl()) + '\n')
# Write ArchiveResult records with their associated Binary and Process
# Use select_related to optimize queries
for ar in self.archiveresult_set.select_related('process__binary').order_by('start_ts'):
# Write Binary record if not already written
if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
binaries_seen.add(ar.process.binary_id)
f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n')
# Write Process record if not already written
if ar.process and ar.process_id not in processes_seen:
processes_seen.add(ar.process_id)
f.write(json.dumps(ar.process.to_jsonl()) + '\n')
# Write ArchiveResult record
f.write(json.dumps(ar.to_jsonl()) + '\n')
def read_index_jsonl(self) -> dict:
"""
@@ -1407,22 +1383,18 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Clean up background ArchiveResult hooks.
Called by the state machine when entering the 'sealed' state.
Gracefully terminates background hooks using plugin-specific timeouts:
1. Send SIGTERM to all background hook processes
2. Wait up to each hook's plugin-specific timeout
3. Send SIGKILL to any hooks still running after timeout
Kills any background hooks and finalizes their ArchiveResults.
"""
from archivebox.hooks import graceful_terminate_background_hooks
from archivebox.config.configset import get_config
from archivebox.misc.process_utils import safe_kill_process
# Kill any background ArchiveResult hooks
if not self.OUTPUT_DIR.exists():
return
# Get merged config for plugin-specific timeout lookup
config = get_config(crawl=self.crawl, snapshot=self)
# Gracefully terminate all background hooks with plugin-specific timeouts
graceful_terminate_background_hooks(self.OUTPUT_DIR, config)
# Find all .pid files in this snapshot's output directory
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
cmd_file = pid_file.parent / 'cmd.sh'
safe_kill_process(pid_file, cmd_file)
# Update all STARTED ArchiveResults from filesystem
results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
@@ -1435,32 +1407,35 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
Used by state machine to determine if snapshot is finished.
"""
from archivebox.hooks import process_is_alive
from archivebox.misc.process_utils import validate_pid_file
if not self.OUTPUT_DIR.exists():
return False
# Check all .pid files in the snapshot directory (hook-specific names)
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
if process_is_alive(pid_file):
for plugin_dir in self.OUTPUT_DIR.iterdir():
if not plugin_dir.is_dir():
continue
pid_file = plugin_dir / 'hook.pid'
cmd_file = plugin_dir / 'cmd.sh'
if validate_pid_file(pid_file, cmd_file):
return True
return False
def to_json(self) -> dict:
def to_jsonl(self) -> dict:
"""
Convert Snapshot model instance to a JSON-serializable dict.
Convert Snapshot model instance to a JSONL record.
Includes all fields needed to fully reconstruct/identify this snapshot.
"""
from archivebox.config import VERSION
return {
'type': self.JSONL_TYPE,
'type': 'Snapshot',
'schema_version': VERSION,
'id': str(self.id),
'crawl_id': str(self.crawl_id),
'url': self.url,
'title': self.title,
'tags_str': self.tags_str(),
'tags': self.tags_str(),
'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'timestamp': self.timestamp,
@@ -1469,68 +1444,12 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
'fs_version': self.fs_version,
}
def to_jsonl(self, seen: Set[tuple] = None, archiveresult: bool = True, process: bool = True, binary: bool = True, machine: bool = False, iface: bool = False, **kwargs) -> Iterator[dict]:
"""
Yield this Snapshot and optionally related objects as JSON records.
Uses select_related for efficient querying. Deduplicates automatically.
Args:
seen: Set of (type, id) tuples already emitted (for deduplication)
archiveresult: Include related ArchiveResults (default: True)
process: Include Process for each ArchiveResult (default: True)
binary: Include Binary for each Process (default: True)
machine: Include Machine for each Process (default: False)
iface: Include NetworkInterface for each Process (default: False)
**kwargs: Additional options passed to children
Yields:
dict: JSON-serializable records
"""
if seen is None:
seen = set()
key = (self.JSONL_TYPE, str(self.id))
if key in seen:
return
seen.add(key)
yield self.to_json()
if archiveresult:
# Use select_related to optimize queries
for ar in self.archiveresult_set.select_related('process__binary').order_by('start_ts'):
yield from ar.to_jsonl(seen=seen, process=process, binary=binary, machine=machine, iface=iface, **kwargs)
@classmethod
def from_jsonl(cls, records, overrides: Dict[str, Any] = None, queue_for_extraction: bool = True) -> list['Snapshot']:
"""
Create/update Snapshots from an iterable of JSONL records.
Filters to only records with type='Snapshot' (or no type).
Args:
records: Iterable of dicts (JSONL records)
overrides: Dict with 'crawl', 'snapshot' (parent), 'created_by_id'
queue_for_extraction: If True, sets status=QUEUED and retry_at (default: True)
Returns:
List of Snapshot instances (skips None results)
"""
results = []
for record in records:
record_type = record.get('type', cls.JSONL_TYPE)
if record_type == cls.JSONL_TYPE:
instance = cls.from_json(record, overrides=overrides, queue_for_extraction=queue_for_extraction)
if instance:
results.append(instance)
return results
@staticmethod
def from_json(record: Dict[str, Any], overrides: Dict[str, Any] = None, queue_for_extraction: bool = True) -> 'Snapshot | None':
def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None, queue_for_extraction: bool = True):
"""
Create/update a single Snapshot from a JSON record dict.
Create/update Snapshot from JSONL record or dict.
Handles:
Unified method that handles:
- ID-based patching: {"id": "...", "title": "new title"}
- URL-based create/update: {"url": "...", "title": "...", "tags": "..."}
- Auto-creates Crawl if not provided
@@ -2137,8 +2056,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
result['canonical'] = self.canonical_outputs()
return result
def to_json_str(self, indent: int = 4) -> str:
"""Convert to JSON string for file output."""
def to_json(self, indent: int = 4) -> str:
"""Convert to JSON string"""
return to_json(self.to_dict(extended=True), indent=indent)
def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
@@ -2286,8 +2205,6 @@ class SnapshotMachine(BaseStateMachine, strict_states=True):
class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
JSONL_TYPE = 'ArchiveResult'
class StatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
STARTED = 'started', 'Started'
@@ -2321,7 +2238,7 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
process = models.OneToOneField(
'machine.Process',
on_delete=models.PROTECT,
null=False,
null=False, # Required after migration 4
related_name='archiveresult',
help_text='Process execution details for this archive result'
)
@@ -2359,13 +2276,13 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
"""Convenience property to access the user who created this archive result via its snapshot's crawl."""
return self.snapshot.crawl.created_by
def to_json(self) -> dict:
def to_jsonl(self) -> dict:
"""
Convert ArchiveResult model instance to a JSON-serializable dict.
Convert ArchiveResult model instance to a JSONL record.
"""
from archivebox.config import VERSION
record = {
'type': self.JSONL_TYPE,
'type': 'ArchiveResult',
'schema_version': VERSION,
'id': str(self.id),
'snapshot_id': str(self.snapshot_id),
@@ -2393,121 +2310,6 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
record['process_id'] = str(self.process_id)
return record
def to_jsonl(self, seen: Set[tuple] = None, process: bool = True, **kwargs) -> Iterator[dict]:
"""
Yield this ArchiveResult and optionally related objects as JSON records.
Args:
seen: Set of (type, id) tuples already emitted (for deduplication)
process: Include related Process and its children (default: True)
**kwargs: Passed to Process.to_jsonl() (e.g., binary=True, machine=False)
Yields:
dict: JSON-serializable records
"""
if seen is None:
seen = set()
key = (self.JSONL_TYPE, str(self.id))
if key in seen:
return
seen.add(key)
yield self.to_json()
if process and self.process:
yield from self.process.to_jsonl(seen=seen, **kwargs)
@classmethod
def from_jsonl(cls, records, overrides: Dict[str, Any] = None) -> list['ArchiveResult']:
"""
Create/update ArchiveResults from an iterable of JSONL records.
Filters to only records with type='ArchiveResult'.
Args:
records: Iterable of dicts (JSONL records)
overrides: Dict of field overrides
Returns:
List of ArchiveResult instances (skips None results)
"""
results = []
for record in records:
record_type = record.get('type', cls.JSONL_TYPE)
if record_type == cls.JSONL_TYPE:
instance = cls.from_json(record, overrides=overrides)
if instance:
results.append(instance)
return results
@staticmethod
def from_json(record: Dict[str, Any], overrides: Dict[str, Any] = None) -> 'ArchiveResult | None':
"""
Create or update a single ArchiveResult from a JSON record dict.
Args:
record: Dict with 'snapshot_id' and 'plugin' (required for create),
or 'id' (for update)
overrides: Dict of field overrides (e.g., config overrides)
Returns:
ArchiveResult instance or None if invalid
"""
from django.utils import timezone
overrides = overrides or {}
# If 'id' is provided, lookup and update existing
result_id = record.get('id')
if result_id:
try:
result = ArchiveResult.objects.get(id=result_id)
# Update fields from record
if record.get('status'):
result.status = record['status']
result.retry_at = timezone.now()
result.save()
return result
except ArchiveResult.DoesNotExist:
pass # Fall through to create
# Required fields for creation
snapshot_id = record.get('snapshot_id')
plugin = record.get('plugin')
if not snapshot_id or not plugin:
return None
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
except Snapshot.DoesNotExist:
return None
# Check if result already exists for this snapshot+plugin
existing = ArchiveResult.objects.filter(
snapshot=snapshot,
plugin=plugin,
).first()
if existing:
# Update existing result if status provided
if record.get('status'):
existing.status = record['status']
existing.retry_at = timezone.now()
existing.save()
return existing
# Create new ArchiveResult
result = ArchiveResult(
snapshot=snapshot,
plugin=plugin,
status=record.get('status', ArchiveResult.StatusChoices.QUEUED),
retry_at=timezone.now(),
hook_name=record.get('hook_name', ''),
)
result.save()
return result
def save(self, *args, **kwargs):
is_new = self._state.adding
@@ -2795,26 +2597,9 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
self.save()
return
# Derive hook basename for hook-specific filenames
# e.g., "on_Snapshot__50_wget.py" -> "on_Snapshot__50_wget"
hook_basename = Path(self.hook_name).stem if self.hook_name else 'hook'
# Read and parse JSONL output from hook-specific stdout log
stdout_file = plugin_dir / f'{hook_basename}.stdout.log'
stderr_file = plugin_dir / f'{hook_basename}.stderr.log'
returncode_file = plugin_dir / f'{hook_basename}.returncode'
# Read and parse JSONL output from stdout.log
stdout_file = plugin_dir / 'stdout.log'
stdout = stdout_file.read_text() if stdout_file.exists() else ''
stderr = stderr_file.read_text() if stderr_file.exists() else ''
# Read returncode from file (written by graceful_terminate_background_hooks)
returncode = None
if returncode_file.exists():
try:
rc_text = returncode_file.read_text().strip()
returncode = int(rc_text) if rc_text else None
except (ValueError, OSError):
pass
records = []
for line in stdout.splitlines():
@@ -2849,43 +2634,12 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
self._set_binary_from_cmd(hook_data['cmd'])
# Note: cmd_version is derived from binary.version, not stored on Process
else:
# No ArchiveResult JSONL record - determine status from returncode
if returncode is not None:
if returncode == 0:
self.status = self.StatusChoices.SUCCEEDED
self.output_str = 'Hook completed successfully (no JSONL output)'
elif returncode < 0:
# Negative = killed by signal (e.g., -9 for SIGKILL, -15 for SIGTERM)
sig_num = abs(returncode)
sig_name = {9: 'SIGKILL', 15: 'SIGTERM'}.get(sig_num, f'signal {sig_num}')
self.status = self.StatusChoices.FAILED
self.output_str = f'Hook killed by {sig_name}'
if stderr:
self.output_str += f'\n\nstderr:\n{stderr[:2000]}'
else:
self.status = self.StatusChoices.FAILED
self.output_str = f'Hook failed with exit code {returncode}'
if stderr:
self.output_str += f'\n\nstderr:\n{stderr[:2000]}'
else:
# No returncode file and no JSONL = failed
self.status = self.StatusChoices.FAILED
self.output_str = 'Hook did not output ArchiveResult record'
if stderr:
self.output_str += f'\n\nstderr:\n{stderr[:2000]}'
# No ArchiveResult record = failed
self.status = self.StatusChoices.FAILED
self.output_str = 'Hook did not output ArchiveResult record'
# Walk filesystem and populate output_files, output_size, output_mimetypes
# Exclude hook output files (hook-specific names like on_Snapshot__50_wget.stdout.log)
def is_hook_output_file(name: str) -> bool:
"""Check if a file is a hook output file that should be excluded."""
return (
name.endswith('.stdout.log') or
name.endswith('.stderr.log') or
name.endswith('.pid') or
name.endswith('.returncode') or
(name.endswith('.sh') and name.startswith('on_'))
)
exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid'}
mime_sizes = defaultdict(int)
total_size = 0
output_files = {}
@@ -2893,7 +2647,7 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
for file_path in plugin_dir.rglob('*'):
if not file_path.is_file():
continue
if is_hook_output_file(file_path.name):
if file_path.name in exclude_names:
continue
try:
@@ -2951,10 +2705,10 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
}
process_hook_records(filtered_records, overrides=overrides)
# Cleanup PID files, returncode files, and empty logs (hook-specific names)
pid_file = plugin_dir / f'{hook_basename}.pid'
# Cleanup PID files and empty logs
pid_file = plugin_dir / 'hook.pid'
pid_file.unlink(missing_ok=True)
returncode_file.unlink(missing_ok=True)
stderr_file = plugin_dir / 'stderr.log'
if stdout_file.exists() and stdout_file.stat().st_size == 0:
stdout_file.unlink()
if stderr_file.exists() and stderr_file.stat().st_size == 0:
@@ -3060,9 +2814,7 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
plugin_dir = Path(self.pwd) if self.pwd else None
if not plugin_dir:
return False
# Use hook-specific pid filename
hook_basename = Path(self.hook_name).stem if self.hook_name else 'hook'
pid_file = plugin_dir / f'{hook_basename}.pid'
pid_file = plugin_dir / 'hook.pid'
return pid_file.exists()

View File

@@ -1,6 +1,6 @@
__package__ = 'archivebox.crawls'
from typing import TYPE_CHECKING, Iterable, Iterator, Set
from typing import TYPE_CHECKING, Iterable
from datetime import timedelta
from archivebox.uuid_compat import uuid7
from pathlib import Path
@@ -59,8 +59,6 @@ class CrawlSchedule(ModelWithSerializers, ModelWithNotes, ModelWithHealthStats):
class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWithStateMachine):
JSONL_TYPE = 'Crawl'
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
@@ -136,13 +134,13 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
def api_url(self) -> str:
return reverse_lazy('api-1:get_crawl', args=[self.id])
def to_json(self) -> dict:
def to_jsonl(self) -> dict:
"""
Convert Crawl model instance to a JSON-serializable dict.
Convert Crawl model instance to a JSONL record.
"""
from archivebox.config import VERSION
return {
'type': self.JSONL_TYPE,
'type': 'Crawl',
'schema_version': VERSION,
'id': str(self.id),
'urls': self.urls,
@@ -153,63 +151,10 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
'created_at': self.created_at.isoformat() if self.created_at else None,
}
def to_jsonl(self, seen: Set[tuple] = None, snapshot: bool = True, archiveresult: bool = True, process: bool = True, binary: bool = True, machine: bool = False, iface: bool = False, **kwargs) -> Iterator[dict]:
"""
Yield this Crawl and optionally related objects as JSON records.
Args:
seen: Set of (type, id) tuples already emitted (for deduplication)
snapshot: Include related Snapshots (default: True)
archiveresult: Include ArchiveResults for each Snapshot (default: True)
process: Include Process for each ArchiveResult (default: True)
binary: Include Binary for each Process (default: True)
machine: Include Machine for each Process (default: False)
iface: Include NetworkInterface for each Process (default: False)
**kwargs: Additional options passed to children
Yields:
dict: JSON-serializable records
"""
if seen is None:
seen = set()
key = (self.JSONL_TYPE, str(self.id))
if key in seen:
return
seen.add(key)
yield self.to_json()
if snapshot:
for snap in self.snapshot_set.all():
yield from snap.to_jsonl(seen=seen, archiveresult=archiveresult, process=process, binary=binary, machine=machine, iface=iface, **kwargs)
@classmethod
def from_jsonl(cls, records, overrides: dict = None) -> list['Crawl']:
"""
Create/update Crawls from an iterable of JSONL records.
Filters to only records with type='Crawl' (or no type).
Args:
records: Iterable of dicts (JSONL records)
overrides: Dict of field overrides (e.g., created_by_id)
Returns:
List of Crawl instances (skips None results)
"""
results = []
for record in records:
record_type = record.get('type', cls.JSONL_TYPE)
if record_type == cls.JSONL_TYPE:
instance = cls.from_json(record, overrides=overrides)
if instance:
results.append(instance)
return results
@staticmethod
def from_json(record: dict, overrides: dict = None) -> 'Crawl | None':
def from_jsonl(record: dict, overrides: dict = None):
"""
Create or get a single Crawl from a JSON record dict.
Create or get a Crawl from a JSONL record.
Args:
record: Dict with 'urls' (required), optional 'max_depth', 'tags_str', 'label'
@@ -250,45 +195,11 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
)
return crawl
@staticmethod
def extract_domain_from_url(url: str) -> str:
"""
Extract domain from URL for path structure.
Uses full hostname with sanitized special chars.
Examples:
https://example.com:8080 → example.com_8080
https://sub.example.com → sub.example.com
file:///path → localhost
data:text/html → data
"""
from urllib.parse import urlparse
try:
parsed = urlparse(url)
if parsed.scheme in ('http', 'https'):
if parsed.port:
return f"{parsed.hostname}_{parsed.port}".replace(':', '_')
return parsed.hostname or 'unknown'
elif parsed.scheme == 'file':
return 'localhost'
elif parsed.scheme:
return parsed.scheme
else:
return 'unknown'
except Exception:
return 'unknown'
@property
def output_dir_parent(self) -> str:
"""Construct parent directory: users/{username}/crawls/{YYYYMMDD}/{domain}"""
"""Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""
date_str = self.created_at.strftime('%Y%m%d')
username = self.created_by.username
# Get domain from first URL
first_url = self.get_urls_list()[0] if self.get_urls_list() else ''
domain = self.extract_domain_from_url(first_url) if first_url else 'unknown'
return f'users/{username}/crawls/{date_str}/{domain}'
return f'users/{self.created_by_id}/crawls/{date_str}'
@property
def output_dir_name(self) -> str:
@@ -506,84 +417,18 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
def cleanup(self):
"""Clean up background hooks and run on_CrawlEnd hooks."""
import os
import signal
import time
from pathlib import Path
from archivebox.hooks import run_hook, discover_hooks
from archivebox.misc.process_utils import validate_pid_file
def is_process_alive(pid):
"""Check if a process exists."""
try:
os.kill(pid, 0) # Signal 0 checks existence without killing
return True
except (OSError, ProcessLookupError):
return False
from archivebox.misc.process_utils import safe_kill_process
# Kill any background processes by scanning for all .pid files
if self.OUTPUT_DIR.exists():
for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
# Validate PID before killing to avoid killing unrelated processes
cmd_file = pid_file.parent / 'cmd.sh'
if not validate_pid_file(pid_file, cmd_file):
# PID reused by different process or process dead
# safe_kill_process now waits for termination and escalates to SIGKILL
# Returns True only if process is confirmed dead
killed = safe_kill_process(pid_file, cmd_file)
if killed:
pid_file.unlink(missing_ok=True)
continue
try:
pid = int(pid_file.read_text().strip())
# Step 1: Send SIGTERM for graceful shutdown
try:
# Try to kill process group first (handles detached processes like Chrome)
try:
os.killpg(pid, signal.SIGTERM)
except (OSError, ProcessLookupError):
# Fall back to killing just the process
os.kill(pid, signal.SIGTERM)
except ProcessLookupError:
# Already dead
pid_file.unlink(missing_ok=True)
continue
# Step 2: Wait for graceful shutdown
time.sleep(2)
# Step 3: Check if still alive
if not is_process_alive(pid):
# Process terminated gracefully
pid_file.unlink(missing_ok=True)
continue
# Step 4: Process still alive, force kill ENTIRE process group with SIGKILL
try:
try:
# Always kill entire process group with SIGKILL (not individual processes)
os.killpg(pid, signal.SIGKILL)
except (OSError, ProcessLookupError) as e:
# Process group kill failed, try single process as fallback
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
# Process died between check and kill
pid_file.unlink(missing_ok=True)
continue
# Step 5: Wait and verify death
time.sleep(1)
if is_process_alive(pid):
# Process is unkillable (likely in UNE state on macOS)
# This happens when Chrome crashes in kernel syscall (IOSurface)
# Log but don't block cleanup - process will remain until reboot
print(f'[yellow]⚠️ Process {pid} is unkillable (likely crashed in kernel). Will remain until reboot.[/yellow]')
else:
# Successfully killed
pid_file.unlink(missing_ok=True)
except (ValueError, OSError) as e:
# Invalid PID file or permission error
pass
# Run on_CrawlEnd hooks
from archivebox.config.configset import get_config

View File

@@ -365,14 +365,11 @@ def run_hook(
# Old convention: __background in stem (for backwards compatibility)
is_background = '.bg.' in script.name or '__background' in script.stem
# Set up output files for ALL hooks - use hook-specific names to avoid conflicts
# when multiple hooks run in the same plugin directory
# e.g., on_Snapshot__20_chrome_tab.bg.js -> on_Snapshot__20_chrome_tab.bg.stdout.log
hook_basename = script.stem # e.g., "on_Snapshot__20_chrome_tab.bg"
stdout_file = output_dir / f'{hook_basename}.stdout.log'
stderr_file = output_dir / f'{hook_basename}.stderr.log'
pid_file = output_dir / f'{hook_basename}.pid'
cmd_file = output_dir / f'{hook_basename}.sh'
# Set up output files for ALL hooks (useful for debugging)
stdout_file = output_dir / 'stdout.log'
stderr_file = output_dir / 'stderr.log'
pid_file = output_dir / 'hook.pid'
cmd_file = output_dir / 'cmd.sh'
try:
# Write command script for validation
@@ -424,14 +421,8 @@ def run_hook(
# Detect new files created by the hook
files_after = set(output_dir.rglob('*')) if output_dir.exists() else set()
new_files = [str(f.relative_to(output_dir)) for f in (files_after - files_before) if f.is_file()]
# Exclude the log/pid/sh files themselves from new_files (hook-specific names)
hook_output_files = {
f'{hook_basename}.stdout.log',
f'{hook_basename}.stderr.log',
f'{hook_basename}.pid',
f'{hook_basename}.sh',
}
new_files = [f for f in new_files if f not in hook_output_files]
# Exclude the log files themselves from new_files
new_files = [f for f in new_files if f not in ('stdout.log', 'stderr.log', 'hook.pid')]
# Parse JSONL output from stdout
# Each line starting with { that has 'type' field is a record
@@ -1185,9 +1176,7 @@ def create_model_record(record: Dict[str, Any]) -> Any:
def process_hook_records(records: List[Dict[str, Any]], overrides: Dict[str, Any] = None) -> Dict[str, int]:
"""
Process JSONL records from hook output.
Uses Model.from_jsonl() which automatically filters by JSONL_TYPE.
Each model only processes records matching its type.
Dispatches to Model.from_jsonl() for each record type.
Args:
records: List of JSONL record dicts from result['records']
@@ -1196,250 +1185,51 @@ def process_hook_records(records: List[Dict[str, Any]], overrides: Dict[str, Any
Returns:
Dict with counts by record type
"""
from archivebox.core.models import Snapshot, Tag
from archivebox.machine.models import Binary, Machine
stats = {}
overrides = overrides or {}
# Filter out ArchiveResult records (they update the calling AR, not create new ones)
filtered_records = [r for r in records if r.get('type') != 'ArchiveResult']
for record in records:
record_type = record.get('type')
if not record_type:
continue
# Each model's from_jsonl() filters to only its own type
snapshots = Snapshot.from_jsonl(filtered_records, overrides)
tags = Tag.from_jsonl(filtered_records, overrides)
binaries = Binary.from_jsonl(filtered_records, overrides)
machines = Machine.from_jsonl(filtered_records, overrides)
return {
'Snapshot': len(snapshots),
'Tag': len(tags),
'Binary': len(binaries),
'Machine': len(machines),
}
def process_is_alive(pid_file: Path) -> bool:
"""
Check if process in PID file is still running.
Args:
pid_file: Path to hook.pid file
Returns:
True if process is alive, False otherwise
"""
if not pid_file.exists():
return False
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, 0) # Signal 0 = check if process exists without killing it
return True
except (OSError, ValueError):
return False
def kill_process(pid_file: Path, sig: int = signal.SIGTERM, validate: bool = True):
"""
Kill process in PID file with optional validation.
Args:
pid_file: Path to hook-specific .pid file (e.g., on_Snapshot__20_chrome_tab.bg.pid)
sig: Signal to send (default SIGTERM)
validate: If True, validate process identity before killing (default: True)
"""
from archivebox.misc.process_utils import safe_kill_process
if validate:
# Use safe kill with validation
# Derive cmd file from pid file: on_Snapshot__20_chrome_tab.bg.pid -> on_Snapshot__20_chrome_tab.bg.sh
cmd_file = pid_file.with_suffix('.sh')
safe_kill_process(pid_file, cmd_file, signal_num=sig)
else:
# Legacy behavior - kill without validation
if not pid_file.exists():
return
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, sig)
except (OSError, ValueError):
pass
def graceful_terminate_background_hooks(
output_dir: Path,
config: Dict[str, Any],
poll_interval: float = 0.5,
) -> Dict[str, Dict[str, Any]]:
"""
Gracefully terminate all background hooks in an output directory.
Termination strategy:
1. Send SIGTERM to all background hook processes (polite shutdown request)
2. For each hook, wait up to its plugin-specific timeout
3. Send SIGKILL to any hooks still running after their timeout expires
4. Reap each process with waitpid() to get exit code
5. Write returncode to .returncode file for update_from_output()
Args:
output_dir: Snapshot output directory containing plugin subdirs with .pid files
config: Merged config dict from get_config() for timeout lookup
poll_interval: Seconds between process liveness checks (default: 0.5s)
Returns:
Dict mapping hook names to result info:
{
'hook_name': {
'status': 'sigterm' | 'sigkill' | 'already_dead' | 'invalid',
'returncode': int or None,
'pid': int or None,
}
}
Example:
from archivebox.config.configset import get_config
config = get_config(crawl=my_crawl, snapshot=my_snapshot)
results = graceful_terminate_background_hooks(snapshot.OUTPUT_DIR, config)
# {'on_Snapshot__20_chrome_tab.bg': {'status': 'sigterm', 'returncode': 0, 'pid': 12345}}
"""
from archivebox.misc.process_utils import validate_pid_file
if not output_dir.exists():
return {}
results = {}
# Collect all pid files and their metadata
pid_files = list(output_dir.glob('**/*.pid'))
if not pid_files:
return {}
# Phase 1: Send SIGTERM to all background hook processes
active_hooks = [] # List of (pid_file, hook_name, plugin_name, timeout, pid)
for pid_file in pid_files:
hook_name = pid_file.stem # e.g., "on_Snapshot__20_chrome_tab.bg"
cmd_file = pid_file.with_suffix('.sh')
# Validate and get PID
if not validate_pid_file(pid_file, cmd_file):
results[hook_name] = {'status': 'invalid', 'returncode': None, 'pid': None}
pid_file.unlink(missing_ok=True)
# Skip ArchiveResult records (they update the calling ArchiveResult, not create new ones)
if record_type == 'ArchiveResult':
continue
try:
pid = int(pid_file.read_text().strip())
except (ValueError, OSError):
results[hook_name] = {'status': 'invalid', 'returncode': None, 'pid': None}
pid_file.unlink(missing_ok=True)
# Dispatch to appropriate model's from_jsonl() method
if record_type == 'Snapshot':
from archivebox.core.models import Snapshot
obj = Snapshot.from_jsonl(record.copy(), overrides)
if obj:
stats['Snapshot'] = stats.get('Snapshot', 0) + 1
elif record_type == 'Tag':
from archivebox.core.models import Tag
obj = Tag.from_jsonl(record.copy(), overrides)
if obj:
stats['Tag'] = stats.get('Tag', 0) + 1
elif record_type == 'Binary':
from archivebox.machine.models import Binary
obj = Binary.from_jsonl(record.copy(), overrides)
if obj:
stats['Binary'] = stats.get('Binary', 0) + 1
elif record_type == 'Machine':
from archivebox.machine.models import Machine
obj = Machine.from_jsonl(record.copy(), overrides)
if obj:
stats['Machine'] = stats.get('Machine', 0) + 1
else:
import sys
print(f"Warning: Unknown record type '{record_type}' from hook output", file=sys.stderr)
except Exception as e:
import sys
print(f"Warning: Failed to create {record_type}: {e}", file=sys.stderr)
continue
# Check if process is still alive
if not process_is_alive(pid_file):
# Process already dead - try to reap it and get exit code
returncode = _reap_process(pid)
results[hook_name] = {'status': 'already_dead', 'returncode': returncode, 'pid': pid}
_write_returncode_file(pid_file, returncode)
pid_file.unlink(missing_ok=True)
continue
# Get plugin name from parent directory (e.g., "chrome_session")
plugin_name = pid_file.parent.name
# Get plugin-specific timeout
plugin_config = get_plugin_special_config(plugin_name, config)
timeout = plugin_config['timeout']
# Send SIGTERM
try:
os.kill(pid, signal.SIGTERM)
except (OSError, ProcessLookupError):
returncode = _reap_process(pid)
results[hook_name] = {'status': 'already_dead', 'returncode': returncode, 'pid': pid}
_write_returncode_file(pid_file, returncode)
pid_file.unlink(missing_ok=True)
continue
active_hooks.append((pid_file, hook_name, plugin_name, timeout, pid))
# Phase 2: Wait for each hook's timeout, then SIGKILL if still running
for pid_file, hook_name, plugin_name, timeout, pid in active_hooks:
deadline = time.time() + timeout
exited_cleanly = False
# Poll until deadline or process exits
while time.time() < deadline:
if not process_is_alive(pid_file):
exited_cleanly = True
break
time.sleep(poll_interval)
if exited_cleanly:
# Process exited from SIGTERM - reap it to get exit code
returncode = _reap_process(pid)
results[hook_name] = {'status': 'sigterm', 'returncode': returncode, 'pid': pid}
else:
# Timeout expired, send SIGKILL
try:
os.kill(pid, signal.SIGKILL)
except (OSError, ProcessLookupError):
pass # Process died between check and kill
# Wait briefly for SIGKILL to take effect, then reap
time.sleep(0.1)
returncode = _reap_process(pid)
# returncode from SIGKILL is typically -9 (negative signal number)
results[hook_name] = {'status': 'sigkill', 'returncode': returncode, 'pid': pid}
# Write returncode file for update_from_output() to read
_write_returncode_file(pid_file, results[hook_name]['returncode'])
pid_file.unlink(missing_ok=True)
return results
def _reap_process(pid: int) -> Optional[int]:
"""
Reap a terminated process and return its exit code.
Uses os.waitpid() with WNOHANG to avoid blocking.
Returns None if process cannot be reaped (not a child, already reaped, etc).
"""
try:
# WNOHANG: return immediately if process hasn't exited
# We call this after we know process is dead, so it should return immediately
wpid, status = os.waitpid(pid, os.WNOHANG)
if wpid == 0:
# Process still running (shouldn't happen since we checked)
return None
if os.WIFEXITED(status):
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Killed by signal - return negative signal number (convention)
return -os.WTERMSIG(status)
return None
except ChildProcessError:
# Not our child process (was started by subprocess.Popen which already reaped it,
# or process was started by different parent). This is expected for hooks.
return None
except OSError:
return None
def _write_returncode_file(pid_file: Path, returncode: Optional[int]) -> None:
"""
Write returncode to a .returncode file next to the .pid file.
This allows update_from_output() to know the exit code even for background hooks.
"""
returncode_file = pid_file.with_suffix('.returncode')
try:
if returncode is not None:
returncode_file.write_text(str(returncode))
else:
# Unknown exit code - write empty file to indicate process was terminated
returncode_file.write_text('')
except OSError:
pass # Best effort
return stats
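
A concrete illustration of the rewritten dispatch behaviour (a sketch only: the import path and the assumption that each `from_jsonl()` call returns a truthy object are mine, not taken from the diff):

```python
# Illustrative input/output for the rewritten process_hook_records() above.
from archivebox.hooks import process_hook_records  # import path assumed

records = [
    {'type': 'Snapshot', 'url': 'https://example.com'},
    {'type': 'Binary', 'name': 'chromium', 'abspath': '/usr/bin/chromium'},
    {'type': 'ArchiveResult', 'status': 'succeeded'},  # skipped: updates the calling AR
    {'type': 'SomethingElse'},                          # unknown: warning printed, not counted
]

stats = process_hook_records(records, overrides={})
print(stats)   # -> {'Snapshot': 1, 'Binary': 1}
```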

View File

@@ -0,0 +1,101 @@
# Generated on 2025-12-31
# Adds parent FK and process_type field to Process model
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('machine', '0001_initial'),
]
operations = [
migrations.SeparateDatabaseAndState(
database_operations=[
migrations.RunSQL(
sql="""
-- Add parent_id FK column to machine_process
ALTER TABLE machine_process ADD COLUMN parent_id TEXT REFERENCES machine_process(id) ON DELETE SET NULL;
CREATE INDEX IF NOT EXISTS machine_process_parent_id_idx ON machine_process(parent_id);
-- Add process_type column with default 'binary'
ALTER TABLE machine_process ADD COLUMN process_type VARCHAR(16) NOT NULL DEFAULT 'binary';
CREATE INDEX IF NOT EXISTS machine_process_process_type_idx ON machine_process(process_type);
-- Add composite index for parent + status queries
CREATE INDEX IF NOT EXISTS machine_process_parent_status_idx ON machine_process(parent_id, status);
-- Add composite index for machine + pid + started_at (for PID reuse protection)
CREATE INDEX IF NOT EXISTS machine_process_machine_pid_started_idx ON machine_process(machine_id, pid, started_at);
""",
# Effectively irreversible: older SQLite releases lack ALTER TABLE ... DROP COLUMN,
# and undoing the indexed columns added above would require a full table rebuild
reverse_sql=migrations.RunSQL.noop
),
],
state_operations=[
# Add parent FK
migrations.AddField(
model_name='process',
name='parent',
field=models.ForeignKey(
blank=True,
help_text='Parent process that spawned this one',
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='children',
to='machine.process',
),
),
# Add process_type field
migrations.AddField(
model_name='process',
name='process_type',
field=models.CharField(
choices=[
('cli', 'CLI Command'),
('supervisord', 'Supervisord Daemon'),
('orchestrator', 'Orchestrator'),
('worker', 'Worker Process'),
('hook', 'Hook Script'),
('binary', 'Binary Execution'),
],
default='binary',
help_text='Type of process in the execution hierarchy',
max_length=16,
),
),
# Add indexes - must match the SQL index names exactly
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['parent'],
name='machine_process_parent_id_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['process_type'],
name='machine_process_process_type_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['parent', 'status'],
name='machine_process_parent_status_idx',
),
),
migrations.AddIndex(
model_name='process',
index=models.Index(
fields=['machine', 'pid', 'started_at'],
name='machine_process_machine_pid_started_idx',
),
),
],
),
]
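
For reference, a short sketch of the queries this migration enables (the object variables below are hypothetical; field names, choices, and the `children` related_name are the ones defined above):

```python
# Hypothetical usage of the new parent FK and process_type field.
from archivebox.machine.models import Process

# Children of an orchestrator via the related_name='children' reverse accessor
# (the parent_id + status composite index covers this filter):
workers = some_orchestrator.children.filter(            # some_orchestrator is hypothetical
    process_type='worker',
    status=Process.StatusChoices.RUNNING,
)

# Walk a hook process back up the hierarchy to the CLI entrypoint that spawned it:
proc = some_hook_process                                 # some_hook_process is hypothetical
while proc.parent is not None:
    proc = proc.parent
assert proc.process_type == 'cli'
```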

File diff suppressed because it is too large

View File

@@ -70,15 +70,54 @@ def write_cmd_file(cmd_file: Path, cmd: list[str]):
pass
def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15) -> bool:
"""Kill process after validation. Returns True if killed."""
def safe_kill_process(pid_file: Path, cmd_file: Optional[Path] = None, signal_num: int = 15, timeout: float = 3.0) -> bool:
"""
Kill process after validation, with graceful wait and SIGKILL escalation.
Returns True only if process is confirmed dead (either already dead or killed successfully).
"""
import time
import signal
if not validate_pid_file(pid_file, cmd_file):
pid_file.unlink(missing_ok=True) # Clean stale file
return False
return True # Process already dead, consider it killed
try:
pid = int(pid_file.read_text().strip())
os.kill(pid, signal_num)
return True
except (OSError, ValueError, ProcessLookupError):
# Send initial signal (SIGTERM by default)
try:
os.kill(pid, signal_num)
except ProcessLookupError:
# Process already dead
return True
# Wait for process to terminate gracefully
start_time = time.time()
while time.time() - start_time < timeout:
try:
os.kill(pid, 0) # Check if process still exists
time.sleep(0.1)
except ProcessLookupError:
# Process terminated
return True
# Process didn't terminate, escalate to SIGKILL
try:
os.kill(pid, signal.SIGKILL)
time.sleep(0.5) # Brief wait after SIGKILL
# Verify it's dead
try:
os.kill(pid, 0)
# Process still alive after SIGKILL - this is unusual
return False
except ProcessLookupError:
# Process finally dead
return True
except ProcessLookupError:
# Process died between timeout and SIGKILL
return True
except (OSError, ValueError):
return False
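
A hedged usage sketch of the hardened helper (the directory path is illustrative; the `hook.pid`/`cmd.sh` names follow the generic hook output files introduced in the hooks.py change above):

```python
from pathlib import Path

from archivebox.misc.process_utils import safe_kill_process

hook_output_dir = Path('archive/1700000000.0/chrome')   # illustrative plugin output dir
confirmed_dead = safe_kill_process(
    pid_file=hook_output_dir / 'hook.pid',
    cmd_file=hook_output_dir / 'cmd.sh',
    signal_num=15,    # SIGTERM first
    timeout=3.0,      # escalate to SIGKILL if still alive after 3s
)
if not confirmed_dead:
    print('process survived SIGKILL (unkillable, e.g. stuck in uninterruptible sleep)')
```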

View File

@@ -0,0 +1,21 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": false,
"required_plugins": ["chrome"],
"properties": {
"CAPTCHA2_ENABLED": {
"type": "boolean",
"default": true,
"x-aliases": ["USE_CAPTCHA2"],
"description": "Enable Captcha2 browser extension for CAPTCHA solving"
},
"CAPTCHA2_TIMEOUT": {
"type": "integer",
"default": 60,
"minimum": 5,
"x-fallback": "TIMEOUT",
"description": "Timeout for CAPTCHA solving in seconds"
}
}
}
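
To make the `x-aliases`/`x-fallback` semantics concrete, here is a minimal resolver sketch; the real logic lives in `archivebox/config/configset.py` (`get_config()`), and the exact resolution order shown here is an assumption:

```python
import os

def resolve_plugin_config(schema: dict, env=os.environ) -> dict:
    """Sketch only: resolve schema properties from env vars with aliases/fallbacks."""
    resolved = {}
    for key, spec in schema.get('properties', {}).items():
        raw = env.get(key)
        for alias in spec.get('x-aliases', []):       # e.g. USE_CAPTCHA2 -> CAPTCHA2_ENABLED
            if raw is None:
                raw = env.get(alias)
        if raw is None and 'x-fallback' in spec:      # e.g. CAPTCHA2_TIMEOUT falls back to TIMEOUT
            raw = env.get(spec['x-fallback'])
        if raw is None:
            resolved[key] = spec.get('default')
        elif spec.get('type') == 'boolean':
            resolved[key] = raw.strip().lower() in ('true', '1', 'yes', 'on')
        elif spec.get('type') == 'integer':
            resolved[key] = int(raw)
        else:
            resolved[key] = raw
    return resolved
```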

View File

@@ -0,0 +1,121 @@
#!/usr/bin/env node
/**
* 2Captcha Extension Plugin
*
* Installs and configures the 2captcha Chrome extension for automatic
* CAPTCHA solving during page archiving.
*
* Extension: https://chromewebstore.google.com/detail/ifibfemgeogfhoebkmokieepdoobkbpo
* Documentation: https://2captcha.com/blog/how-to-use-2captcha-solver-extension-in-puppeteer
*
* Priority: 01 (early) - Must install before Chrome session starts at Crawl level
* Hook: on_Crawl (runs once per crawl, not per snapshot)
*
* Requirements:
* - API_KEY_2CAPTCHA environment variable must be set
* - Extension will automatically solve reCAPTCHA, hCaptcha, Cloudflare Turnstile, etc.
*/
const path = require('path');
const fs = require('fs');
// Import extension utilities
const extensionUtils = require('../chrome/chrome_utils.js');
// Extension metadata
const EXTENSION = {
webstore_id: 'ifibfemgeogfhoebkmokieepdoobkbpo',
name: 'captcha2',
};
// Get extensions directory from environment or use default
const EXTENSIONS_DIR = process.env.CHROME_EXTENSIONS_DIR ||
path.join(process.env.DATA_DIR || './data', 'personas', process.env.ACTIVE_PERSONA || 'Default', 'chrome_extensions');
/**
* Install and configure the 2captcha extension
*/
async function installCaptchaExtension() {
console.log('[*] Installing 2captcha extension...');
// Install the extension
const extension = await extensionUtils.loadOrInstallExtension(EXTENSION, EXTENSIONS_DIR);
if (!extension) {
console.error('[❌] Failed to install 2captcha extension');
return null;
}
// Check if API key is configured
const apiKey = process.env.API_KEY_2CAPTCHA;
if (!apiKey || apiKey === 'YOUR_API_KEY_HERE') {
console.warn('[⚠️] 2captcha extension installed but API_KEY_2CAPTCHA not configured');
console.warn('[⚠️] Set API_KEY_2CAPTCHA environment variable to enable automatic CAPTCHA solving');
} else {
console.log('[+] 2captcha extension installed and API key configured');
}
return extension;
}
/**
* Note: 2captcha configuration is now handled by chrome plugin
* during first-time browser setup to avoid repeated configuration on every snapshot.
* The API key is injected via chrome.storage API once per browser session.
*/
/**
* Main entry point - install extension before archiving
*/
async function main() {
// Check if extension is already cached
const cacheFile = path.join(EXTENSIONS_DIR, 'captcha2.extension.json');
if (fs.existsSync(cacheFile)) {
try {
const cached = JSON.parse(fs.readFileSync(cacheFile, 'utf-8'));
const manifestPath = path.join(cached.unpacked_path, 'manifest.json');
if (fs.existsSync(manifestPath)) {
console.log('[*] 2captcha extension already installed (using cache)');
return cached;
}
} catch (e) {
// Cache file corrupted, re-install
console.warn('[⚠️] Extension cache corrupted, re-installing...');
}
}
// Install extension
const extension = await installCaptchaExtension();
// Export extension metadata for chrome plugin to load
if (extension) {
// Write extension info to a cache file that chrome plugin can read
await fs.promises.mkdir(EXTENSIONS_DIR, { recursive: true });
await fs.promises.writeFile(
cacheFile,
JSON.stringify(extension, null, 2)
);
console.log(`[+] Extension metadata written to ${cacheFile}`);
}
return extension;
}
// Export functions for use by other plugins
module.exports = {
EXTENSION,
installCaptchaExtension,
};
// Run if executed directly
if (require.main === module) {
main().then(() => {
console.log('[✓] 2captcha extension setup complete');
process.exit(0);
}).catch(err => {
console.error('[❌] 2captcha extension setup failed:', err);
process.exit(1);
});
}

View File

@@ -0,0 +1,279 @@
#!/usr/bin/env node
/**
* 2Captcha Extension Configuration
*
* Configures the 2captcha extension with API key after Crawl-level Chrome session starts.
* Runs once per crawl to inject API key into extension storage.
*
* Priority: 11 (after chrome_launch at 20)
* Hook: on_Crawl (runs once per crawl, not per snapshot)
*
* Requirements:
* - API_KEY_2CAPTCHA environment variable must be set
* - chrome plugin must have loaded extensions (extensions.json must exist)
*/
const path = require('path');
const fs = require('fs');
// Add NODE_MODULES_DIR to module resolution paths if set
if (process.env.NODE_MODULES_DIR) module.paths.unshift(process.env.NODE_MODULES_DIR);
const puppeteer = require('puppeteer-core');
// Get crawl's chrome directory from environment variable set by hooks.py
function getCrawlChromeSessionDir() {
const crawlOutputDir = process.env.CRAWL_OUTPUT_DIR || '';
if (!crawlOutputDir) {
return null;
}
return path.join(crawlOutputDir, 'chrome');
}
const CHROME_SESSION_DIR = getCrawlChromeSessionDir() || '../chrome';
const CONFIG_MARKER = path.join(CHROME_SESSION_DIR, '.captcha2_configured');
// Get environment variable with default
function getEnv(name, defaultValue = '') {
return (process.env[name] || defaultValue).trim();
}
// Parse command line arguments
function parseArgs() {
const args = {};
process.argv.slice(2).forEach(arg => {
if (arg.startsWith('--')) {
const [key, ...valueParts] = arg.slice(2).split('=');
args[key.replace(/-/g, '_')] = valueParts.join('=') || true;
}
});
return args;
}
async function configure2Captcha() {
// Check if already configured in this session
if (fs.existsSync(CONFIG_MARKER)) {
console.error('[*] 2captcha already configured in this browser session');
return { success: true, skipped: true };
}
// Check if API key is set
const apiKey = getEnv('API_KEY_2CAPTCHA');
if (!apiKey || apiKey === 'YOUR_API_KEY_HERE') {
console.warn('[⚠️] 2captcha extension loaded but API_KEY_2CAPTCHA not configured');
console.warn('[⚠️] Set API_KEY_2CAPTCHA environment variable to enable automatic CAPTCHA solving');
return { success: false, error: 'API_KEY_2CAPTCHA not configured' };
}
// Load extensions metadata
const extensionsFile = path.join(CHROME_SESSION_DIR, 'extensions.json');
if (!fs.existsSync(extensionsFile)) {
return { success: false, error: 'extensions.json not found - chrome plugin must run first' };
}
const extensions = JSON.parse(fs.readFileSync(extensionsFile, 'utf-8'));
const captchaExt = extensions.find(ext => ext.name === 'captcha2');
if (!captchaExt) {
console.error('[*] 2captcha extension not installed, skipping configuration');
return { success: true, skipped: true };
}
console.error('[*] Configuring 2captcha extension with API key...');
try {
// Connect to the existing Chrome session via CDP
const cdpFile = path.join(CHROME_SESSION_DIR, 'cdp_url.txt');
if (!fs.existsSync(cdpFile)) {
return { success: false, error: 'CDP URL not found - chrome plugin must run first' };
}
const cdpUrl = fs.readFileSync(cdpFile, 'utf-8').trim();
const browser = await puppeteer.connect({ browserWSEndpoint: cdpUrl });
try {
// Method 1: Try to inject via extension background page
if (captchaExt.target && captchaExt.target_ctx) {
console.error('[*] Attempting to configure via extension background page...');
// Reconnect to the browser to get fresh target context
const targets = await browser.targets();
const extTarget = targets.find(t =>
t.url().startsWith(`chrome-extension://${captchaExt.id}`)
);
if (extTarget) {
const extContext = await extTarget.worker() || await extTarget.page();
if (extContext) {
await extContext.evaluate((key) => {
// Try all common storage patterns
if (typeof chrome !== 'undefined' && chrome.storage) {
chrome.storage.local.set({
apiKey: key,
api_key: key,
'2captcha_apikey': key,
apikey: key,
'solver-api-key': key,
});
chrome.storage.sync.set({
apiKey: key,
api_key: key,
'2captcha_apikey': key,
apikey: key,
'solver-api-key': key,
});
}
// Also try localStorage as fallback
if (typeof localStorage !== 'undefined') {
localStorage.setItem('apiKey', key);
localStorage.setItem('2captcha_apikey', key);
localStorage.setItem('solver-api-key', key);
}
}, apiKey);
console.error('[+] 2captcha API key configured successfully via background page');
// Mark as configured
fs.writeFileSync(CONFIG_MARKER, new Date().toISOString());
return { success: true, method: 'background_page' };
}
}
}
// Method 2: Try to configure via options page
console.error('[*] Attempting to configure via options page...');
const optionsUrl = `chrome-extension://${captchaExt.id}/options.html`;
const configPage = await browser.newPage();
try {
await configPage.goto(optionsUrl, { waitUntil: 'networkidle0', timeout: 10000 });
const configured = await configPage.evaluate((key) => {
// Try to find API key input field
const selectors = [
'input[name*="apikey" i]',
'input[id*="apikey" i]',
'input[name*="api-key" i]',
'input[id*="api-key" i]',
'input[name*="key" i]',
'input[placeholder*="api" i]',
'input[type="text"]',
];
for (const selector of selectors) {
const input = document.querySelector(selector);
if (input) {
input.value = key;
input.dispatchEvent(new Event('input', { bubbles: true }));
input.dispatchEvent(new Event('change', { bubbles: true }));
// Try to find and click save button
// NOTE: querySelector() does not support jQuery-style :contains(), which would
// throw a SyntaxError, so only standard attribute selectors are used here
const saveSelectors = [
'button[type="submit"]',
'input[type="submit"]',
];
for (const btnSel of saveSelectors) {
const btn = document.querySelector(btnSel);
if (btn) {
btn.click();
break;
}
}
// Also save to storage
if (typeof chrome !== 'undefined' && chrome.storage) {
chrome.storage.local.set({ apiKey: key, api_key: key, '2captcha_apikey': key });
chrome.storage.sync.set({ apiKey: key, api_key: key, '2captcha_apikey': key });
}
return true;
}
}
// Fallback: Just save to storage
if (typeof chrome !== 'undefined' && chrome.storage) {
chrome.storage.local.set({ apiKey: key, api_key: key, '2captcha_apikey': key });
chrome.storage.sync.set({ apiKey: key, api_key: key, '2captcha_apikey': key });
return true;
}
return false;
}, apiKey);
await configPage.close();
if (configured) {
console.error('[+] 2captcha API key configured successfully via options page');
// Mark as configured
fs.writeFileSync(CONFIG_MARKER, new Date().toISOString());
return { success: true, method: 'options_page' };
}
} catch (e) {
console.warn(`[⚠️] Failed to configure via options page: ${e.message}`);
try {
await configPage.close();
} catch (e2) {}
}
return { success: false, error: 'Could not configure via any method' };
} finally {
browser.disconnect();
}
} catch (e) {
return { success: false, error: `${e.name}: ${e.message}` };
}
}
async function main() {
const args = parseArgs();
const url = args.url;
const snapshotId = args.snapshot_id;
if (!url || !snapshotId) {
console.error('Usage: on_Snapshot__21_captcha2_config.js --url=<url> --snapshot-id=<uuid>');
process.exit(1);
}
const startTs = new Date();
let status = 'failed';
let error = '';
try {
const result = await configure2Captcha();
if (result.skipped) {
status = 'skipped';
} else if (result.success) {
status = 'succeeded';
} else {
status = 'failed';
error = result.error || 'Configuration failed';
}
} catch (e) {
error = `${e.name}: ${e.message}`;
status = 'failed';
}
const endTs = new Date();
const duration = (endTs - startTs) / 1000;
if (error) {
console.error(`ERROR: ${error}`);
}
// Config hooks don't emit JSONL - they're utility hooks for setup
// Exit code indicates success/failure
process.exit(status === 'succeeded' || status === 'skipped' ? 0 : 1);
}
main().catch(e => {
console.error(`Fatal error: ${e.message}`);
process.exit(1);
});

View File

@@ -0,0 +1,184 @@
"""
Unit tests for captcha2 plugin
Tests invoke the plugin hooks as external processes and verify outputs/side effects.
"""
import json
import os
import subprocess
import tempfile
from pathlib import Path
import pytest
PLUGIN_DIR = Path(__file__).parent.parent
INSTALL_SCRIPT = next(PLUGIN_DIR.glob('on_Crawl__*_captcha2.*'), None)
CONFIG_SCRIPT = next(PLUGIN_DIR.glob('on_Crawl__*_captcha2_config.*'), None)
def test_install_script_exists():
"""Verify install script exists"""
assert INSTALL_SCRIPT.exists(), f"Install script not found: {INSTALL_SCRIPT}"
def test_config_script_exists():
"""Verify config script exists"""
assert CONFIG_SCRIPT.exists(), f"Config script not found: {CONFIG_SCRIPT}"
def test_extension_metadata():
"""Test that captcha2 extension has correct metadata"""
with tempfile.TemporaryDirectory() as tmpdir:
env = os.environ.copy()
env["CHROME_EXTENSIONS_DIR"] = str(Path(tmpdir) / "chrome_extensions")
# Just check the script can be loaded
result = subprocess.run(
["node", "-e", f"const ext = require('{INSTALL_SCRIPT}'); console.log(JSON.stringify(ext.EXTENSION))"],
capture_output=True,
text=True,
env=env
)
assert result.returncode == 0, f"Failed to load extension metadata: {result.stderr}"
metadata = json.loads(result.stdout)
assert metadata["webstore_id"] == "ifibfemgeogfhoebkmokieepdoobkbpo"
assert metadata["name"] == "captcha2"
def test_install_creates_cache():
"""Test that install creates extension cache"""
with tempfile.TemporaryDirectory() as tmpdir:
ext_dir = Path(tmpdir) / "chrome_extensions"
ext_dir.mkdir(parents=True)
env = os.environ.copy()
env["CHROME_EXTENSIONS_DIR"] = str(ext_dir)
env["API_KEY_2CAPTCHA"] = "test_api_key"
# Run install script
result = subprocess.run(
["node", str(INSTALL_SCRIPT)],
capture_output=True,
text=True,
env=env,
timeout=60
)
# Check output mentions installation
assert "[*] Installing 2captcha extension" in result.stdout or "[*] 2captcha extension already installed" in result.stdout
# Check cache file was created
cache_file = ext_dir / "captcha2.extension.json"
assert cache_file.exists(), "Cache file should be created"
# Verify cache content
cache_data = json.loads(cache_file.read_text())
assert cache_data["webstore_id"] == "ifibfemgeogfhoebkmokieepdoobkbpo"
assert cache_data["name"] == "captcha2"
assert "unpacked_path" in cache_data
assert "version" in cache_data
def test_install_twice_uses_cache():
"""Test that running install twice uses existing cache on second run"""
with tempfile.TemporaryDirectory() as tmpdir:
ext_dir = Path(tmpdir) / "chrome_extensions"
ext_dir.mkdir(parents=True)
env = os.environ.copy()
env["CHROME_EXTENSIONS_DIR"] = str(ext_dir)
env["API_KEY_2CAPTCHA"] = "test_api_key"
# First install - downloads the extension
result1 = subprocess.run(
["node", str(INSTALL_SCRIPT)],
capture_output=True,
text=True,
env=env,
timeout=60
)
assert result1.returncode == 0, f"First install failed: {result1.stderr}"
# Verify cache was created
cache_file = ext_dir / "captcha2.extension.json"
assert cache_file.exists(), "Cache file should exist after first install"
# Second install - should use cache
result2 = subprocess.run(
["node", str(INSTALL_SCRIPT)],
capture_output=True,
text=True,
env=env,
timeout=30
)
assert result2.returncode == 0, f"Second install failed: {result2.stderr}"
# Second run should mention cache reuse (returncode == 0 is already asserted above)
assert "already installed" in result2.stdout or "cache" in result2.stdout.lower(), f"Second install did not reuse cache: {result2.stdout}"
def test_install_warns_without_api_key():
"""Test that install warns when API key not configured"""
with tempfile.TemporaryDirectory() as tmpdir:
ext_dir = Path(tmpdir) / "chrome_extensions"
ext_dir.mkdir(parents=True)
env = os.environ.copy()
env["CHROME_EXTENSIONS_DIR"] = str(ext_dir)
# Don't set API_KEY_2CAPTCHA
# Run install script
result = subprocess.run(
["node", str(INSTALL_SCRIPT)],
capture_output=True,
text=True,
env=env,
timeout=60
)
# Should warn about missing API key
combined_output = result.stdout + result.stderr
assert "API_KEY_2CAPTCHA not configured" in combined_output or "Set API_KEY_2CAPTCHA" in combined_output
def test_install_success_with_api_key():
"""Test that install succeeds when API key is configured"""
with tempfile.TemporaryDirectory() as tmpdir:
ext_dir = Path(tmpdir) / "chrome_extensions"
ext_dir.mkdir(parents=True)
env = os.environ.copy()
env["CHROME_EXTENSIONS_DIR"] = str(ext_dir)
env["API_KEY_2CAPTCHA"] = "test_valid_api_key_123"
# Run install script
result = subprocess.run(
["node", str(INSTALL_SCRIPT)],
capture_output=True,
text=True,
env=env,
timeout=60
)
# Should mention API key configured
combined_output = result.stdout + result.stderr
assert "API key configured" in combined_output or "API_KEY_2CAPTCHA" in combined_output
def test_config_script_structure():
"""Test that config script has proper structure"""
# Verify the script exists and contains expected markers
script_content = CONFIG_SCRIPT.read_text()
# Should mention configuration marker file
assert "CONFIG_MARKER" in script_content or "captcha2_configured" in script_content
# Should mention API key
assert "API_KEY_2CAPTCHA" in script_content
# Should have main function or be executable
assert "async function" in script_content or "main" in script_content

View File

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""
Install hook for Chrome/Chromium and puppeteer-core.
Runs at crawl start to install/find Chromium and puppeteer-core.
Outputs JSONL for Binary and Machine config updates.
Respects CHROME_BINARY env var for custom binary paths.
Uses `npx @puppeteer/browsers install chromium@<revision>` (pinned to a specific build below) and parses the output.
NOTE: We use Chromium instead of Chrome because Chrome 137+ removed support for
--load-extension and --disable-extensions-except flags, which are needed for
loading unpacked extensions in headless mode.
"""
import os
import sys
import json
import subprocess
from pathlib import Path
def get_chrome_version(binary_path: str) -> str | None:
"""Get Chrome/Chromium version string."""
try:
result = subprocess.run(
[binary_path, '--version'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None
def install_puppeteer_core() -> bool:
"""Install puppeteer-core to NODE_MODULES_DIR if not present."""
node_modules_dir = os.environ.get('NODE_MODULES_DIR', '').strip()
if not node_modules_dir:
# No isolated node_modules, skip (will use global)
return True
node_modules_path = Path(node_modules_dir)
if (node_modules_path / 'puppeteer-core').exists():
return True
# Get npm prefix from NODE_MODULES_DIR (parent of node_modules)
npm_prefix = node_modules_path.parent
try:
print(f"[*] Installing puppeteer-core to {npm_prefix}...", file=sys.stderr)
result = subprocess.run(
['npm', 'install', '--prefix', str(npm_prefix), 'puppeteer-core', '@puppeteer/browsers'],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
print(f"[+] puppeteer-core installed", file=sys.stderr)
return True
else:
print(f"[!] Failed to install puppeteer-core: {result.stderr}", file=sys.stderr)
return False
except Exception as e:
print(f"[!] Failed to install puppeteer-core: {e}", file=sys.stderr)
return False
def install_chromium() -> dict | None:
"""Install Chromium using @puppeteer/browsers and parse output for binary path.
Output format: "chromium@<version> <path_to_binary>"
e.g.: "chromium@1563294 /Users/x/.cache/puppeteer/chromium/.../Chromium"
Note: npx is fast when chromium is already cached - it returns the path without re-downloading.
"""
try:
print("[*] Installing Chromium via @puppeteer/browsers...", file=sys.stderr)
# Use --path to install to puppeteer's standard cache location
cache_path = os.path.expanduser('~/.cache/puppeteer')
result = subprocess.run(
['npx', '@puppeteer/browsers', 'install', 'chromium@1563297', f'--path={cache_path}'],
capture_output=True,
text=True,
stdin=subprocess.DEVNULL,
timeout=300
)
if result.returncode != 0:
print(f"[!] Failed to install Chromium: {result.stderr}", file=sys.stderr)
return None
# Parse output: "chromium@1563294 /path/to/Chromium"
output = result.stdout.strip()
parts = output.split(' ', 1)
if len(parts) != 2:
print(f"[!] Failed to parse install output: {output}", file=sys.stderr)
return None
version_str = parts[0] # "chromium@1563294"
binary_path = parts[1].strip()
if not binary_path or not os.path.exists(binary_path):
print(f"[!] Binary not found at: {binary_path}", file=sys.stderr)
return None
# Extract version number
version = version_str.split('@')[1] if '@' in version_str else None
print(f"[+] Chromium installed: {binary_path}", file=sys.stderr)
return {
'name': 'chromium',
'abspath': binary_path,
'version': version,
'binprovider': 'puppeteer',
}
except subprocess.TimeoutExpired:
print("[!] Chromium install timed out", file=sys.stderr)
except FileNotFoundError:
print("[!] npx not found - is Node.js installed?", file=sys.stderr)
except Exception as e:
print(f"[!] Failed to install Chromium: {e}", file=sys.stderr)
return None
def main():
# Install puppeteer-core if NODE_MODULES_DIR is set
install_puppeteer_core()
# Check if CHROME_BINARY is already set and valid
configured_binary = os.environ.get('CHROME_BINARY', '').strip()
if configured_binary and os.path.isfile(configured_binary) and os.access(configured_binary, os.X_OK):
version = get_chrome_version(configured_binary)
print(json.dumps({
'type': 'Binary',
'name': 'chromium',
'abspath': configured_binary,
'version': version,
'binprovider': 'env',
}))
sys.exit(0)
# Install/find Chromium via puppeteer
result = install_chromium()
if result and result.get('abspath'):
print(json.dumps({
'type': 'Binary',
'name': result['name'],
'abspath': result['abspath'],
'version': result['version'],
'binprovider': result['binprovider'],
}))
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/CHROME_BINARY',
'value': result['abspath'],
}))
if result['version']:
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/CHROMIUM_VERSION',
'value': result['version'],
}))
sys.exit(0)
else:
print("Chromium binary not found", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
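
For context, a hedged sketch of how this hook's JSONL stdout could be turned into records for `process_hook_records()` (the hook filename and the invocation below are assumptions, not the actual plumbing in hooks.py):

```python
import json
import subprocess

proc = subprocess.run(
    ['python', 'on_Crawl__00_chrome_install.py'],   # filename/priority assumed
    capture_output=True, text=True, timeout=300,
)
records = [
    json.loads(line)
    for line in proc.stdout.splitlines()
    if line.startswith('{') and '"type"' in line
]
# On success this yields one Binary record plus Machine config/CHROME_BINARY
# (and config/CHROMIUM_VERSION) update records, as printed by main() above.
```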

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
"""
Validate and compute derived Chrome config values.
This hook runs early in the Crawl lifecycle to:
1. Auto-detect Chrome binary location
2. Compute sandbox settings based on Docker detection
3. Validate binary availability and version
4. Set computed env vars for subsequent hooks
Output:
- COMPUTED:KEY=VALUE lines that hooks.py parses and adds to env
- Binary JSONL records to stdout when binaries are found
"""
import json
import os
import sys
from abx_pkg import Binary, EnvProvider
# Chrome binary search order
CHROME_BINARY_NAMES = [
'chromium',
'chromium-browser',
'google-chrome',
'google-chrome-stable',
'chrome',
]
def get_env(name: str, default: str = '') -> str:
return os.environ.get(name, default).strip()
def get_env_bool(name: str, default: bool = False) -> bool:
val = get_env(name, '').lower()
if val in ('true', '1', 'yes', 'on'):
return True
if val in ('false', '0', 'no', 'off'):
return False
return default
def detect_docker() -> bool:
"""Detect if running inside Docker container."""
return (
os.path.exists('/.dockerenv') or
os.environ.get('IN_DOCKER', '').lower() in ('true', '1', 'yes') or
os.path.exists('/run/.containerenv')
)
def find_chrome_binary(configured: str, provider: EnvProvider) -> Binary | None:
"""Find Chrome binary using abx-pkg, checking configured path first."""
# Try configured binary first
if configured:
try:
binary = Binary(name=configured, binproviders=[provider]).load()
if binary.abspath:
return binary
except Exception:
pass
# Search common names
for name in CHROME_BINARY_NAMES:
try:
binary = Binary(name=name, binproviders=[provider]).load()
if binary.abspath:
return binary
except Exception:
continue
return None
def output_binary(binary: Binary, name: str):
"""Output Binary JSONL record to stdout."""
machine_id = os.environ.get('MACHINE_ID', '')
record = {
'type': 'Binary',
'name': name,
'abspath': str(binary.abspath),
'version': str(binary.version) if binary.version else '',
'sha256': binary.sha256 or '',
'binprovider': 'env',
'machine_id': machine_id,
}
print(json.dumps(record))
def main():
warnings = []
errors = []
computed = {}
# Get config values
chrome_binary = get_env('CHROME_BINARY', 'chromium')
chrome_sandbox = get_env_bool('CHROME_SANDBOX', True)
screenshot_enabled = get_env_bool('SCREENSHOT_ENABLED', True)
pdf_enabled = get_env_bool('PDF_ENABLED', True)
dom_enabled = get_env_bool('DOM_ENABLED', True)
# Compute USE_CHROME (derived from extractor enabled flags)
use_chrome = screenshot_enabled or pdf_enabled or dom_enabled
computed['USE_CHROME'] = str(use_chrome).lower()
# Detect Docker and adjust sandbox
in_docker = detect_docker()
computed['IN_DOCKER'] = str(in_docker).lower()
if in_docker and chrome_sandbox:
warnings.append(
"Running in Docker with CHROME_SANDBOX=true. "
"Chrome may fail to start. Consider setting CHROME_SANDBOX=false."
)
# Auto-disable sandbox in Docker unless explicitly set
if not get_env('CHROME_SANDBOX'):
computed['CHROME_SANDBOX'] = 'false'
# Find Chrome binary using abx-pkg
provider = EnvProvider()
if use_chrome:
chrome = find_chrome_binary(chrome_binary, provider)
if not chrome or not chrome.abspath:
errors.append(
f"Chrome binary not found (tried: {chrome_binary}). "
"Install Chrome/Chromium or set CHROME_BINARY path."
)
computed['CHROME_BINARY'] = ''
else:
computed['CHROME_BINARY'] = str(chrome.abspath)
computed['CHROME_VERSION'] = str(chrome.version) if chrome.version else 'unknown'
# Output Binary JSONL record for Chrome
output_binary(chrome, name='chrome')
# Check Node.js for Puppeteer
node_binary_name = get_env('NODE_BINARY', 'node')
try:
node = Binary(name=node_binary_name, binproviders=[provider]).load()
node_path = str(node.abspath) if node.abspath else ''
except Exception:
node = None
node_path = ''
if use_chrome and not node_path:
errors.append(
f"Node.js not found (tried: {node_binary_name}). "
"Install Node.js or set NODE_BINARY path for Puppeteer."
)
else:
computed['NODE_BINARY'] = node_path
if node and node.abspath:
# Output Binary JSONL record for Node
output_binary(node, name='node')
# Output computed values
for key, value in computed.items():
print(f"COMPUTED:{key}={value}")
for warning in warnings:
print(f"WARNING:{warning}", file=sys.stderr)
for error in errors:
print(f"ERROR:{error}", file=sys.stderr)
sys.exit(1 if errors else 0)
if __name__ == '__main__':
main()
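
A minimal sketch of the consuming side of the `COMPUTED:KEY=VALUE` contract described in the docstring above (the real parsing lives in hooks.py and may differ):

```python
def parse_computed_env(stdout: str) -> dict:
    """Sketch only: collect COMPUTED:KEY=VALUE lines from a validate hook's stdout."""
    computed = {}
    for line in stdout.splitlines():
        if line.startswith('COMPUTED:') and '=' in line:
            key, _, value = line[len('COMPUTED:'):].partition('=')
            computed[key.strip()] = value
    return computed

print(parse_computed_env('COMPUTED:USE_CHROME=true\nCOMPUTED:IN_DOCKER=false'))
# -> {'USE_CHROME': 'true', 'IN_DOCKER': 'false'}
```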

View File

@@ -0,0 +1,245 @@
#!/usr/bin/env node
/**
* Launch a shared Chromium browser session for the entire crawl.
*
* This runs once per crawl and keeps Chromium alive for all snapshots to share.
* Each snapshot creates its own tab via on_Snapshot__20_chrome_tab.bg.js.
*
* NOTE: We use Chromium instead of Chrome because Chrome 137+ removed support for
* --load-extension and --disable-extensions-except flags.
*
* Usage: on_Crawl__20_chrome_launch.bg.js --crawl-id=<uuid> --source-url=<url>
* Output: Creates chrome/ directory under crawl output dir with:
* - cdp_url.txt: WebSocket URL for CDP connection
* - chrome.pid: Chromium process ID (for cleanup)
* - port.txt: Debug port number
* - extensions.json: Loaded extensions metadata
*
* Environment variables:
* NODE_MODULES_DIR: Path to node_modules directory for module resolution
* CHROME_BINARY: Path to Chromium binary (falls back to auto-detection)
* CHROME_RESOLUTION: Page resolution (default: 1440,2000)
* CHROME_HEADLESS: Run in headless mode (default: true)
* CHROME_CHECK_SSL_VALIDITY: Whether to check SSL certificates (default: true)
* CHROME_EXTENSIONS_DIR: Directory containing Chrome extensions
*/
// Add NODE_MODULES_DIR to module resolution paths if set
if (process.env.NODE_MODULES_DIR) {
module.paths.unshift(process.env.NODE_MODULES_DIR);
}
const fs = require('fs');
const path = require('path');
const puppeteer = require('puppeteer-core');
const {
findChromium,
launchChromium,
killChrome,
getEnv,
writePidWithMtime,
} = require('./chrome_utils.js');
// Extractor metadata
const PLUGIN_NAME = 'chrome_launch';
const OUTPUT_DIR = 'chrome';
// Global state for cleanup
let chromePid = null;
let browserInstance = null;
// Parse command line arguments
function parseArgs() {
const args = {};
process.argv.slice(2).forEach((arg) => {
if (arg.startsWith('--')) {
const [key, ...valueParts] = arg.slice(2).split('=');
args[key.replace(/-/g, '_')] = valueParts.join('=') || true;
}
});
return args;
}
// Cleanup handler for SIGTERM
async function cleanup() {
console.error('[*] Cleaning up Chrome session...');
// Try graceful browser close first
if (browserInstance) {
try {
console.error('[*] Closing browser gracefully...');
await browserInstance.close();
browserInstance = null;
console.error('[+] Browser closed gracefully');
} catch (e) {
console.error(`[!] Graceful close failed: ${e.message}`);
}
}
// Kill Chrome process
if (chromePid) {
await killChrome(chromePid, OUTPUT_DIR);
}
process.exit(0);
}
// Register signal handlers
process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);
async function main() {
const args = parseArgs();
const crawlId = args.crawl_id;
try {
const binary = findChromium();
if (!binary) {
console.error('ERROR: Chromium binary not found');
console.error('DEPENDENCY_NEEDED=chromium');
console.error('BIN_PROVIDERS=puppeteer,env,playwright,apt,brew');
console.error('INSTALL_HINT=npx @puppeteer/browsers install chromium@latest');
process.exit(1);
}
// Get Chromium version
let version = '';
try {
const { execSync } = require('child_process');
version = execSync(`"${binary}" --version`, { encoding: 'utf8', timeout: 5000 })
.trim()
.slice(0, 64);
} catch (e) {}
console.error(`[*] Using browser: ${binary}`);
if (version) console.error(`[*] Version: ${version}`);
// Load installed extensions
const extensionsDir = getEnv('CHROME_EXTENSIONS_DIR') ||
path.join(getEnv('DATA_DIR', '.'), 'personas', getEnv('ACTIVE_PERSONA', 'Default'), 'chrome_extensions');
const installedExtensions = [];
const extensionPaths = [];
if (fs.existsSync(extensionsDir)) {
const files = fs.readdirSync(extensionsDir);
for (const file of files) {
if (file.endsWith('.extension.json')) {
try {
const extPath = path.join(extensionsDir, file);
const extData = JSON.parse(fs.readFileSync(extPath, 'utf-8'));
if (extData.unpacked_path && fs.existsSync(extData.unpacked_path)) {
installedExtensions.push(extData);
extensionPaths.push(extData.unpacked_path);
console.error(`[*] Loading extension: ${extData.name || file}`);
}
} catch (e) {
console.warn(`[!] Skipping invalid extension cache: ${file}`);
}
}
}
}
if (installedExtensions.length > 0) {
console.error(`[+] Found ${installedExtensions.length} extension(s) to load`);
}
// Write hook's own PID
const hookStartTime = Date.now() / 1000;
if (!fs.existsSync(OUTPUT_DIR)) {
fs.mkdirSync(OUTPUT_DIR, { recursive: true });
}
writePidWithMtime(path.join(OUTPUT_DIR, 'hook.pid'), process.pid, hookStartTime);
// Launch Chromium using consolidated function
const result = await launchChromium({
binary,
outputDir: OUTPUT_DIR,
extensionPaths,
});
if (!result.success) {
console.error(`ERROR: ${result.error}`);
process.exit(1);
}
chromePid = result.pid;
const cdpUrl = result.cdpUrl;
// Write extensions metadata
if (installedExtensions.length > 0) {
fs.writeFileSync(
path.join(OUTPUT_DIR, 'extensions.json'),
JSON.stringify(installedExtensions, null, 2)
);
}
// Connect puppeteer for extension verification
console.error(`[*] Connecting puppeteer to CDP...`);
const browser = await puppeteer.connect({
browserWSEndpoint: cdpUrl,
defaultViewport: null,
});
browserInstance = browser;
// Verify extensions loaded
if (extensionPaths.length > 0) {
await new Promise(r => setTimeout(r, 3000));
const targets = browser.targets();
console.error(`[*] All browser targets (${targets.length}):`);
for (const t of targets) {
console.error(` - ${t.type()}: ${t.url().slice(0, 80)}`);
}
const extTargets = targets.filter(t =>
t.url().startsWith('chrome-extension://') ||
t.type() === 'service_worker' ||
t.type() === 'background_page'
);
// Filter out built-in extensions
const builtinIds = [
'nkeimhogjdpnpccoofpliimaahmaaome',
'fignfifoniblkonapihmkfakmlgkbkcf',
'ahfgeienlihckogmohjhadlkjgocpleb',
'mhjfbmdgcfjbbpaeojofohoefgiehjai',
];
const customExtTargets = extTargets.filter(t => {
const url = t.url();
if (!url.startsWith('chrome-extension://')) return false;
const extId = url.split('://')[1].split('/')[0];
return !builtinIds.includes(extId);
});
console.error(`[+] Found ${customExtTargets.length} custom extension target(s)`);
for (const target of customExtTargets) {
const url = target.url();
const extId = url.split('://')[1].split('/')[0];
console.error(`[+] Extension loaded: ${extId} (${target.type()})`);
}
if (customExtTargets.length === 0 && extensionPaths.length > 0) {
console.error(`[!] Warning: No custom extensions detected. Extension loading may have failed.`);
console.error(`[!] Make sure you are using Chromium, not Chrome (Chrome 137+ removed --load-extension support)`);
}
}
console.error(`[+] Chromium session started for crawl ${crawlId}`);
console.error(`[+] CDP URL: ${cdpUrl}`);
console.error(`[+] PID: ${chromePid}`);
// Stay alive to handle cleanup on SIGTERM
console.log('[*] Chromium launch hook staying alive to handle cleanup...');
setInterval(() => {}, 1000000);
} catch (e) {
console.error(`ERROR: ${e.name}: ${e.message}`);
process.exit(1);
}
}
main().catch((e) => {
console.error(`Fatal error: ${e.message}`);
process.exit(1);
});
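
A short sketch of how crawl-teardown code could locate the shared session this hook leaves behind (the file names come from the header comment above; the directory path and variable names are illustrative):

```python
from pathlib import Path

crawl_output_dir = Path('archive/crawls/abc123')        # illustrative crawl output dir
chrome_dir = crawl_output_dir / 'chrome'

cdp_url = (chrome_dir / 'cdp_url.txt').read_text().strip()
chrome_pid = int((chrome_dir / 'chrome.pid').read_text().strip())
# cdp_url can be reused by other hooks (e.g. the captcha2 config hook above), and
# chrome_pid can be handed to safe_kill_process() / Process.kill() at crawl end.
```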

View File

@@ -0,0 +1,115 @@
#!/usr/bin/env node
/**
* I Still Don't Care About Cookies Extension Plugin
*
* Installs and configures the "I still don't care about cookies" Chrome extension
* for automatic cookie consent banner dismissal during page archiving.
*
* Extension: https://chromewebstore.google.com/detail/edibdbjcniadpccecjdfdjjppcpchdlm
*
* Priority: 02 (early) - Must install before Chrome session starts at Crawl level
* Hook: on_Crawl (runs once per crawl, not per snapshot)
*
* This extension automatically:
* - Dismisses cookie consent popups
* - Removes cookie banners
* - Accepts necessary cookies to proceed with browsing
* - Works on thousands of websites out of the box
*/
const path = require('path');
const fs = require('fs');
// Import extension utilities
const extensionUtils = require('../chrome/chrome_utils.js');
// Extension metadata
const EXTENSION = {
webstore_id: 'edibdbjcniadpccecjdfdjjppcpchdlm',
name: 'istilldontcareaboutcookies',
};
// Get extensions directory from environment or use default
const EXTENSIONS_DIR = process.env.CHROME_EXTENSIONS_DIR ||
path.join(process.env.DATA_DIR || './data', 'personas', process.env.ACTIVE_PERSONA || 'Default', 'chrome_extensions');
/**
* Install the I Still Don't Care About Cookies extension
*/
async function installCookiesExtension() {
console.log('[*] Installing I Still Don\'t Care About Cookies extension...');
// Install the extension
const extension = await extensionUtils.loadOrInstallExtension(EXTENSION, EXTENSIONS_DIR);
if (!extension) {
console.error('[❌] Failed to install I Still Don\'t Care About Cookies extension');
return null;
}
console.log('[+] I Still Don\'t Care About Cookies extension installed');
console.log('[+] Cookie banners will be automatically dismissed during archiving');
return extension;
}
/**
* Note: This extension works out of the box with no configuration needed.
* It automatically detects and dismisses cookie banners on page load.
*/
/**
* Main entry point - install extension before archiving
*/
async function main() {
// Check if extension is already cached
const cacheFile = path.join(EXTENSIONS_DIR, 'istilldontcareaboutcookies.extension.json');
if (fs.existsSync(cacheFile)) {
try {
const cached = JSON.parse(fs.readFileSync(cacheFile, 'utf-8'));
const manifestPath = path.join(cached.unpacked_path, 'manifest.json');
if (fs.existsSync(manifestPath)) {
console.log('[*] I Still Don\'t Care About Cookies extension already installed (using cache)');
return cached;
}
} catch (e) {
// Cache file corrupted, re-install
console.warn('[⚠️] Extension cache corrupted, re-installing...');
}
}
// Install extension
const extension = await installCookiesExtension();
// Export extension metadata for chrome plugin to load
if (extension) {
// Write extension info to a cache file that chrome plugin can read
await fs.promises.mkdir(EXTENSIONS_DIR, { recursive: true });
await fs.promises.writeFile(
cacheFile,
JSON.stringify(extension, null, 2)
);
console.log(`[+] Extension metadata written to ${cacheFile}`);
}
return extension;
}
// Export functions for use by other plugins
module.exports = {
EXTENSION,
installCookiesExtension,
};
// Run if executed directly
if (require.main === module) {
main().then(() => {
console.log('[✓] I Still Don\'t Care About Cookies extension setup complete');
process.exit(0);
}).catch(err => {
console.error('[❌] I Still Don\'t Care About Cookies extension setup failed:', err);
process.exit(1);
});
}

View File

@@ -0,0 +1,268 @@
#!/usr/bin/env node
/**
* SingleFile Extension Plugin
*
* Installs and uses the SingleFile Chrome extension for archiving complete web pages.
* Falls back to single-file-cli if the extension is not available.
*
* Extension: https://chromewebstore.google.com/detail/mpiodijhokgodhhofbcjdecpffjipkle
*
* Priority: 04 (early) - Must install before Chrome session starts at Crawl level
* Hook: on_Crawl (runs once per crawl, not per snapshot)
*
* This extension automatically:
* - Saves complete web pages as single HTML files
* - Inlines all resources (CSS, JS, images, fonts)
* - Preserves page fidelity better than wget/curl
* - Works with SPAs and dynamically loaded content
*/
const path = require('path');
const fs = require('fs');
const { promisify } = require('util');
const { exec } = require('child_process');
const execAsync = promisify(exec);
// Import extension utilities
const extensionUtils = require('../chrome/chrome_utils.js');
// Extension metadata
const EXTENSION = {
webstore_id: 'mpiodijhokgodhhofbcjdecpffjipkle',
name: 'singlefile',
};
// Get extensions directory from environment or use default
const EXTENSIONS_DIR = process.env.CHROME_EXTENSIONS_DIR ||
path.join(process.env.DATA_DIR || './data', 'personas', process.env.ACTIVE_PERSONA || 'Default', 'chrome_extensions');
const CHROME_DOWNLOADS_DIR = process.env.CHROME_DOWNLOADS_DIR ||
path.join(process.env.DATA_DIR || './data', 'personas', process.env.ACTIVE_PERSONA || 'Default', 'chrome_downloads');
const OUTPUT_DIR = '.';
const OUTPUT_FILE = 'singlefile.html';
/**
* Install the SingleFile extension
*/
async function installSinglefileExtension() {
console.log('[*] Installing SingleFile extension...');
// Install the extension
const extension = await extensionUtils.loadOrInstallExtension(EXTENSION, EXTENSIONS_DIR);
if (!extension) {
console.error('[❌] Failed to install SingleFile extension');
return null;
}
console.log('[+] SingleFile extension installed');
console.log('[+] Web pages will be saved as single HTML files');
return extension;
}
/**
* Wait for a specified amount of time
*/
function wait(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* Save a page using the SingleFile extension
*
* @param {Object} page - Puppeteer page object
* @param {Object} extension - Extension metadata with dispatchAction method
* @param {Object} options - Additional options
* @returns {Promise<string|null>} - Path to saved file or null on failure
*/
async function saveSinglefileWithExtension(page, extension, options = {}) {
if (!extension || !extension.version) {
throw new Error('SingleFile extension not found or not loaded');
}
const url = await page.url();
// Check for unsupported URL schemes
const URL_SCHEMES_IGNORED = ['about', 'chrome', 'chrome-extension', 'data', 'javascript', 'blob'];
const scheme = url.split(':')[0];
if (URL_SCHEMES_IGNORED.includes(scheme)) {
console.log(`[⚠️] Skipping SingleFile for URL scheme: ${scheme}`);
return null;
}
// Ensure downloads directory exists
await fs.promises.mkdir(CHROME_DOWNLOADS_DIR, { recursive: true });
// Get list of existing files to ignore
const files_before = new Set(
(await fs.promises.readdir(CHROME_DOWNLOADS_DIR))
.filter(fn => fn.endsWith('.html'))
);
// Output directory is current directory (hook already runs in output dir)
const out_path = path.join(OUTPUT_DIR, OUTPUT_FILE);
console.log(`[🛠️] Saving SingleFile HTML using extension (${extension.id})...`);
// Bring page to front (extension action button acts on foreground tab)
await page.bringToFront();
// Trigger the extension's action (toolbar button click)
await extension.dispatchAction();
// Wait for file to appear in downloads directory
const check_delay = 3000; // 3 seconds
const max_tries = 10;
let files_new = [];
for (let attempt = 0; attempt < max_tries; attempt++) {
await wait(check_delay);
const files_after = (await fs.promises.readdir(CHROME_DOWNLOADS_DIR))
.filter(fn => fn.endsWith('.html'));
files_new = files_after.filter(file => !files_before.has(file));
if (files_new.length === 0) {
continue;
}
// Find the matching file by checking if it contains the URL in the HTML header
for (const file of files_new) {
const dl_path = path.join(CHROME_DOWNLOADS_DIR, file);
const dl_text = await fs.promises.readFile(dl_path, 'utf-8');
const dl_header = dl_text.split('meta charset')[0];
if (dl_header.includes(`url: ${url}`)) {
console.log(`[✍️] Moving SingleFile download from ${file} to ${out_path}`);
await fs.promises.rename(dl_path, out_path);
return out_path;
}
}
}
console.warn(`[❌] Couldn't find matching SingleFile HTML in ${CHROME_DOWNLOADS_DIR} after waiting ${(check_delay * max_tries) / 1000}s`);
console.warn(`[⚠️] New files found: ${files_new.join(', ')}`);
return null;
}
/**
* Save a page using single-file-cli (fallback method)
*
* @param {string} url - URL to archive
* @param {Object} options - Additional options
* @returns {Promise<string|null>} - Path to saved file or null on failure
*/
async function saveSinglefileWithCLI(url, options = {}) {
console.log('[*] Falling back to single-file-cli...');
// Find single-file binary
let binary = null;
try {
const { stdout } = await execAsync('which single-file');
binary = stdout.trim();
} catch (err) {
console.error('[❌] single-file-cli not found. Install with: npm install -g single-file-cli');
return null;
}
// Output directory is current directory (hook already runs in output dir)
const out_path = path.join(OUTPUT_DIR, OUTPUT_FILE);
// Build command
const cmd = [
binary,
'--browser-headless',
url,
out_path,
];
// Add optional args
if (options.userAgent) {
cmd.splice(2, 0, '--browser-user-agent', options.userAgent);
}
if (options.cookiesFile && fs.existsSync(options.cookiesFile)) {
cmd.splice(2, 0, '--browser-cookies-file', options.cookiesFile);
}
if (options.ignoreSSL) {
cmd.splice(2, 0, '--browser-ignore-insecure-certs');
}
// Execute
try {
const timeout = options.timeout || 120000;
await execAsync(cmd.join(' '), { timeout });
if (fs.existsSync(out_path) && fs.statSync(out_path).size > 0) {
console.log(`[+] SingleFile saved via CLI: ${out_path}`);
return out_path;
}
console.error('[❌] SingleFile CLI completed but no output file found');
return null;
} catch (err) {
console.error(`[❌] SingleFile CLI error: ${err.message}`);
return null;
}
}
/**
* Main entry point - install extension before archiving
*/
async function main() {
// Check if extension is already cached
const cacheFile = path.join(EXTENSIONS_DIR, 'singlefile.extension.json');
if (fs.existsSync(cacheFile)) {
try {
const cached = JSON.parse(fs.readFileSync(cacheFile, 'utf-8'));
const manifestPath = path.join(cached.unpacked_path, 'manifest.json');
if (fs.existsSync(manifestPath)) {
console.log('[*] SingleFile extension already installed (using cache)');
return cached;
}
} catch (e) {
// Cache file corrupted, re-install
console.warn('[⚠️] Extension cache corrupted, re-installing...');
}
}
// Install extension
const extension = await installSinglefileExtension();
// Export extension metadata for chrome plugin to load
if (extension) {
// Write extension info to a cache file that chrome plugin can read
await fs.promises.mkdir(EXTENSIONS_DIR, { recursive: true });
await fs.promises.writeFile(
cacheFile,
JSON.stringify(extension, null, 2)
);
console.log(`[+] Extension metadata written to ${cacheFile}`);
}
return extension;
}
// Export functions for use by other plugins
module.exports = {
EXTENSION,
installSinglefileExtension,
saveSinglefileWithExtension,
saveSinglefileWithCLI,
};
// Run if executed directly
if (require.main === module) {
main().then(() => {
console.log('[✓] SingleFile extension setup complete');
process.exit(0);
}).catch(err => {
console.error('[❌] SingleFile extension setup failed:', err);
process.exit(1);
});
}

View File

@@ -0,0 +1,116 @@
#!/usr/bin/env node
/**
* uBlock Origin Extension Plugin
*
* Installs and configures the uBlock Origin Chrome extension for ad blocking
* and privacy protection during page archiving.
*
* Extension: https://chromewebstore.google.com/detail/cjpalhdlnbpafiamejdnhcphjbkeiagm
*
* Priority: 03 (early) - Must install before Chrome session starts at Crawl level
* Hook: on_Crawl (runs once per crawl, not per snapshot)
*
* This extension automatically:
* - Blocks ads, trackers, and malware domains
* - Reduces page load time and bandwidth usage
* - Improves privacy during archiving
* - Removes clutter from archived pages
* - Uses efficient blocking with filter lists
*/
const path = require('path');
const fs = require('fs');
// Import extension utilities
const extensionUtils = require('../chrome/chrome_utils.js');
// Extension metadata
const EXTENSION = {
webstore_id: 'cjpalhdlnbpafiamejdnhcphjbkeiagm',
name: 'ublock',
};
// Get extensions directory from environment or use default
const EXTENSIONS_DIR = process.env.CHROME_EXTENSIONS_DIR ||
path.join(process.env.DATA_DIR || './data', 'personas', process.env.ACTIVE_PERSONA || 'Default', 'chrome_extensions');
/**
* Install the uBlock Origin extension
*/
async function installUblockExtension() {
console.log('[*] Installing uBlock Origin extension...');
// Install the extension
const extension = await extensionUtils.loadOrInstallExtension(EXTENSION, EXTENSIONS_DIR);
if (!extension) {
console.error('[❌] Failed to install uBlock Origin extension');
return null;
}
console.log('[+] uBlock Origin extension installed');
console.log('[+] Ads and trackers will be blocked during archiving');
return extension;
}
/**
* Note: uBlock Origin works automatically with default filter lists.
* No configuration needed - blocks ads, trackers, and malware domains out of the box.
*/
/**
* Main entry point - install extension before archiving
*/
async function main() {
// Check if extension is already cached
const cacheFile = path.join(EXTENSIONS_DIR, 'ublock.extension.json');
if (fs.existsSync(cacheFile)) {
try {
const cached = JSON.parse(fs.readFileSync(cacheFile, 'utf-8'));
const manifestPath = path.join(cached.unpacked_path, 'manifest.json');
if (fs.existsSync(manifestPath)) {
console.log('[*] uBlock Origin extension already installed (using cache)');
return cached;
}
} catch (e) {
// Cache file corrupted, re-install
console.warn('[⚠️] Extension cache corrupted, re-installing...');
}
}
// Install extension
const extension = await installUblockExtension();
// Export extension metadata for chrome plugin to load
if (extension) {
// Write extension info to a cache file that chrome plugin can read
await fs.promises.mkdir(EXTENSIONS_DIR, { recursive: true });
await fs.promises.writeFile(
cacheFile,
JSON.stringify(extension, null, 2)
);
console.log(`[+] Extension metadata written to ${cacheFile}`);
}
return extension;
}
// Export functions for use by other plugins
module.exports = {
EXTENSION,
installUblockExtension,
};
// Run if executed directly
if (require.main === module) {
main().then(() => {
console.log('[✓] uBlock Origin extension setup complete');
process.exit(0);
}).catch(err => {
console.error('[❌] uBlock Origin extension setup failed:', err);
process.exit(1);
});
}

View File

@@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
Validate and compute derived wget config values.
This hook runs early in the Crawl lifecycle to:
1. Validate config values (soft issues emit warnings; a missing required binary is a hard error)
2. Compute derived values (USE_WGET from WGET_ENABLED)
3. Check binary availability and version
Output:
- COMPUTED:KEY=VALUE lines that hooks.py parses and adds to env
- Binary JSONL records to stdout when binaries are found
"""
import json
import os
import shutil
import subprocess
import sys
from abx_pkg import Binary, EnvProvider
# Read config from environment (already validated by JSONSchema)
def get_env(name: str, default: str = '') -> str:
return os.environ.get(name, default).strip()
def get_env_bool(name: str, default: bool = False) -> bool:
val = get_env(name, '').lower()
if val in ('true', '1', 'yes', 'on'):
return True
if val in ('false', '0', 'no', 'off'):
return False
return default
def get_env_int(name: str, default: int = 0) -> int:
try:
return int(get_env(name, str(default)))
except ValueError:
return default
def output_binary(binary: Binary, name: str):
"""Output Binary JSONL record to stdout."""
machine_id = os.environ.get('MACHINE_ID', '')
record = {
'type': 'Binary',
'name': name,
'abspath': str(binary.abspath),
'version': str(binary.version) if binary.version else '',
'sha256': binary.sha256 or '',
'binprovider': 'env',
'machine_id': machine_id,
}
print(json.dumps(record))
def main():
warnings = []
errors = []
computed = {}
# Get config values
wget_enabled = get_env_bool('WGET_ENABLED', True)
wget_save_warc = get_env_bool('WGET_SAVE_WARC', True)
wget_timeout = get_env_int('WGET_TIMEOUT') or get_env_int('TIMEOUT', 60)
wget_binary = get_env('WGET_BINARY', 'wget')
# Compute derived values (USE_WGET for backward compatibility)
use_wget = wget_enabled
computed['USE_WGET'] = str(use_wget).lower()
# Validate timeout with warning (not error)
if use_wget and wget_timeout < 20:
warnings.append(
f"WGET_TIMEOUT={wget_timeout} is very low. "
"wget may fail to archive sites if set to less than ~20 seconds. "
"Consider setting WGET_TIMEOUT=60 or higher."
)
# Check binary availability using abx-pkg
provider = EnvProvider()
try:
binary = Binary(name=wget_binary, binproviders=[provider]).load()
binary_path = str(binary.abspath) if binary.abspath else ''
except Exception:
binary = None
binary_path = ''
if not binary_path:
if use_wget:
errors.append(f"WGET_BINARY={wget_binary} not found. Install wget or set WGET_ENABLED=false.")
computed['WGET_BINARY'] = ''
else:
computed['WGET_BINARY'] = binary_path
wget_version = str(binary.version) if binary.version else 'unknown'
computed['WGET_VERSION'] = wget_version
# Output Binary JSONL record
output_binary(binary, name='wget')
# Check for compression support
if computed.get('WGET_BINARY'):
try:
result = subprocess.run(
[computed['WGET_BINARY'], '--compression=auto', '--help'],
capture_output=True, timeout=5
)
computed['WGET_AUTO_COMPRESSION'] = 'true' if result.returncode == 0 else 'false'
except Exception:
computed['WGET_AUTO_COMPRESSION'] = 'false'
# Output results
# Format: KEY=VALUE lines that hooks.py will parse and add to env
for key, value in computed.items():
print(f"COMPUTED:{key}={value}")
for warning in warnings:
print(f"WARNING:{warning}", file=sys.stderr)
for error in errors:
print(f"ERROR:{error}", file=sys.stderr)
# Exit with error if any hard errors
sys.exit(1 if errors else 0)
if __name__ == '__main__':
main()
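
The script above communicates with the hook runner purely over stdout/stderr: `COMPUTED:KEY=VALUE` lines that get merged into the environment for later hooks, `WARNING:`/`ERROR:` prefixed lines on stderr, and bare JSON lines for Binary records. The parsing side lives in hooks.py and is not shown here; the snippet below is only a minimal sketch of what such a consumer could look like, assuming the output format is exactly what this hook emits.

```python
# Hypothetical consumer sketch (not the real hooks.py implementation).
import json


def parse_hook_output(stdout: str, stderr: str):
    """Split a validation hook's output into computed env vars, Binary records, warnings, errors."""
    computed: dict[str, str] = {}
    binaries: list[dict] = []
    warnings: list[str] = []
    errors: list[str] = []

    for line in stdout.splitlines():
        line = line.strip()
        if line.startswith('COMPUTED:') and '=' in line:
            key, _, value = line[len('COMPUTED:'):].partition('=')
            computed[key] = value
        elif line.startswith('{'):
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if record.get('type') == 'Binary':
                binaries.append(record)

    for line in stderr.splitlines():
        if line.startswith('WARNING:'):
            warnings.append(line[len('WARNING:'):])
        elif line.startswith('ERROR:'):
            errors.append(line[len('ERROR:'):])

    return computed, binaries, warnings, errors
```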

View File

@@ -30,7 +30,7 @@ __package__ = 'archivebox.workers'
import os
import time
from typing import Type
from multiprocessing import Process
from multiprocessing import Process as MPProcess
from django.utils import timezone
@@ -38,12 +38,6 @@ from rich import print
from archivebox.misc.logging_util import log_worker_event
from .worker import Worker, CrawlWorker, SnapshotWorker, ArchiveResultWorker
from .pid_utils import (
write_pid_file,
remove_pid_file,
get_all_worker_pids,
cleanup_stale_pid_files,
)
def _run_orchestrator_process(exit_on_idle: bool) -> None:
@@ -78,6 +72,7 @@ class Orchestrator:
self.pid: int = os.getpid()
self.pid_file = None
self.idle_count: int = 0
self._last_cleanup_time: float = 0.0 # For throttling cleanup_stale_running()
def __repr__(self) -> str:
return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'
@@ -85,16 +80,26 @@ class Orchestrator:
@classmethod
def is_running(cls) -> bool:
"""Check if an orchestrator is already running."""
workers = get_all_worker_pids('orchestrator')
return len(workers) > 0
from archivebox.machine.models import Process
# Clean up stale processes before counting
Process.cleanup_stale_running()
return Process.get_running_count(process_type=Process.TypeChoices.ORCHESTRATOR) > 0
def on_startup(self) -> None:
"""Called when orchestrator starts."""
self.pid = os.getpid()
self.pid_file = write_pid_file('orchestrator', worker_id=0)
from archivebox.machine.models import Process
# Clean up any stale PID files from previous runs
stale_count = cleanup_stale_pid_files()
self.pid = os.getpid()
# Register orchestrator process in database with explicit type
self.db_process = Process.current()
# Ensure the process type is correctly set to ORCHESTRATOR
if self.db_process.process_type != Process.TypeChoices.ORCHESTRATOR:
self.db_process.process_type = Process.TypeChoices.ORCHESTRATOR
self.db_process.save(update_fields=['process_type'])
# Clean up any stale Process records from previous runs
stale_count = Process.cleanup_stale_running()
# Collect startup metadata
metadata = {
@@ -112,11 +117,16 @@ class Orchestrator:
pid=self.pid,
metadata=metadata,
)
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when orchestrator shuts down."""
if self.pid_file:
remove_pid_file(self.pid_file)
# Update Process record status
if hasattr(self, 'db_process') and self.db_process:
# KeyboardInterrupt is a graceful shutdown, not an error
self.db_process.exit_code = 1 if error and not isinstance(error, KeyboardInterrupt) else 0
self.db_process.status = self.db_process.StatusChoices.EXITED
self.db_process.ended_at = timezone.now()
self.db_process.save()
log_worker_event(
worker_type='Orchestrator',
@@ -125,10 +135,19 @@ class Orchestrator:
pid=self.pid,
error=error if error and not isinstance(error, KeyboardInterrupt) else None,
)
def get_total_worker_count(self) -> int:
"""Get total count of running workers across all types."""
cleanup_stale_pid_files()
from archivebox.machine.models import Process
import time
# Throttle cleanup to once every 30 seconds to avoid performance issues
CLEANUP_THROTTLE_SECONDS = 30
now = time.time()
if now - self._last_cleanup_time > CLEANUP_THROTTLE_SECONDS:
Process.cleanup_stale_running()
self._last_cleanup_time = now
return sum(len(W.get_running_workers()) for W in self.WORKER_TYPES)
def should_spawn_worker(self, WorkerClass: Type[Worker], queue_count: int) -> bool:
@@ -287,7 +306,7 @@ class Orchestrator:
Returns the PID of the new process.
"""
# Use module-level function to avoid pickle errors with local functions
proc = Process(
proc = MPProcess(
target=_run_orchestrator_process,
args=(self.exit_on_idle,),
name='orchestrator'
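
Note the import rename in this hunk: `multiprocessing.Process` becomes `MPProcess` because the bare name `Process` now refers to the Django model in `archivebox.machine.models`, and the spawn target stays a module-level function for pickling compatibility. Below is a self-contained illustration of that pattern (not ArchiveBox code; the names are made up):

```python
# Illustrative sketch of the MPProcess naming/pickling pattern used above.
from multiprocessing import Process as MPProcess


def _run_child(label: str) -> None:
    # Module-level target: local functions can't be pickled under the
    # "spawn" start method, which is why the diff uses module-level runners.
    print(f'child {label} running')


if __name__ == '__main__':
    proc = MPProcess(target=_run_child, args=('demo',), name='demo')
    proc.start()
    proc.join()
    print(f'child exited with code {proc.exitcode}')
```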

View File

@@ -1,191 +0,0 @@
"""
PID file utilities for tracking worker and orchestrator processes.
PID files are stored in data/tmp/workers/ and contain:
- Line 1: PID
- Line 2: Worker type (orchestrator, crawl, snapshot, archiveresult)
- Line 3: Extractor filter (optional, for archiveresult workers)
- Line 4: Started at ISO timestamp
"""
__package__ = 'archivebox.workers'
import os
import signal
from pathlib import Path
from datetime import datetime, timezone
from django.conf import settings
def get_pid_dir() -> Path:
"""Get the directory for PID files, creating it if needed."""
pid_dir = Path(settings.DATA_DIR) / 'tmp' / 'workers'
pid_dir.mkdir(parents=True, exist_ok=True)
return pid_dir
def write_pid_file(worker_type: str, worker_id: int = 0, extractor: str | None = None) -> Path:
"""
Write a PID file for the current process.
Returns the path to the PID file.
"""
pid_dir = get_pid_dir()
if worker_type == 'orchestrator':
pid_file = pid_dir / 'orchestrator.pid'
else:
pid_file = pid_dir / f'{worker_type}_worker_{worker_id}.pid'
content = f"{os.getpid()}\n{worker_type}\n{extractor or ''}\n{datetime.now(timezone.utc).isoformat()}\n"
pid_file.write_text(content)
return pid_file
def read_pid_file(path: Path) -> dict | None:
"""
Read and parse a PID file.
Returns dict with pid, worker_type, extractor, started_at or None if invalid.
"""
try:
if not path.exists():
return None
lines = path.read_text().strip().split('\n')
if len(lines) < 4:
return None
return {
'pid': int(lines[0]),
'worker_type': lines[1],
'extractor': lines[2] or None,
'started_at': datetime.fromisoformat(lines[3]),
'pid_file': path,
}
except (ValueError, IndexError, OSError):
return None
def remove_pid_file(path: Path) -> None:
"""Remove a PID file if it exists."""
try:
path.unlink(missing_ok=True)
except OSError:
pass
def is_process_alive(pid: int) -> bool:
"""Check if a process with the given PID is still running."""
try:
os.kill(pid, 0) # Signal 0 doesn't kill, just checks
return True
except (OSError, ProcessLookupError):
return False
def get_all_pid_files() -> list[Path]:
"""Get all PID files in the workers directory."""
pid_dir = get_pid_dir()
return list(pid_dir.glob('*.pid'))
def get_all_worker_pids(worker_type: str | None = None) -> list[dict]:
"""
Get info about all running workers.
Optionally filter by worker_type.
"""
workers = []
for pid_file in get_all_pid_files():
info = read_pid_file(pid_file)
if info is None:
continue
# Skip if process is dead
if not is_process_alive(info['pid']):
continue
# Filter by type if specified
if worker_type and info['worker_type'] != worker_type:
continue
workers.append(info)
return workers
def cleanup_stale_pid_files() -> int:
"""
Remove PID files for processes that are no longer running.
Returns the number of stale files removed.
"""
removed = 0
for pid_file in get_all_pid_files():
info = read_pid_file(pid_file)
if info is None:
# Invalid PID file, remove it
remove_pid_file(pid_file)
removed += 1
continue
if not is_process_alive(info['pid']):
remove_pid_file(pid_file)
removed += 1
return removed
def get_running_worker_count(worker_type: str) -> int:
"""Get the count of running workers of a specific type."""
return len(get_all_worker_pids(worker_type))
def get_next_worker_id(worker_type: str) -> int:
"""Get the next available worker ID for a given type."""
existing_ids = set()
for pid_file in get_all_pid_files():
# Parse worker ID from filename like "snapshot_worker_3.pid"
name = pid_file.stem
if name.startswith(f'{worker_type}_worker_'):
try:
worker_id = int(name.split('_')[-1])
existing_ids.add(worker_id)
except ValueError:
continue
# Find the lowest unused ID
next_id = 0
while next_id in existing_ids:
next_id += 1
return next_id
def stop_worker(pid: int, graceful: bool = True) -> bool:
"""
Stop a worker process.
If graceful=True, sends SIGTERM first, then SIGKILL after timeout.
Returns True if process was stopped.
"""
if not is_process_alive(pid):
return True
try:
if graceful:
os.kill(pid, signal.SIGTERM)
# Give it a moment to shut down
import time
for _ in range(10): # Wait up to 1 second
time.sleep(0.1)
if not is_process_alive(pid):
return True
# Force kill if still running
os.kill(pid, signal.SIGKILL)
else:
os.kill(pid, signal.SIGKILL)
return True
except (OSError, ProcessLookupError):
return True # Process already dead
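
Everything this deleted module did with PID files is now expressed as queries and methods on the `Process` model, as the surrounding orchestrator and worker diffs show. The sketch below is only an illustrative mapping, limited to the methods that actually appear in this commit (`Process.current()`, `cleanup_stale_running()`, `get_running()`, `get_running_count()`, `TypeChoices`); it is not a drop-in replacement module.

```python
# Illustrative mapping from the deleted PID-file helpers to Process model calls.
from archivebox.machine.models import Process


def register_current_process() -> Process:
    # replaces write_pid_file(): the database row now plays the role of the PID file
    return Process.current()


def running_workers(process_type=Process.TypeChoices.WORKER) -> list:
    # replaces get_all_worker_pids(): query live rows instead of scanning *.pid files
    Process.cleanup_stale_running()  # replaces cleanup_stale_pid_files()
    return list(Process.get_running(process_type=process_type))


def running_worker_count(process_type=Process.TypeChoices.WORKER) -> int:
    # replaces get_running_worker_count()
    return Process.get_running_count(process_type=process_type)
```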

View File

@@ -17,7 +17,7 @@ import traceback
from typing import ClassVar, Any
from datetime import timedelta
from pathlib import Path
from multiprocessing import Process, cpu_count
from multiprocessing import Process as MPProcess, cpu_count
from django.db.models import QuerySet
from django.utils import timezone
@@ -26,13 +26,6 @@ from django.conf import settings
from rich import print
from archivebox.misc.logging_util import log_worker_event
from .pid_utils import (
write_pid_file,
remove_pid_file,
get_all_worker_pids,
get_next_worker_id,
cleanup_stale_pid_files,
)
CPU_COUNT = cpu_count()
@@ -133,8 +126,15 @@ class Worker:
def on_startup(self) -> None:
"""Called when worker starts."""
from archivebox.machine.models import Process
self.pid = os.getpid()
self.pid_file = write_pid_file(self.name, self.worker_id)
# Register this worker process in the database
self.db_process = Process.current()
# Explicitly set process_type to WORKER to prevent mis-detection
if self.db_process.process_type != Process.TypeChoices.WORKER:
self.db_process.process_type = Process.TypeChoices.WORKER
self.db_process.save(update_fields=['process_type'])
# Determine worker type for logging
worker_type_name = self.__class__.__name__
@@ -160,9 +160,12 @@ class Worker:
def on_shutdown(self, error: BaseException | None = None) -> None:
"""Called when worker shuts down."""
# Remove PID file
if self.pid_file:
remove_pid_file(self.pid_file)
# Update Process record status
if hasattr(self, 'db_process') and self.db_process:
self.db_process.exit_code = 1 if error else 0
self.db_process.status = self.db_process.StatusChoices.EXITED
self.db_process.ended_at = timezone.now()
self.db_process.save()
# Determine worker type for logging
worker_type_name = self.__class__.__name__
@@ -288,11 +291,13 @@ class Worker:
Fork a new worker as a subprocess.
Returns the PID of the new process.
"""
from archivebox.machine.models import Process
if worker_id is None:
worker_id = get_next_worker_id(cls.name)
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
# Use module-level function for pickling compatibility
proc = Process(
proc = MPProcess(
target=_run_worker,
args=(cls.name, worker_id, daemon),
kwargs=kwargs,
@@ -304,15 +309,31 @@ class Worker:
return proc.pid
@classmethod
def get_running_workers(cls) -> list[dict]:
def get_running_workers(cls) -> list:
"""Get info about all running workers of this type."""
cleanup_stale_pid_files()
return get_all_worker_pids(cls.name)
from archivebox.machine.models import Process
Process.cleanup_stale_running()
# Convert Process objects to dicts to match the expected API contract
processes = Process.get_running(process_type=Process.TypeChoices.WORKER)
# Note: worker_id is not stored on Process model, it's dynamically generated
# We return process_id (UUID) and pid (OS process ID) instead
return [
{
'pid': p.pid,
'process_id': str(p.id), # UUID of Process record
'started_at': p.started_at.isoformat() if p.started_at else None,
'status': p.status,
}
for p in processes
]
@classmethod
def get_worker_count(cls) -> int:
"""Get count of running workers of this type."""
return len(cls.get_running_workers())
from archivebox.machine.models import Process
return Process.get_running_count(process_type=Process.TypeChoices.WORKER)
class CrawlWorker(Worker):
@@ -402,11 +423,13 @@ class ArchiveResultWorker(Worker):
@classmethod
def start(cls, worker_id: int | None = None, daemon: bool = False, plugin: str | None = None, **kwargs: Any) -> int:
"""Fork a new worker as subprocess with optional plugin filter."""
from archivebox.machine.models import Process
if worker_id is None:
worker_id = get_next_worker_id(cls.name)
worker_id = Process.get_next_worker_id(process_type=Process.TypeChoices.WORKER)
# Use module-level function for pickling compatibility
proc = Process(
proc = MPProcess(
target=_run_worker,
args=(cls.name, worker_id, daemon),
kwargs={'plugin': plugin, **kwargs},