Merge branch 'dev' into claude/refactor-process-management-WcQyZ

Resolved conflicts by keeping Process model changes and accepting dev changes for unrelated files. Ensured pid_utils.py remains deleted as intended by this PR.

Co-authored-by: Nick Sweeting <pirate@users.noreply.github.com>
claude[bot]
2025-12-31 11:28:47 +00:00
68 changed files with 9076 additions and 1447 deletions

View File

@@ -27,36 +27,45 @@ class ArchiveBoxGroup(click.Group):
'init': 'archivebox.cli.archivebox_init.main',
'install': 'archivebox.cli.archivebox_install.main',
}
# Model commands (CRUD operations via subcommands)
model_commands = {
'crawl': 'archivebox.cli.archivebox_crawl.main',
'snapshot': 'archivebox.cli.archivebox_snapshot.main',
'archiveresult': 'archivebox.cli.archivebox_archiveresult.main',
'tag': 'archivebox.cli.archivebox_tag.main',
'binary': 'archivebox.cli.archivebox_binary.main',
'process': 'archivebox.cli.archivebox_process.main',
'machine': 'archivebox.cli.archivebox_machine.main',
}
archive_commands = {
# High-level commands
'add': 'archivebox.cli.archivebox_add.main',
'remove': 'archivebox.cli.archivebox_remove.main',
'run': 'archivebox.cli.archivebox_run.main',
'update': 'archivebox.cli.archivebox_update.main',
'search': 'archivebox.cli.archivebox_search.main',
'status': 'archivebox.cli.archivebox_status.main',
'config': 'archivebox.cli.archivebox_config.main',
'schedule': 'archivebox.cli.archivebox_schedule.main',
'server': 'archivebox.cli.archivebox_server.main',
'shell': 'archivebox.cli.archivebox_shell.main',
'manage': 'archivebox.cli.archivebox_manage.main',
# Worker/orchestrator commands
'orchestrator': 'archivebox.cli.archivebox_orchestrator.main',
# Introspection commands
'pluginmap': 'archivebox.cli.archivebox_pluginmap.main',
# Worker command
'worker': 'archivebox.cli.archivebox_worker.main',
# Task commands (called by workers as subprocesses)
'crawl': 'archivebox.cli.archivebox_crawl.main',
'snapshot': 'archivebox.cli.archivebox_snapshot.main',
'extract': 'archivebox.cli.archivebox_extract.main',
}
all_subcommands = {
**meta_commands,
**setup_commands,
**model_commands,
**archive_commands,
}
renamed_commands = {
'setup': 'install',
'list': 'search',
'import': 'add',
'archive': 'add',
'export': 'search',
# Old commands replaced by new model commands
'orchestrator': 'run',
'extract': 'archiveresult',
}
@classmethod
@@ -110,9 +119,9 @@ def cli(ctx, help=False):
if help or ctx.invoked_subcommand is None:
ctx.invoke(ctx.command.get_command(ctx, 'help'))
# if the subcommand is in the archive_commands dict and is not 'manage',
# if the subcommand is in archive_commands or model_commands,
# then we need to set up the django environment and check that we're in a valid data folder
if subcommand in ArchiveBoxGroup.archive_commands:
if subcommand in ArchiveBoxGroup.archive_commands or subcommand in ArchiveBoxGroup.model_commands:
# print('SETUP DJANGO AND CHECK DATA FOLDER')
try:
from archivebox.config.django import setup_django

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
archivebox archiveresult <action> [args...] [--filters]
Manage ArchiveResult records (plugin extraction results).
Actions:
create - Create ArchiveResults for Snapshots (queue extractions)
list - List ArchiveResults as JSONL (with optional filters)
update - Update ArchiveResults from stdin JSONL
delete - Delete ArchiveResults from stdin JSONL
Examples:
# Create ArchiveResults for snapshots (queue for extraction)
archivebox snapshot list --status=queued | archivebox archiveresult create
archivebox archiveresult create --plugin=screenshot --snapshot-id=<uuid>
# List with filters
archivebox archiveresult list --status=failed
archivebox archiveresult list --plugin=screenshot --status=succeeded
# Update (reset failed extractions to queued)
archivebox archiveresult list --status=failed | archivebox archiveresult update --status=queued
# Delete
archivebox archiveresult list --plugin=singlefile | archivebox archiveresult delete --yes
# Re-run failed extractions
archivebox archiveresult list --status=failed | archivebox run
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox archiveresult'
import sys
from typing import Optional
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
# =============================================================================
# CREATE
# =============================================================================
def create_archiveresults(
snapshot_id: Optional[str] = None,
plugin: Optional[str] = None,
status: str = 'queued',
) -> int:
"""
Create ArchiveResults for Snapshots.
Reads Snapshot records from stdin and creates ArchiveResult entries.
Pass-through: records that are neither Snapshot nor ArchiveResult are output unchanged.
If --plugin is specified, only creates results for that plugin.
Otherwise, creates results for all pending plugins.
Exit codes:
0: Success
1: Failure
"""
from django.utils import timezone
from archivebox.misc.jsonl import read_stdin, write_record, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
from archivebox.core.models import Snapshot, ArchiveResult
is_tty = sys.stdout.isatty()
# If snapshot_id provided directly, use that
if snapshot_id:
try:
snapshots = [Snapshot.objects.get(id=snapshot_id)]
pass_through_records = []
except Snapshot.DoesNotExist:
rprint(f'[red]Snapshot not found: {snapshot_id}[/red]', file=sys.stderr)
return 1
else:
# Read from stdin
records = list(read_stdin())
if not records:
rprint('[yellow]No Snapshot records provided via stdin[/yellow]', file=sys.stderr)
return 1
# Separate snapshot records from pass-through records
snapshot_ids = []
pass_through_records = []
for record in records:
record_type = record.get('type', '')
if record_type == TYPE_SNAPSHOT:
# Pass through the Snapshot record itself
pass_through_records.append(record)
if record.get('id'):
snapshot_ids.append(record['id'])
elif record_type == TYPE_ARCHIVERESULT:
# ArchiveResult records: pass through if they have an id
if record.get('id'):
pass_through_records.append(record)
# If no id, we could create it, but for now just pass through
else:
pass_through_records.append(record)
elif record_type:
# Other typed records (Crawl, Tag, etc): pass through
pass_through_records.append(record)
elif record.get('id'):
# Untyped record with id - assume it's a snapshot ID
snapshot_ids.append(record['id'])
# Output pass-through records first
if not is_tty:
for record in pass_through_records:
write_record(record)
if not snapshot_ids:
if pass_through_records:
rprint(f'[dim]Passed through {len(pass_through_records)} records, no new snapshots to process[/dim]', file=sys.stderr)
return 0
rprint('[yellow]No valid Snapshot IDs in input[/yellow]', file=sys.stderr)
return 1
snapshots = list(Snapshot.objects.filter(id__in=snapshot_ids))
if not snapshots:
rprint('[yellow]No matching snapshots found[/yellow]', file=sys.stderr)
return 0 if pass_through_records else 1
created_count = 0
for snapshot in snapshots:
if plugin:
# Create for specific plugin only
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin,
defaults={
'status': status,
'retry_at': timezone.now(),
}
)
if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
# Reset for retry
result.status = status
result.retry_at = timezone.now()
result.save()
if not is_tty:
write_record(result.to_json())
created_count += 1
else:
# Create all pending plugins
snapshot.create_pending_archiveresults()
for result in snapshot.archiveresult_set.filter(status=ArchiveResult.StatusChoices.QUEUED):
if not is_tty:
write_record(result.to_json())
created_count += 1
rprint(f'[green]Created/queued {created_count} archive results[/green]', file=sys.stderr)
return 0
# =============================================================================
# LIST
# =============================================================================
def list_archiveresults(
status: Optional[str] = None,
plugin: Optional[str] = None,
snapshot_id: Optional[str] = None,
limit: Optional[int] = None,
) -> int:
"""
List ArchiveResults as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.core.models import ArchiveResult
is_tty = sys.stdout.isatty()
queryset = ArchiveResult.objects.all().order_by('-start_ts')
# Apply filters
filter_kwargs = {
'status': status,
'plugin': plugin,
'snapshot_id': snapshot_id,
}
queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for result in queryset:
if is_tty:
status_color = {
'queued': 'yellow',
'started': 'blue',
'succeeded': 'green',
'failed': 'red',
'skipped': 'dim',
'backoff': 'magenta',
}.get(result.status, 'dim')
rprint(f'[{status_color}]{result.status:10}[/{status_color}] {result.plugin:15} [dim]{result.id}[/dim] {result.snapshot.url[:40]}')
else:
write_record(result.to_json())
count += 1
rprint(f'[dim]Listed {count} archive results[/dim]', file=sys.stderr)
return 0
# =============================================================================
# UPDATE
# =============================================================================
def update_archiveresults(
status: Optional[str] = None,
) -> int:
"""
Update ArchiveResults from stdin JSONL.
Reads ArchiveResult records from stdin and applies updates.
Uses PATCH semantics - only specified fields are updated.
Exit codes:
0: Success
1: No input or error
"""
from django.utils import timezone
from archivebox.misc.jsonl import read_stdin, write_record
from archivebox.core.models import ArchiveResult
is_tty = sys.stdout.isatty()
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
updated_count = 0
for record in records:
result_id = record.get('id')
if not result_id:
continue
try:
result = ArchiveResult.objects.get(id=result_id)
# Apply updates from CLI flags
if status:
result.status = status
result.retry_at = timezone.now()
result.save()
updated_count += 1
if not is_tty:
write_record(result.to_json())
except ArchiveResult.DoesNotExist:
rprint(f'[yellow]ArchiveResult not found: {result_id}[/yellow]', file=sys.stderr)
continue
rprint(f'[green]Updated {updated_count} archive results[/green]', file=sys.stderr)
return 0
# =============================================================================
# DELETE
# =============================================================================
def delete_archiveresults(yes: bool = False, dry_run: bool = False) -> int:
"""
Delete ArchiveResults from stdin JSONL.
Requires --yes flag to confirm deletion.
Exit codes:
0: Success
1: No input or missing --yes flag
"""
from archivebox.misc.jsonl import read_stdin
from archivebox.core.models import ArchiveResult
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
result_ids = [r.get('id') for r in records if r.get('id')]
if not result_ids:
rprint('[yellow]No valid archive result IDs in input[/yellow]', file=sys.stderr)
return 1
results = ArchiveResult.objects.filter(id__in=result_ids)
count = results.count()
if count == 0:
rprint('[yellow]No matching archive results found[/yellow]', file=sys.stderr)
return 0
if dry_run:
rprint(f'[yellow]Would delete {count} archive results (dry run)[/yellow]', file=sys.stderr)
for result in results[:10]:
rprint(f' [dim]{result.id}[/dim] {result.plugin} {result.snapshot.url[:40]}', file=sys.stderr)
if count > 10:
rprint(f' ... and {count - 10} more', file=sys.stderr)
return 0
if not yes:
rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
return 1
# Perform deletion
deleted_count, _ = results.delete()
rprint(f'[green]Deleted {deleted_count} archive results[/green]', file=sys.stderr)
return 0
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage ArchiveResult records (plugin extraction results)."""
pass
@main.command('create')
@click.option('--snapshot-id', help='Snapshot ID to create results for')
@click.option('--plugin', '-p', help='Plugin name (e.g., screenshot, singlefile)')
@click.option('--status', '-s', default='queued', help='Initial status (default: queued)')
def create_cmd(snapshot_id: Optional[str], plugin: Optional[str], status: str):
"""Create ArchiveResults for Snapshots from stdin JSONL."""
sys.exit(create_archiveresults(snapshot_id=snapshot_id, plugin=plugin, status=status))
@main.command('list')
@click.option('--status', '-s', help='Filter by status (queued, started, succeeded, failed, skipped)')
@click.option('--plugin', '-p', help='Filter by plugin name')
@click.option('--snapshot-id', help='Filter by snapshot ID')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(status: Optional[str], plugin: Optional[str],
snapshot_id: Optional[str], limit: Optional[int]):
"""List ArchiveResults as JSONL."""
sys.exit(list_archiveresults(
status=status,
plugin=plugin,
snapshot_id=snapshot_id,
limit=limit,
))
@main.command('update')
@click.option('--status', '-s', help='Set status')
def update_cmd(status: Optional[str]):
"""Update ArchiveResults from stdin JSONL."""
sys.exit(update_archiveresults(status=status))
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
"""Delete ArchiveResults from stdin JSONL."""
sys.exit(delete_archiveresults(yes=yes, dry_run=dry_run))
if __name__ == '__main__':
main()
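Every `list` action in these new commands routes its flags through apply_filters() from archivebox.cli.cli_utils, which is not included in this diff. A minimal sketch of the assumed behavior, inferred from the call sites above: unset filters are dropped, the rest are passed through as Django ORM lookups, and limit is applied as a slice.

from typing import Optional

def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
    # Drop filters the caller left unset so they do not constrain the query
    active = {key: value for key, value in filter_kwargs.items() if value is not None}
    if active:
        # Keys are used directly as ORM lookups, e.g. status='failed', plugin='screenshot'
        queryset = queryset.filter(**active)
    if limit:
        queryset = queryset[:limit]  # slicing translates to a SQL LIMIT
    return queryset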

View File

@@ -0,0 +1,290 @@
#!/usr/bin/env python3
"""
archivebox binary <action> [args...] [--filters]
Manage Binary records (detected executables like chrome, wget, etc.).
Actions:
create - Create/register a Binary
list - List Binaries as JSONL (with optional filters)
update - Update Binaries from stdin JSONL
delete - Delete Binaries from stdin JSONL
Examples:
# List all binaries
archivebox binary list
# List specific binary
archivebox binary list --name=chrome
# List binaries with specific version
archivebox binary list --version__icontains=120
# Delete old binary entries
archivebox binary list --name=chrome | archivebox binary delete --yes
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox binary'
import sys
from typing import Optional
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
# =============================================================================
# CREATE
# =============================================================================
def create_binary(
name: str,
abspath: str,
version: str = '',
) -> int:
"""
Create/register a Binary.
Exit codes:
0: Success
1: Failure
"""
from archivebox.misc.jsonl import write_record
from archivebox.machine.models import Binary
is_tty = sys.stdout.isatty()
if not name or not abspath:
rprint('[red]Both --name and --abspath are required[/red]', file=sys.stderr)
return 1
try:
binary, created = Binary.objects.get_or_create(
name=name,
abspath=abspath,
defaults={'version': version}
)
if not is_tty:
write_record(binary.to_json())
if created:
rprint(f'[green]Created binary: {name} at {abspath}[/green]', file=sys.stderr)
else:
rprint(f'[dim]Binary already exists: {name} at {abspath}[/dim]', file=sys.stderr)
return 0
except Exception as e:
rprint(f'[red]Error creating binary: {e}[/red]', file=sys.stderr)
return 1
# =============================================================================
# LIST
# =============================================================================
def list_binaries(
name: Optional[str] = None,
abspath__icontains: Optional[str] = None,
version__icontains: Optional[str] = None,
limit: Optional[int] = None,
) -> int:
"""
List Binaries as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.machine.models import Binary
is_tty = sys.stdout.isatty()
queryset = Binary.objects.all().order_by('name', '-loaded_at')
# Apply filters
filter_kwargs = {
'name': name,
'abspath__icontains': abspath__icontains,
'version__icontains': version__icontains,
}
queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for binary in queryset:
if is_tty:
rprint(f'[cyan]{binary.name:20}[/cyan] [dim]{binary.version:15}[/dim] {binary.abspath}')
else:
write_record(binary.to_json())
count += 1
rprint(f'[dim]Listed {count} binaries[/dim]', file=sys.stderr)
return 0
# =============================================================================
# UPDATE
# =============================================================================
def update_binaries(
version: Optional[str] = None,
abspath: Optional[str] = None,
) -> int:
"""
Update Binaries from stdin JSONL.
Reads Binary records from stdin and applies updates.
Uses PATCH semantics - only specified fields are updated.
Exit codes:
0: Success
1: No input or error
"""
from archivebox.misc.jsonl import read_stdin, write_record
from archivebox.machine.models import Binary
is_tty = sys.stdout.isatty()
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
updated_count = 0
for record in records:
binary_id = record.get('id')
if not binary_id:
continue
try:
binary = Binary.objects.get(id=binary_id)
# Apply updates from CLI flags
if version:
binary.version = version
if abspath:
binary.abspath = abspath
binary.save()
updated_count += 1
if not is_tty:
write_record(binary.to_json())
except Binary.DoesNotExist:
rprint(f'[yellow]Binary not found: {binary_id}[/yellow]', file=sys.stderr)
continue
rprint(f'[green]Updated {updated_count} binaries[/green]', file=sys.stderr)
return 0
# =============================================================================
# DELETE
# =============================================================================
def delete_binaries(yes: bool = False, dry_run: bool = False) -> int:
"""
Delete Binaries from stdin JSONL.
Requires --yes flag to confirm deletion.
Exit codes:
0: Success
1: No input or missing --yes flag
"""
from archivebox.misc.jsonl import read_stdin
from archivebox.machine.models import Binary
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
binary_ids = [r.get('id') for r in records if r.get('id')]
if not binary_ids:
rprint('[yellow]No valid binary IDs in input[/yellow]', file=sys.stderr)
return 1
binaries = Binary.objects.filter(id__in=binary_ids)
count = binaries.count()
if count == 0:
rprint('[yellow]No matching binaries found[/yellow]', file=sys.stderr)
return 0
if dry_run:
rprint(f'[yellow]Would delete {count} binaries (dry run)[/yellow]', file=sys.stderr)
for binary in binaries:
rprint(f' {binary.name} {binary.abspath}', file=sys.stderr)
return 0
if not yes:
rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
return 1
# Perform deletion
deleted_count, _ = binaries.delete()
rprint(f'[green]Deleted {deleted_count} binaries[/green]', file=sys.stderr)
return 0
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage Binary records (detected executables)."""
pass
@main.command('create')
@click.option('--name', '-n', required=True, help='Binary name (e.g., chrome, wget)')
@click.option('--abspath', '-p', required=True, help='Absolute path to binary')
@click.option('--version', '-v', default='', help='Binary version')
def create_cmd(name: str, abspath: str, version: str):
"""Create/register a Binary."""
sys.exit(create_binary(name=name, abspath=abspath, version=version))
@main.command('list')
@click.option('--name', '-n', help='Filter by name')
@click.option('--abspath__icontains', help='Filter by path contains')
@click.option('--version__icontains', help='Filter by version contains')
@click.option('--limit', type=int, help='Limit number of results')
def list_cmd(name: Optional[str], abspath__icontains: Optional[str],
version__icontains: Optional[str], limit: Optional[int]):
"""List Binaries as JSONL."""
sys.exit(list_binaries(
name=name,
abspath__icontains=abspath__icontains,
version__icontains=version__icontains,
limit=limit,
))
@main.command('update')
@click.option('--version', '-v', help='Set version')
@click.option('--abspath', '-p', help='Set path')
def update_cmd(version: Optional[str], abspath: Optional[str]):
"""Update Binaries from stdin JSONL."""
sys.exit(update_binaries(version=version, abspath=abspath))
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
"""Delete Binaries from stdin JSONL."""
sys.exit(delete_binaries(yes=yes, dry_run=dry_run))
if __name__ == '__main__':
main()
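All of these subcommands share the same pipe-friendly I/O contract via archivebox.misc.jsonl, which is also not part of this diff. The sketch below shows one plausible shape for those helpers, inferred from how the commands call them; the TYPE_* values and the handling of bare-URL lines are assumptions, not the real implementation.

import json
import sys
from typing import Iterable, Iterator

TYPE_CRAWL = 'Crawl'                # assumed values for the record-type constants
TYPE_SNAPSHOT = 'Snapshot'
TYPE_ARCHIVERESULT = 'ArchiveResult'

def write_record(record: dict) -> None:
    # One JSON object per line so commands can be chained with shell pipes
    sys.stdout.write(json.dumps(record, default=str) + '\n')

def read_stdin() -> Iterator[dict]:
    # Accept JSONL records as well as bare values (e.g. plain URLs), one per line
    for line in sys.stdin:
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            record = {'url': line}
        yield record if isinstance(record, dict) else {'url': line}

def read_args_or_stdin(args: Iterable[str]) -> Iterator[dict]:
    # Positional arguments take precedence; otherwise fall back to piped stdin
    if args:
        for arg in args:
            yield {'url': arg}
    elif not sys.stdin.isatty():
        yield from read_stdin()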

View File

@@ -1,108 +1,153 @@
#!/usr/bin/env python3
"""
archivebox crawl [urls...] [--depth=N] [--tag=TAG]
archivebox crawl <action> [args...] [--filters]
Create Crawl jobs from URLs. Accepts URLs as arguments, from stdin, or via JSONL.
Does NOT immediately start the crawl - pipe to `archivebox snapshot` to process.
Manage Crawl records.
Input formats:
- Plain URLs (one per line)
- JSONL: {"url": "...", "depth": 1, "tags": "..."}
Output (JSONL):
{"type": "Crawl", "id": "...", "urls": "...", "status": "queued", ...}
Actions:
create - Create Crawl jobs from URLs
list - List Crawls as JSONL (with optional filters)
update - Update Crawls from stdin JSONL
delete - Delete Crawls from stdin JSONL
Examples:
# Create a crawl job
archivebox crawl https://example.com
# Create
archivebox crawl create https://example.com https://foo.com --depth=1
archivebox crawl create --tag=news https://example.com
# Create crawl with depth
archivebox crawl --depth=1 https://example.com
# List with filters
archivebox crawl list --status=queued
archivebox crawl list --urls__icontains=example.com
# Full pipeline: create crawl, create snapshots, run extractors
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
# Update
archivebox crawl list --status=started | archivebox crawl update --status=queued
# Process existing Crawl by ID (runs the crawl state machine)
archivebox crawl 01234567-89ab-cdef-0123-456789abcdef
# Delete
archivebox crawl list --urls__icontains=spam.com | archivebox crawl delete --yes
# Full pipeline
archivebox crawl create https://example.com | archivebox snapshot create | archivebox run
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox crawl'
import sys
from typing import Optional
from typing import Optional, Iterable
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
def create_crawls(
records: list,
# =============================================================================
# CREATE
# =============================================================================
def create_crawl(
urls: Iterable[str],
depth: int = 0,
tag: str = '',
status: str = 'queued',
created_by_id: Optional[int] = None,
) -> int:
"""
Create a single Crawl job from all input URLs.
Create a Crawl job from URLs.
Takes pre-read records, creates one Crawl with all URLs, outputs JSONL.
Does NOT start the crawl - just creates the job in QUEUED state.
Takes URLs as args or stdin, creates one Crawl with all URLs, outputs JSONL.
Pass-through: Records that are not URLs are output unchanged (for piping).
Exit codes:
0: Success
1: Failure
"""
from rich import print as rprint
from archivebox.misc.jsonl import write_record
from archivebox.misc.jsonl import read_args_or_stdin, write_record, TYPE_CRAWL
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.crawls.models import Crawl
created_by_id = created_by_id or get_or_create_system_user_pk()
is_tty = sys.stdout.isatty()
# Collect all input records
records = list(read_args_or_stdin(urls))
if not records:
rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# Collect all URLs into a single newline-separated string
urls = []
# Separate pass-through records from URL records
url_list = []
pass_through_records = []
for record in records:
record_type = record.get('type', '')
# Pass-through: output records that aren't URL/Crawl types
if record_type and record_type != TYPE_CRAWL and not record.get('url') and not record.get('urls'):
pass_through_records.append(record)
continue
# Handle existing Crawl records (just pass through with id)
if record_type == TYPE_CRAWL and record.get('id'):
pass_through_records.append(record)
continue
# Collect URLs
url = record.get('url')
if url:
urls.append(url)
url_list.append(url)
if not urls:
# Handle 'urls' field (newline-separated)
urls_field = record.get('urls')
if urls_field:
for line in urls_field.split('\n'):
line = line.strip()
if line and not line.startswith('#'):
url_list.append(line)
# Output pass-through records first
if not is_tty:
for record in pass_through_records:
write_record(record)
if not url_list:
if pass_through_records:
# If we had pass-through records but no URLs, that's OK
rprint(f'[dim]Passed through {len(pass_through_records)} records, no new URLs[/dim]', file=sys.stderr)
return 0
rprint('[red]No valid URLs found[/red]', file=sys.stderr)
return 1
try:
# Build crawl record with all URLs as newline-separated string
crawl_record = {
'urls': '\n'.join(urls),
'urls': '\n'.join(url_list),
'max_depth': depth,
'tags_str': tag,
'status': status,
'label': '',
}
crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id})
crawl = Crawl.from_json(crawl_record, overrides={'created_by_id': created_by_id})
if not crawl:
rprint('[red]Failed to create crawl[/red]', file=sys.stderr)
return 1
# Output JSONL record (only when piped)
if not is_tty:
write_record(crawl.to_jsonl())
write_record(crawl.to_json())
rprint(f'[green]Created crawl with {len(urls)} URLs[/green]', file=sys.stderr)
rprint(f'[green]Created crawl with {len(url_list)} URLs[/green]', file=sys.stderr)
# If TTY, show human-readable output
if is_tty:
rprint(f' [dim]{crawl.id}[/dim]', file=sys.stderr)
for url in urls[:5]: # Show first 5 URLs
for url in url_list[:5]: # Show first 5 URLs
rprint(f' {url[:70]}', file=sys.stderr)
if len(urls) > 5:
rprint(f' ... and {len(urls) - 5} more', file=sys.stderr)
if len(url_list) > 5:
rprint(f' ... and {len(url_list) - 5} more', file=sys.stderr)
return 0
@@ -111,81 +156,217 @@ def create_crawls(
return 1
def process_crawl_by_id(crawl_id: str) -> int:
"""
Process a single Crawl by ID (used by workers).
# =============================================================================
# LIST
# =============================================================================
Triggers the Crawl's state machine tick() which will:
- Transition from queued -> started (creates root snapshot)
- Transition from started -> sealed (when all snapshots done)
def list_crawls(
status: Optional[str] = None,
urls__icontains: Optional[str] = None,
max_depth: Optional[int] = None,
limit: Optional[int] = None,
) -> int:
"""
from rich import print as rprint
List Crawls as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.crawls.models import Crawl
try:
crawl = Crawl.objects.get(id=crawl_id)
except Crawl.DoesNotExist:
rprint(f'[red]Crawl {crawl_id} not found[/red]', file=sys.stderr)
return 1
is_tty = sys.stdout.isatty()
rprint(f'[blue]Processing Crawl {crawl.id} (status={crawl.status})[/blue]', file=sys.stderr)
queryset = Crawl.objects.all().order_by('-created_at')
try:
crawl.sm.tick()
crawl.refresh_from_db()
rprint(f'[green]Crawl complete (status={crawl.status})[/green]', file=sys.stderr)
return 0
except Exception as e:
rprint(f'[red]Crawl error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
# Apply filters
filter_kwargs = {
'status': status,
'urls__icontains': urls__icontains,
'max_depth': max_depth,
}
queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for crawl in queryset:
if is_tty:
status_color = {
'queued': 'yellow',
'started': 'blue',
'sealed': 'green',
}.get(crawl.status, 'dim')
url_preview = crawl.urls[:50].replace('\n', ' ')
rprint(f'[{status_color}]{crawl.status:8}[/{status_color}] [dim]{crawl.id}[/dim] {url_preview}...')
else:
write_record(crawl.to_json())
count += 1
rprint(f'[dim]Listed {count} crawls[/dim]', file=sys.stderr)
return 0
def is_crawl_id(value: str) -> bool:
"""Check if value looks like a Crawl UUID."""
import re
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
if not uuid_pattern.match(value):
return False
# Verify it's actually a Crawl (not a Snapshot or other object)
# =============================================================================
# UPDATE
# =============================================================================
def update_crawls(
status: Optional[str] = None,
max_depth: Optional[int] = None,
) -> int:
"""
Update Crawls from stdin JSONL.
Reads Crawl records from stdin and applies updates.
Uses PATCH semantics - only specified fields are updated.
Exit codes:
0: Success
1: No input or error
"""
from django.utils import timezone
from archivebox.misc.jsonl import read_stdin, write_record
from archivebox.crawls.models import Crawl
return Crawl.objects.filter(id=value).exists()
is_tty = sys.stdout.isatty()
@click.command()
@click.option('--depth', '-d', type=int, default=0, help='Max depth for recursive crawling (default: 0, no recursion)')
@click.option('--tag', '-t', default='', help='Comma-separated tags to add to snapshots')
@click.argument('args', nargs=-1)
def main(depth: int, tag: str, args: tuple):
"""Create Crawl jobs from URLs, or process existing Crawls by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# Read all input
records = list(read_args_or_stdin(args))
records = list(read_stdin())
if not records:
from rich import print as rprint
rprint('[yellow]No URLs or Crawl IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
# Check if input looks like existing Crawl IDs to process
# If ALL inputs are Crawl UUIDs, process them
all_are_crawl_ids = all(
is_crawl_id(r.get('id') or r.get('url', ''))
for r in records
)
updated_count = 0
for record in records:
crawl_id = record.get('id')
if not crawl_id:
continue
if all_are_crawl_ids:
# Process existing Crawls by ID
exit_code = 0
for record in records:
crawl_id = record.get('id') or record.get('url')
result = process_crawl_by_id(crawl_id)
if result != 0:
exit_code = result
sys.exit(exit_code)
else:
# Default behavior: create Crawl jobs from URLs
sys.exit(create_crawls(records, depth=depth, tag=tag))
try:
crawl = Crawl.objects.get(id=crawl_id)
# Apply updates from CLI flags
if status:
crawl.status = status
crawl.retry_at = timezone.now()
if max_depth is not None:
crawl.max_depth = max_depth
crawl.save()
updated_count += 1
if not is_tty:
write_record(crawl.to_json())
except Crawl.DoesNotExist:
rprint(f'[yellow]Crawl not found: {crawl_id}[/yellow]', file=sys.stderr)
continue
rprint(f'[green]Updated {updated_count} crawls[/green]', file=sys.stderr)
return 0
# =============================================================================
# DELETE
# =============================================================================
def delete_crawls(yes: bool = False, dry_run: bool = False) -> int:
"""
Delete Crawls from stdin JSONL.
Requires --yes flag to confirm deletion.
Exit codes:
0: Success
1: No input or missing --yes flag
"""
from archivebox.misc.jsonl import read_stdin
from archivebox.crawls.models import Crawl
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
crawl_ids = [r.get('id') for r in records if r.get('id')]
if not crawl_ids:
rprint('[yellow]No valid crawl IDs in input[/yellow]', file=sys.stderr)
return 1
crawls = Crawl.objects.filter(id__in=crawl_ids)
count = crawls.count()
if count == 0:
rprint('[yellow]No matching crawls found[/yellow]', file=sys.stderr)
return 0
if dry_run:
rprint(f'[yellow]Would delete {count} crawls (dry run)[/yellow]', file=sys.stderr)
for crawl in crawls:
url_preview = crawl.urls[:50].replace('\n', ' ')
rprint(f' [dim]{crawl.id}[/dim] {url_preview}...', file=sys.stderr)
return 0
if not yes:
rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
return 1
# Perform deletion
deleted_count, _ = crawls.delete()
rprint(f'[green]Deleted {deleted_count} crawls[/green]', file=sys.stderr)
return 0
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage Crawl records."""
pass
@main.command('create')
@click.argument('urls', nargs=-1)
@click.option('--depth', '-d', type=int, default=0, help='Max crawl depth (default: 0)')
@click.option('--tag', '-t', default='', help='Comma-separated tags to add')
@click.option('--status', '-s', default='queued', help='Initial status (default: queued)')
def create_cmd(urls: tuple, depth: int, tag: str, status: str):
"""Create a Crawl job from URLs or stdin."""
sys.exit(create_crawl(urls, depth=depth, tag=tag, status=status))
@main.command('list')
@click.option('--status', '-s', help='Filter by status (queued, started, sealed)')
@click.option('--urls__icontains', help='Filter by URLs contains')
@click.option('--max-depth', type=int, help='Filter by max depth')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(status: Optional[str], urls__icontains: Optional[str],
max_depth: Optional[int], limit: Optional[int]):
"""List Crawls as JSONL."""
sys.exit(list_crawls(
status=status,
urls__icontains=urls__icontains,
max_depth=max_depth,
limit=limit,
))
@main.command('update')
@click.option('--status', '-s', help='Set status')
@click.option('--max-depth', type=int, help='Set max depth')
def update_cmd(status: Optional[str], max_depth: Optional[int]):
"""Update Crawls from stdin JSONL."""
sys.exit(update_crawls(status=status, max_depth=max_depth))
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
"""Delete Crawls from stdin JSONL."""
sys.exit(delete_crawls(yes=yes, dry_run=dry_run))
if __name__ == '__main__':

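process_crawl_by_id() above relies on the Crawl state machine's tick() to move a record from queued to started to sealed, the same cycle shown in the pluginmap diagrams later in this diff. The class below is not the real CrawlMachine; it is a compact restatement of that cycle, assuming Crawl exposes run(), a snapshot_set reverse relation, and plain string status values.

class CrawlTicker:
    """Simplified stand-in for the queued -> started -> sealed cycle described above."""

    def __init__(self, crawl):
        self.crawl = crawl

    def tick(self):
        if self.crawl.status == 'queued' and self.can_start():
            # queued -> started: run the crawl, which discovers and fires on_Crawl__* hooks
            self.crawl.status = 'started'
            self.crawl.run()
        elif self.crawl.status == 'started' and self.is_finished():
            # started -> sealed: cleanup fires the on_CrawlEnd__* hooks
            self.crawl.status = 'sealed'
            self.cleanup()
        self.crawl.save()

    def can_start(self) -> bool:
        return bool(self.crawl.urls)

    def is_finished(self) -> bool:
        # Done once every child Snapshot has reached its final state
        return not self.crawl.snapshot_set.exclude(status='sealed').exists()

    def cleanup(self):
        pass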
View File

@@ -127,7 +127,7 @@ def init(force: bool=False, quick: bool=False, install: bool=False) -> None:
if pending_links:
for link_dict in pending_links.values():
Snapshot.from_jsonl(link_dict)
Snapshot.from_json(link_dict)
# Hint for orphaned snapshot directories
print()

View File

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""
archivebox machine <action> [--filters]
Manage Machine records (system-managed, mostly read-only).
Machine records track the host machines where ArchiveBox runs.
They are created automatically by the system and are primarily for debugging.
Actions:
list - List Machines as JSONL (with optional filters)
Examples:
# List all machines
archivebox machine list
# List machines by hostname
archivebox machine list --hostname__icontains=myserver
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox machine'
import sys
from typing import Optional
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
# =============================================================================
# LIST
# =============================================================================
def list_machines(
hostname__icontains: Optional[str] = None,
os_platform: Optional[str] = None,
limit: Optional[int] = None,
) -> int:
"""
List Machines as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.machine.models import Machine
is_tty = sys.stdout.isatty()
queryset = Machine.objects.all().order_by('-created_at')
# Apply filters
filter_kwargs = {
'hostname__icontains': hostname__icontains,
'os_platform': os_platform,
}
queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for machine in queryset:
if is_tty:
rprint(f'[cyan]{machine.hostname:30}[/cyan] [dim]{machine.os_platform:10}[/dim] {machine.id}')
else:
write_record(machine.to_json())
count += 1
rprint(f'[dim]Listed {count} machines[/dim]', file=sys.stderr)
return 0
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage Machine records (read-only, system-managed)."""
pass
@main.command('list')
@click.option('--hostname__icontains', help='Filter by hostname contains')
@click.option('--os-platform', help='Filter by OS platform')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(hostname__icontains: Optional[str], os_platform: Optional[str], limit: Optional[int]):
"""List Machines as JSONL."""
sys.exit(list_machines(
hostname__icontains=hostname__icontains,
os_platform=os_platform,
limit=limit,
))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,356 @@
#!/usr/bin/env python3
__package__ = 'archivebox.cli'
from typing import Optional
from pathlib import Path
import rich_click as click
from archivebox.misc.util import docstring, enforce_types
# State Machine ASCII Art Diagrams
CRAWL_MACHINE_DIAGRAM = """
┌─────────────────────────────────────────────────────────────────────────────┐
│ CrawlMachine │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ QUEUED │◄────────────────┐ │
│ │ (initial) │ │ │
│ └──────┬──────┘ │ │
│ │ │ tick() unless can_start() │
│ │ tick() when │ │
│ │ can_start() │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ STARTED │─────────────────┘ │
│ │ │◄────────────────┐ │
│ │ enter: │ │ │
│ │ crawl.run()│ │ tick() unless is_finished() │
│ │ (discover │ │ │
│ │ Crawl │─────────────────┘ │
│ │ hooks) │ │
│ └──────┬──────┘ │
│ │ │
│ │ tick() when is_finished() │
│ ▼ │
│ ┌─────────────┐ │
│ │ SEALED │ │
│ │ (final) │ │
│ │ │ │
│ │ enter: │ │
│ │ cleanup() │ │
│ └─────────────┘ │
│ │
│ Hooks triggered: on_Crawl__* (during STARTED.enter via crawl.run()) │
│ on_CrawlEnd__* (during SEALED.enter via cleanup()) │
└─────────────────────────────────────────────────────────────────────────────┘
"""
SNAPSHOT_MACHINE_DIAGRAM = """
┌─────────────────────────────────────────────────────────────────────────────┐
│ SnapshotMachine │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ QUEUED │◄────────────────┐ │
│ │ (initial) │ │ │
│ └──────┬──────┘ │ │
│ │ │ tick() unless can_start() │
│ │ tick() when │ │
│ │ can_start() │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ STARTED │─────────────────┘ │
│ │ │◄────────────────┐ │
│ │ enter: │ │ │
│ │ snapshot │ │ tick() unless is_finished() │
│ │ .run() │ │ │
│ │ (discover │─────────────────┘ │
│ │ Snapshot │ │
│ │ hooks, │ │
│ │ create │ │
│ │ pending │ │
│ │ results) │ │
│ └──────┬──────┘ │
│ │ │
│ │ tick() when is_finished() │
│ ▼ │
│ ┌─────────────┐ │
│ │ SEALED │ │
│ │ (final) │ │
│ │ │ │
│ │ enter: │ │
│ │ cleanup() │ │
│ └─────────────┘ │
│ │
│ Hooks triggered: on_Snapshot__* (creates ArchiveResults in STARTED.enter) │
└─────────────────────────────────────────────────────────────────────────────┘
"""
ARCHIVERESULT_MACHINE_DIAGRAM = """
┌─────────────────────────────────────────────────────────────────────────────┐
│ ArchiveResultMachine │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ QUEUED │◄────────────────┐ │
│ │ (initial) │ │ │
│ └──────┬──────┘ │ │
│ │ │ tick() unless can_start() │
│ │ tick() when │ │
│ │ can_start() │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ STARTED │─────────────────┘ │
│ │ │◄────────────────┐ │
│ │ enter: │ │ tick() unless is_finished() │
│ │ result.run()│─────────────────┘ │
│ │ (execute │ │
│ │ hook via │ │
│ │ run_hook())│ │
│ └──────┬──────┘ │
│ │ │
│ │ tick() checks status set by hook output │
│ ├────────────────┬────────────────┬────────────────┐ │
│ ▼ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ SUCCEEDED │ │ FAILED │ │ SKIPPED │ │ BACKOFF │ │
│ │ (final) │ │ (final) │ │ (final) │ │ │ │
│ └───────────┘ └───────────┘ └───────────┘ └─────┬─────┘ │
│ │ │
│ can_start()───┘ │
│ loops back to STARTED │
│ │
│ Each ArchiveResult runs ONE specific hook (stored in .hook_name field) │
└─────────────────────────────────────────────────────────────────────────────┘
"""
BINARY_MACHINE_DIAGRAM = """
┌─────────────────────────────────────────────────────────────────────────────┐
│ BinaryMachine │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ QUEUED │◄────────────────┐ │
│ │ (initial) │ │ │
│ └──────┬──────┘ │ │
│ │ │ tick() unless can_start() │
│ │ tick() when │ │
│ │ can_start() │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ STARTED │─────────────────┘ │
│ │ │◄────────────────┐ │
│ │ enter: │ │ │
│ │ binary.run()│ │ tick() unless is_finished() │
│ │ (discover │─────────────────┘ │
│ │ Binary │ │
│ │ hooks, │ │
│ │ try each │ │
│ │ provider) │ │
│ └──────┬──────┘ │
│ │ │
│ │ tick() checks status set by hook output │
│ ├────────────────────────────────┐ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ SUCCEEDED │ │ FAILED │ │
│ │ (final) │ │ (final) │ │
│ │ │ │ │ │
│ │ abspath, │ │ no provider │ │
│ │ version set │ │ succeeded │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ Hooks triggered: on_Binary__* (provider hooks during STARTED.enter) │
│ Providers tried in sequence until one succeeds: apt, brew, pip, npm, etc. │
└─────────────────────────────────────────────────────────────────────────────┘
"""
@enforce_types
def pluginmap(
show_disabled: bool = False,
model: Optional[str] = None,
quiet: bool = False,
) -> dict:
"""
Show a map of all state machines and their associated plugin hooks.
Displays ASCII art diagrams of the core model state machines (Crawl, Snapshot,
ArchiveResult, Binary) and lists all auto-detected on_{Model}__* hooks
that will run for each model's transitions.
"""
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich import box
from archivebox.hooks import (
discover_hooks,
extract_step,
is_background_hook,
BUILTIN_PLUGINS_DIR,
USER_PLUGINS_DIR,
)
console = Console()
prnt = console.print
# Model event types that can have hooks
model_events = {
'Crawl': {
'description': 'Hooks run when a Crawl starts (QUEUED→STARTED)',
'machine': 'CrawlMachine',
'diagram': CRAWL_MACHINE_DIAGRAM,
},
'CrawlEnd': {
'description': 'Hooks run when a Crawl finishes (STARTED→SEALED)',
'machine': 'CrawlMachine',
'diagram': None, # Part of CrawlMachine
},
'Snapshot': {
'description': 'Hooks run for each Snapshot (creates ArchiveResults)',
'machine': 'SnapshotMachine',
'diagram': SNAPSHOT_MACHINE_DIAGRAM,
},
'Binary': {
'description': 'Hooks for installing binary dependencies (providers)',
'machine': 'BinaryMachine',
'diagram': BINARY_MACHINE_DIAGRAM,
},
}
# Filter to specific model if requested
if model:
model = model.title()
if model not in model_events:
prnt(f'[red]Error: Unknown model "{model}". Available: {", ".join(model_events.keys())}[/red]')
return {}
model_events = {model: model_events[model]}
result = {
'models': {},
'plugins_dir': str(BUILTIN_PLUGINS_DIR),
'user_plugins_dir': str(USER_PLUGINS_DIR),
}
if not quiet:
prnt()
prnt('[bold cyan]ArchiveBox Plugin Map[/bold cyan]')
prnt(f'[dim]Built-in plugins: {BUILTIN_PLUGINS_DIR}[/dim]')
prnt(f'[dim]User plugins: {USER_PLUGINS_DIR}[/dim]')
prnt()
# Show diagrams first (unless quiet mode)
if not quiet:
# Show ArchiveResult diagram separately since it's different
prnt(Panel(
ARCHIVERESULT_MACHINE_DIAGRAM,
title='[bold green]ArchiveResultMachine[/bold green]',
border_style='green',
expand=False,
))
prnt()
for event_name, info in model_events.items():
# Discover hooks for this event
hooks = discover_hooks(event_name, filter_disabled=not show_disabled)
# Build hook info list
hook_infos = []
for hook_path in hooks:
# Get plugin name from parent directory (e.g., 'wget' from 'plugins/wget/on_Snapshot__61_wget.py')
plugin_name = hook_path.parent.name
step = extract_step(hook_path.name)
is_bg = is_background_hook(hook_path.name)
hook_infos.append({
'path': str(hook_path),
'name': hook_path.name,
'plugin': plugin_name,
'step': step,
'is_background': is_bg,
'extension': hook_path.suffix,
})
result['models'][event_name] = {
'description': info['description'],
'machine': info['machine'],
'hooks': hook_infos,
'hook_count': len(hook_infos),
}
if not quiet:
# Show diagram if this model has one
if info.get('diagram'):
prnt(Panel(
info['diagram'],
title=f'[bold green]{info["machine"]}[/bold green]',
border_style='green',
expand=False,
))
prnt()
# Create hooks table
table = Table(
title=f'[bold yellow]on_{event_name}__* Hooks[/bold yellow] ({len(hooks)} found)',
box=box.ROUNDED,
show_header=True,
header_style='bold magenta',
)
table.add_column('Step', justify='center', width=6)
table.add_column('Plugin', style='cyan', width=20)
table.add_column('Hook Name', style='green')
table.add_column('BG', justify='center', width=4)
table.add_column('Type', justify='center', width=5)
# Sort by step then by name
sorted_hooks = sorted(hook_infos, key=lambda h: (h['step'], h['name']))
for hook in sorted_hooks:
bg_marker = '[yellow]bg[/yellow]' if hook['is_background'] else ''
ext = hook['extension'].lstrip('.')
table.add_row(
str(hook['step']),
hook['plugin'],
hook['name'],
bg_marker,
ext,
)
prnt(table)
prnt()
prnt(f'[dim]{info["description"]}[/dim]')
prnt()
# Summary
if not quiet:
total_hooks = sum(m['hook_count'] for m in result['models'].values())
prnt(f'[bold]Total hooks discovered: {total_hooks}[/bold]')
prnt()
prnt('[dim]Hook naming convention: on_{Model}__{XX}_{description}[.bg].{ext}[/dim]')
prnt('[dim] - XX: Two-digit order (first digit = step 0-9)[/dim]')
prnt('[dim] - .bg: Background hook (non-blocking)[/dim]')
prnt('[dim] - ext: py, sh, or js[/dim]')
prnt()
return result
@click.command()
@click.option('--show-disabled', '-a', is_flag=True, help='Show hooks from disabled plugins too')
@click.option('--model', '-m', type=str, default=None, help='Filter to specific model (Crawl, Snapshot, Binary, CrawlEnd)')
@click.option('--quiet', '-q', is_flag=True, help='Output JSON only, no ASCII diagrams')
@docstring(pluginmap.__doc__)
def main(**kwargs):
import json
result = pluginmap(**kwargs)
if kwargs.get('quiet'):
print(json.dumps(result, indent=2))
if __name__ == '__main__':
main()
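The pluginmap output ends with the hook naming convention on_{Model}__{XX}_{description}[.bg].{ext}. extract_step() and is_background_hook() are imported from archivebox.hooks and are not shown in this diff; the sketch below is one way they could be implemented to match that convention, named after those imports for illustration only.

import re

# Matches e.g. on_Snapshot__61_wget.py -> model=Snapshot, order=61, description=wget
HOOK_NAME_RE = re.compile(r'^on_(?P<model>[A-Za-z]+)__(?P<order>\d{2})_(?P<description>[^.]+)')

def extract_step(filename: str) -> int:
    # The first digit of the two-digit order prefix is the step (0-9)
    match = HOOK_NAME_RE.match(filename)
    return int(match.group('order')[0]) if match else 0

def is_background_hook(filename: str) -> bool:
    # A '.bg' segment before the extension marks a non-blocking hook
    return '.bg.' in filename

# extract_step('on_Snapshot__61_wget.py') == 6
# is_background_hook('on_Crawl__05_favicon.bg.js') == True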

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
archivebox process <action> [--filters]
Manage Process records (system-managed, mostly read-only).
Process records track executions of binaries during extraction.
They are created automatically by the system and are primarily for debugging.
Actions:
list - List Processes as JSONL (with optional filters)
Examples:
# List all processes
archivebox process list
# List processes by binary
archivebox process list --binary-name=chrome
# List recent processes
archivebox process list --limit=10
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox process'
import sys
from typing import Optional
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
# =============================================================================
# LIST
# =============================================================================
def list_processes(
binary_name: Optional[str] = None,
machine_id: Optional[str] = None,
limit: Optional[int] = None,
) -> int:
"""
List Processes as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.machine.models import Process
is_tty = sys.stdout.isatty()
queryset = Process.objects.all().select_related('binary', 'machine').order_by('-start_ts')
# Apply filters
filter_kwargs = {}
if binary_name:
filter_kwargs['binary__name'] = binary_name
if machine_id:
filter_kwargs['machine_id'] = machine_id
queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for process in queryset:
if is_tty:
binary_name_str = process.binary.name if process.binary else 'unknown'
exit_code = process.returncode if process.returncode is not None else '?'
status_color = 'green' if process.returncode == 0 else 'red' if process.returncode else 'yellow'
rprint(f'[{status_color}]exit={exit_code:3}[/{status_color}] [cyan]{binary_name_str:15}[/cyan] [dim]{process.id}[/dim]')
else:
write_record(process.to_json())
count += 1
rprint(f'[dim]Listed {count} processes[/dim]', file=sys.stderr)
return 0
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage Process records (read-only, system-managed)."""
pass
@main.command('list')
@click.option('--binary-name', '-b', help='Filter by binary name')
@click.option('--machine-id', '-m', help='Filter by machine ID')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(binary_name: Optional[str], machine_id: Optional[str], limit: Optional[int]):
"""List Processes as JSONL."""
sys.exit(list_processes(
binary_name=binary_name,
machine_id=machine_id,
limit=limit,
))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,207 @@
#!/usr/bin/env python3
"""
archivebox run [--daemon]
Unified command for processing queued work.
Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
Examples:
# Run orchestrator in foreground (replaces `archivebox orchestrator`)
archivebox run
# Run as daemon (don't exit on idle)
archivebox run --daemon
# Process specific records (pipe any JSONL type, exits when done)
archivebox snapshot list --status=queued | archivebox run
archivebox archiveresult list --status=failed | archivebox run
archivebox crawl list --status=queued | archivebox run
# Mixed types work too
cat mixed_records.jsonl | archivebox run
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox run'
import sys
import rich_click as click
from rich import print as rprint
def process_stdin_records() -> int:
"""
Process JSONL records from stdin.
Create-or-update behavior:
- Records WITHOUT id: Create via Model.from_json(), then queue
- Records WITH id: Lookup existing, re-queue for processing
Outputs JSONL of all processed records (for chaining).
Handles any record type: Crawl, Snapshot, ArchiveResult.
Auto-cascades: Crawl → Snapshots → ArchiveResults.
Returns exit code (0 = success, 1 = error).
"""
from django.utils import timezone
from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.core.models import Snapshot, ArchiveResult
from archivebox.crawls.models import Crawl
from archivebox.workers.orchestrator import Orchestrator
records = list(read_stdin())
is_tty = sys.stdout.isatty()
if not records:
return 0 # Nothing to process
created_by_id = get_or_create_system_user_pk()
queued_count = 0
output_records = []
for record in records:
record_type = record.get('type', '')
record_id = record.get('id')
try:
if record_type == TYPE_CRAWL:
if record_id:
# Existing crawl - re-queue
try:
crawl = Crawl.objects.get(id=record_id)
except Crawl.DoesNotExist:
crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
else:
# New crawl - create it
crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
if crawl:
crawl.retry_at = timezone.now()
if crawl.status not in [Crawl.StatusChoices.SEALED]:
crawl.status = Crawl.StatusChoices.QUEUED
crawl.save()
output_records.append(crawl.to_json())
queued_count += 1
elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
if record_id:
# Existing snapshot - re-queue
try:
snapshot = Snapshot.objects.get(id=record_id)
except Snapshot.DoesNotExist:
snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
else:
# New snapshot - create it
snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
if snapshot:
snapshot.retry_at = timezone.now()
if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
snapshot.status = Snapshot.StatusChoices.QUEUED
snapshot.save()
output_records.append(snapshot.to_json())
queued_count += 1
elif record_type == TYPE_ARCHIVERESULT:
if record_id:
# Existing archiveresult - re-queue
try:
archiveresult = ArchiveResult.objects.get(id=record_id)
except ArchiveResult.DoesNotExist:
archiveresult = ArchiveResult.from_json(record)
else:
# New archiveresult - create it
archiveresult = ArchiveResult.from_json(record)
if archiveresult:
archiveresult.retry_at = timezone.now()
if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
archiveresult.status = ArchiveResult.StatusChoices.QUEUED
archiveresult.save()
output_records.append(archiveresult.to_json())
queued_count += 1
else:
# Unknown type - pass through
output_records.append(record)
except Exception as e:
rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
continue
# Output all processed records (for chaining)
if not is_tty:
for rec in output_records:
write_record(rec)
if queued_count == 0:
rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
return 0
rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)
# Run orchestrator until all queued work is done
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
return 0
def run_orchestrator(daemon: bool = False) -> int:
"""
Run the orchestrator process.
The orchestrator:
1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
2. Spawns worker processes when there is work to do
3. Monitors worker health and restarts failed workers
4. Exits when all queues are empty (unless --daemon)
Args:
daemon: Run forever (don't exit when idle)
Returns exit code (0 = success, 1 = error).
"""
from archivebox.workers.orchestrator import Orchestrator
if Orchestrator.is_running():
rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
return 0
try:
orchestrator = Orchestrator(exit_on_idle=not daemon)
orchestrator.runloop()
return 0
except KeyboardInterrupt:
return 0
except Exception as e:
rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
def main(daemon: bool):
"""
Process queued work.
When stdin is piped: Process those specific records and exit.
When run standalone: Run orchestrator in foreground.
"""
# Check if stdin has data (non-TTY means piped input)
if not sys.stdin.isatty():
sys.exit(process_stdin_records())
else:
sys.exit(run_orchestrator(daemon=daemon))
if __name__ == '__main__':
main()
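process_stdin_records() applies the same create-or-update rule to every record type: look the object up when an id is present, otherwise build it with from_json(), then make it eligible for the next orchestrator pass. A condensed sketch of that rule, assuming from_json() accepts an overrides mapping the way the Crawl and Snapshot calls above do:

from typing import Optional
from django.utils import timezone

def queue_record(model_cls, record: dict, overrides: Optional[dict] = None):
    """Fetch the object when the record carries an id, otherwise create it, then mark it retryable."""
    obj = None
    if record.get('id'):
        obj = model_cls.objects.filter(id=record['id']).first()
    if obj is None:
        obj = model_cls.from_json(record, overrides=overrides or {})
    if obj is not None:
        obj.retry_at = timezone.now()  # picked up by the orchestrator on its next poll
        obj.save()
    return obj

# e.g. queue_record(Snapshot, {'url': 'https://example.com'}, {'created_by_id': created_by_id})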

View File

@@ -1,95 +1,63 @@
#!/usr/bin/env python3
"""
archivebox snapshot [urls_or_crawl_ids...] [--tag=TAG] [--plugins=NAMES]
archivebox snapshot <action> [args...] [--filters]
Create Snapshots from URLs or Crawl jobs. Accepts URLs, Crawl JSONL, or Crawl IDs.
Manage Snapshot records.
Input formats:
- Plain URLs (one per line)
- JSONL: {"type": "Crawl", "id": "...", "urls": "..."}
- JSONL: {"type": "Snapshot", "url": "...", "title": "...", "tags": "..."}
- Crawl UUIDs (one per line)
Output (JSONL):
{"type": "Snapshot", "id": "...", "url": "...", "status": "queued", ...}
Actions:
create - Create Snapshots from URLs or Crawl JSONL
list - List Snapshots as JSONL (with optional filters)
update - Update Snapshots from stdin JSONL
delete - Delete Snapshots from stdin JSONL
Examples:
# Create snapshots from URLs directly
archivebox snapshot https://example.com https://foo.com
# Create
archivebox snapshot create https://example.com --tag=news
archivebox crawl create https://example.com | archivebox snapshot create
# Pipe from crawl command
archivebox crawl https://example.com | archivebox snapshot
# List with filters
archivebox snapshot list --status=queued
archivebox snapshot list --url__icontains=example.com
# Chain with extract
archivebox crawl https://example.com | archivebox snapshot | archivebox extract
# Update
archivebox snapshot list --tag=old | archivebox snapshot update --tag=new
# Run specific plugins after creating snapshots
archivebox snapshot --plugins=screenshot,singlefile https://example.com
# Process existing Snapshot by ID
archivebox snapshot 01234567-89ab-cdef-0123-456789abcdef
# Delete
archivebox snapshot list --url__icontains=spam.com | archivebox snapshot delete --yes
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox snapshot'
import sys
from typing import Optional
from typing import Optional, Iterable
import rich_click as click
from rich import print as rprint
from archivebox.misc.util import docstring
from archivebox.cli.cli_utils import apply_filters
def process_snapshot_by_id(snapshot_id: str) -> int:
"""
Process a single Snapshot by ID (used by workers).
Triggers the Snapshot's state machine tick() which will:
- Transition from queued -> started (creates pending ArchiveResults)
- Transition from started -> sealed (when all ArchiveResults done)
"""
from rich import print as rprint
from archivebox.core.models import Snapshot
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
except Snapshot.DoesNotExist:
rprint(f'[red]Snapshot {snapshot_id} not found[/red]', file=sys.stderr)
return 1
rprint(f'[blue]Processing Snapshot {snapshot.id} {snapshot.url[:50]} (status={snapshot.status})[/blue]', file=sys.stderr)
try:
snapshot.sm.tick()
snapshot.refresh_from_db()
rprint(f'[green]Snapshot complete (status={snapshot.status})[/green]', file=sys.stderr)
return 0
except Exception as e:
rprint(f'[red]Snapshot error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
return 1
# =============================================================================
# CREATE
# =============================================================================
def create_snapshots(
args: tuple,
urls: Iterable[str],
tag: str = '',
plugins: str = '',
status: str = 'queued',
depth: int = 0,
created_by_id: Optional[int] = None,
) -> int:
"""
Create Snapshots from URLs, Crawl JSONL, or Crawl IDs.
Reads from args or stdin, creates Snapshot objects, outputs JSONL.
If --plugins is passed, also runs specified plugins (blocking).
Create Snapshots from URLs or stdin JSONL (Crawl or Snapshot records).
Pass-through: Records that are not Crawl/Snapshot/URL are output unchanged.
Exit codes:
0: Success
1: Failure
"""
from rich import print as rprint
from django.utils import timezone
from archivebox.misc.jsonl import (
read_args_or_stdin, write_record,
TYPE_SNAPSHOT, TYPE_CRAWL
@@ -102,7 +70,7 @@ def create_snapshots(
is_tty = sys.stdout.isatty()
# Collect all input records
records = list(read_args_or_stdin(args))
records = list(read_args_or_stdin(urls))
if not records:
rprint('[yellow]No URLs or Crawls provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
@@ -110,11 +78,17 @@ def create_snapshots(
# Process each record - handle Crawls and plain URLs/Snapshots
created_snapshots = []
pass_through_count = 0
for record in records:
record_type = record.get('type')
record_type = record.get('type', '')
try:
if record_type == TYPE_CRAWL:
# Pass through the Crawl record itself first
if not is_tty:
write_record(record)
# Input is a Crawl - get or create it, then create Snapshots for its URLs
crawl = None
crawl_id = record.get('id')
@@ -122,145 +96,295 @@ def create_snapshots(
try:
crawl = Crawl.objects.get(id=crawl_id)
except Crawl.DoesNotExist:
# Crawl doesn't exist, create it
crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
else:
# No ID, create new crawl
crawl = Crawl.from_jsonl(record, overrides={'created_by_id': created_by_id})
crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
if not crawl:
continue
# Create snapshots for each URL in the crawl
for url in crawl.get_urls_list():
# Merge CLI tags with crawl tags
merged_tags = crawl.tags_str
if tag:
if merged_tags:
merged_tags = f"{merged_tags},{tag}"
else:
merged_tags = tag
merged_tags = f"{merged_tags},{tag}" if merged_tags else tag
snapshot_record = {
'url': url,
'tags': merged_tags,
'crawl_id': str(crawl.id),
'depth': 0,
'depth': depth,
'status': status,
}
snapshot = Snapshot.from_jsonl(snapshot_record, overrides={'created_by_id': created_by_id})
snapshot = Snapshot.from_json(snapshot_record, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
if not is_tty:
write_record(snapshot.to_jsonl())
write_record(snapshot.to_json())
elif record_type == TYPE_SNAPSHOT or record.get('url'):
# Input is a Snapshot or plain URL
# Add tags if provided via CLI
if tag and not record.get('tags'):
record['tags'] = tag
if status:
record['status'] = status
record['depth'] = record.get('depth', depth)
snapshot = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id})
snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
if not is_tty:
write_record(snapshot.to_jsonl())
write_record(snapshot.to_json())
else:
# Pass-through: output records we don't handle
if not is_tty:
write_record(record)
pass_through_count += 1
except Exception as e:
rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
continue
if not created_snapshots:
if pass_through_count > 0:
rprint(f'[dim]Passed through {pass_through_count} records, no new snapshots[/dim]', file=sys.stderr)
return 0
rprint('[red]No snapshots created[/red]', file=sys.stderr)
return 1
rprint(f'[green]Created {len(created_snapshots)} snapshots[/green]', file=sys.stderr)
# If TTY, show human-readable output
if is_tty:
for snapshot in created_snapshots:
rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
# If --plugins is passed, create ArchiveResults and run the orchestrator
if plugins:
from archivebox.core.models import ArchiveResult
from archivebox.workers.orchestrator import Orchestrator
# Parse comma-separated plugins list
plugins_list = [p.strip() for p in plugins.split(',') if p.strip()]
# Create ArchiveResults for the specific plugins on each snapshot
for snapshot in created_snapshots:
for plugin_name in plugins_list:
result, created = ArchiveResult.objects.get_or_create(
snapshot=snapshot,
plugin=plugin_name,
defaults={
'status': ArchiveResult.StatusChoices.QUEUED,
'retry_at': timezone.now(),
}
)
if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
# Reset for retry
result.status = ArchiveResult.StatusChoices.QUEUED
result.retry_at = timezone.now()
result.save()
rprint(f'[blue]Running plugins: {plugins}...[/blue]', file=sys.stderr)
orchestrator = Orchestrator(exit_on_idle=True)
orchestrator.runloop()
return 0
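# A hedged sketch of the pass-through behavior described in the docstring above (shell):
#   archivebox crawl create https://example.com | archivebox snapshot create --tag=news
# stdout (JSONL): the incoming Crawl record is echoed first, followed by one Snapshot
# record per URL in the Crawl, so downstream commands receive both record types.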
def is_snapshot_id(value: str) -> bool:
"""Check if value looks like a Snapshot UUID."""
import re
uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
if not uuid_pattern.match(value):
return False
# Verify it's actually a Snapshot (not a Crawl or other object)
# =============================================================================
# LIST
# =============================================================================
def list_snapshots(
status: Optional[str] = None,
url__icontains: Optional[str] = None,
url__istartswith: Optional[str] = None,
tag: Optional[str] = None,
crawl_id: Optional[str] = None,
limit: Optional[int] = None,
) -> int:
"""
List Snapshots as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.core.models import Snapshot
return Snapshot.objects.filter(id=value).exists()
is_tty = sys.stdout.isatty()
queryset = Snapshot.objects.all().order_by('-created_at')
# Apply filters
filter_kwargs = {
'status': status,
'url__icontains': url__icontains,
'url__istartswith': url__istartswith,
'crawl_id': crawl_id,
}
    # Tag filter requires special handling (M2M); apply it before apply_filters()
    # so we never call .filter() on a queryset that has already been sliced by limit
    if tag:
        queryset = queryset.filter(tags__name__iexact=tag)
    queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for snapshot in queryset:
if is_tty:
status_color = {
'queued': 'yellow',
'started': 'blue',
'sealed': 'green',
}.get(snapshot.status, 'dim')
rprint(f'[{status_color}]{snapshot.status:8}[/{status_color}] [dim]{snapshot.id}[/dim] {snapshot.url[:60]}')
else:
write_record(snapshot.to_json())
count += 1
rprint(f'[dim]Listed {count} snapshots[/dim]', file=sys.stderr)
return 0
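# A hedged usage sketch of the filters above (shell):
#   archivebox snapshot list --status=queued --url__icontains=example.com --limit=10
# On a TTY this prints a colored one-line summary per Snapshot; when piped it emits JSONL.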
@click.command()
@click.option('--tag', '-t', default='', help='Comma-separated tags to add to each snapshot')
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run after creating snapshots (e.g., screenshot,singlefile)')
@click.argument('args', nargs=-1)
def main(tag: str, plugins: str, args: tuple):
"""Create Snapshots from URLs/Crawls, or process existing Snapshots by ID"""
from archivebox.misc.jsonl import read_args_or_stdin
# =============================================================================
# UPDATE
# =============================================================================
# Read all input
records = list(read_args_or_stdin(args))
def update_snapshots(
status: Optional[str] = None,
tag: Optional[str] = None,
) -> int:
"""
Update Snapshots from stdin JSONL.
Reads Snapshot records from stdin and applies updates.
Uses PATCH semantics - only specified fields are updated.
Exit codes:
0: Success
1: No input or error
"""
from django.utils import timezone
from archivebox.misc.jsonl import read_stdin, write_record
from archivebox.core.models import Snapshot
is_tty = sys.stdout.isatty()
records = list(read_stdin())
if not records:
from rich import print as rprint
rprint('[yellow]No URLs, Crawl IDs, or Snapshot IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
sys.exit(1)
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
# Check if input looks like existing Snapshot IDs to process
# If ALL inputs are UUIDs with no URL and exist as Snapshots, process them
all_are_snapshot_ids = all(
is_snapshot_id(r.get('id') or r.get('url', ''))
for r in records
if r.get('type') != 'Crawl' # Don't check Crawl records as Snapshot IDs
)
updated_count = 0
for record in records:
snapshot_id = record.get('id')
if not snapshot_id:
continue
# But also check that we're not receiving Crawl JSONL
has_crawl_records = any(r.get('type') == 'Crawl' for r in records)
try:
snapshot = Snapshot.objects.get(id=snapshot_id)
if all_are_snapshot_ids and not has_crawl_records:
# Process existing Snapshots by ID
exit_code = 0
for record in records:
snapshot_id = record.get('id') or record.get('url')
result = process_snapshot_by_id(snapshot_id)
if result != 0:
exit_code = result
sys.exit(exit_code)
else:
# Create new Snapshots from URLs or Crawls
sys.exit(create_snapshots(args, tag=tag, plugins=plugins))
# Apply updates from CLI flags (override stdin values)
if status:
snapshot.status = status
snapshot.retry_at = timezone.now()
if tag:
# Add tag to existing tags
snapshot.save() # Ensure saved before M2M
from archivebox.core.models import Tag
tag_obj, _ = Tag.objects.get_or_create(name=tag)
snapshot.tags.add(tag_obj)
snapshot.save()
updated_count += 1
if not is_tty:
write_record(snapshot.to_json())
except Snapshot.DoesNotExist:
rprint(f'[yellow]Snapshot not found: {snapshot_id}[/yellow]', file=sys.stderr)
continue
rprint(f'[green]Updated {updated_count} snapshots[/green]', file=sys.stderr)
return 0
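# A hedged usage sketch (shell): re-queue matching snapshots for re-processing:
#   archivebox snapshot list --status=sealed --url__icontains=example.com | archivebox snapshot update --status=queued
# Setting --status also bumps retry_at to now so the orchestrator picks them up again.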
# =============================================================================
# DELETE
# =============================================================================
def delete_snapshots(yes: bool = False, dry_run: bool = False) -> int:
"""
Delete Snapshots from stdin JSONL.
Requires --yes flag to confirm deletion.
Exit codes:
0: Success
1: No input or missing --yes flag
"""
from archivebox.misc.jsonl import read_stdin
from archivebox.core.models import Snapshot
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
snapshot_ids = [r.get('id') for r in records if r.get('id')]
if not snapshot_ids:
rprint('[yellow]No valid snapshot IDs in input[/yellow]', file=sys.stderr)
return 1
snapshots = Snapshot.objects.filter(id__in=snapshot_ids)
count = snapshots.count()
if count == 0:
rprint('[yellow]No matching snapshots found[/yellow]', file=sys.stderr)
return 0
if dry_run:
rprint(f'[yellow]Would delete {count} snapshots (dry run)[/yellow]', file=sys.stderr)
for snapshot in snapshots:
rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
return 0
if not yes:
rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
return 1
    # Perform deletion (queryset.delete() returns the total rows removed, including
    # cascaded related rows, so report the snapshot count computed above instead)
    snapshots.delete()
    rprint(f'[green]Deleted {count} snapshots[/green]', file=sys.stderr)
return 0
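# A hedged usage sketch (shell): preview the deletion first, then confirm it:
#   archivebox snapshot list --url__icontains=spam.com | archivebox snapshot delete --dry-run
#   archivebox snapshot list --url__icontains=spam.com | archivebox snapshot delete --yes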
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage Snapshot records."""
pass
@main.command('create')
@click.argument('urls', nargs=-1)
@click.option('--tag', '-t', default='', help='Comma-separated tags to add')
@click.option('--status', '-s', default='queued', help='Initial status (default: queued)')
@click.option('--depth', '-d', type=int, default=0, help='Crawl depth (default: 0)')
def create_cmd(urls: tuple, tag: str, status: str, depth: int):
"""Create Snapshots from URLs or stdin JSONL."""
sys.exit(create_snapshots(urls, tag=tag, status=status, depth=depth))
@main.command('list')
@click.option('--status', '-s', help='Filter by status (queued, started, sealed)')
@click.option('--url__icontains', help='Filter by URL contains')
@click.option('--url__istartswith', help='Filter by URL starts with')
@click.option('--tag', '-t', help='Filter by tag name')
@click.option('--crawl-id', help='Filter by crawl ID')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(status: Optional[str], url__icontains: Optional[str], url__istartswith: Optional[str],
tag: Optional[str], crawl_id: Optional[str], limit: Optional[int]):
"""List Snapshots as JSONL."""
sys.exit(list_snapshots(
status=status,
url__icontains=url__icontains,
url__istartswith=url__istartswith,
tag=tag,
crawl_id=crawl_id,
limit=limit,
))
@main.command('update')
@click.option('--status', '-s', help='Set status')
@click.option('--tag', '-t', help='Add tag')
def update_cmd(status: Optional[str], tag: Optional[str]):
"""Update Snapshots from stdin JSONL."""
sys.exit(update_snapshots(status=status, tag=tag))
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
"""Delete Snapshots from stdin JSONL."""
sys.exit(delete_snapshots(yes=yes, dry_run=dry_run))
if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""
archivebox tag <action> [args...] [--filters]
Manage Tag records.
Actions:
create - Create Tags
list - List Tags as JSONL (with optional filters)
update - Update Tags from stdin JSONL
delete - Delete Tags from stdin JSONL
Examples:
# Create
archivebox tag create news tech science
archivebox tag create "important stuff"
# List
archivebox tag list
archivebox tag list --name__icontains=news
# Update (rename tags)
archivebox tag list --name=oldname | archivebox tag update --name=newname
# Delete
archivebox tag list --name=unused | archivebox tag delete --yes
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox tag'
import sys
from typing import Optional, Iterable
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
# =============================================================================
# CREATE
# =============================================================================
def create_tags(names: Iterable[str]) -> int:
"""
Create Tags from names.
Exit codes:
0: Success
1: Failure
"""
from archivebox.misc.jsonl import write_record
from archivebox.core.models import Tag
is_tty = sys.stdout.isatty()
# Convert to list if needed
name_list = list(names) if names else []
if not name_list:
rprint('[yellow]No tag names provided. Pass names as arguments.[/yellow]', file=sys.stderr)
return 1
created_count = 0
for name in name_list:
name = name.strip()
if not name:
continue
tag, created = Tag.objects.get_or_create(name=name)
if not is_tty:
write_record(tag.to_json())
if created:
created_count += 1
rprint(f'[green]Created tag: {name}[/green]', file=sys.stderr)
else:
rprint(f'[dim]Tag already exists: {name}[/dim]', file=sys.stderr)
rprint(f'[green]Created {created_count} new tags[/green]', file=sys.stderr)
return 0
# =============================================================================
# LIST
# =============================================================================
def list_tags(
name: Optional[str] = None,
name__icontains: Optional[str] = None,
limit: Optional[int] = None,
) -> int:
"""
List Tags as JSONL with optional filters.
Exit codes:
0: Success (even if no results)
"""
from archivebox.misc.jsonl import write_record
from archivebox.core.models import Tag
is_tty = sys.stdout.isatty()
queryset = Tag.objects.all().order_by('name')
# Apply filters
filter_kwargs = {
'name': name,
'name__icontains': name__icontains,
}
queryset = apply_filters(queryset, filter_kwargs, limit=limit)
count = 0
for tag in queryset:
snapshot_count = tag.snapshot_set.count()
if is_tty:
rprint(f'[cyan]{tag.name:30}[/cyan] [dim]({snapshot_count} snapshots)[/dim]')
else:
write_record(tag.to_json())
count += 1
rprint(f'[dim]Listed {count} tags[/dim]', file=sys.stderr)
return 0
# =============================================================================
# UPDATE
# =============================================================================
def update_tags(name: Optional[str] = None) -> int:
"""
Update Tags from stdin JSONL.
Reads Tag records from stdin and applies updates.
Uses PATCH semantics - only specified fields are updated.
Exit codes:
0: Success
1: No input or error
"""
from archivebox.misc.jsonl import read_stdin, write_record
from archivebox.core.models import Tag
is_tty = sys.stdout.isatty()
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
updated_count = 0
for record in records:
tag_id = record.get('id')
old_name = record.get('name')
if not tag_id and not old_name:
continue
try:
if tag_id:
tag = Tag.objects.get(id=tag_id)
else:
tag = Tag.objects.get(name=old_name)
# Apply updates from CLI flags
if name:
tag.name = name
tag.save()
updated_count += 1
if not is_tty:
write_record(tag.to_json())
except Tag.DoesNotExist:
rprint(f'[yellow]Tag not found: {tag_id or old_name}[/yellow]', file=sys.stderr)
continue
rprint(f'[green]Updated {updated_count} tags[/green]', file=sys.stderr)
return 0
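# A hedged sketch of the PATCH semantics above: the stdin record only needs "id" or "name"
# to locate the Tag, and --name supplies the new value (shell):
#   echo '{"type": "Tag", "name": "oldname"}' | archivebox tag update --name=newname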
# =============================================================================
# DELETE
# =============================================================================
def delete_tags(yes: bool = False, dry_run: bool = False) -> int:
"""
Delete Tags from stdin JSONL.
Requires --yes flag to confirm deletion.
Exit codes:
0: Success
1: No input or missing --yes flag
"""
from archivebox.misc.jsonl import read_stdin
from archivebox.core.models import Tag
records = list(read_stdin())
if not records:
rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
return 1
# Collect tag IDs or names
tag_ids = []
tag_names = []
for r in records:
if r.get('id'):
tag_ids.append(r['id'])
elif r.get('name'):
tag_names.append(r['name'])
if not tag_ids and not tag_names:
rprint('[yellow]No valid tag IDs or names in input[/yellow]', file=sys.stderr)
return 1
from django.db.models import Q
query = Q()
if tag_ids:
query |= Q(id__in=tag_ids)
if tag_names:
query |= Q(name__in=tag_names)
tags = Tag.objects.filter(query)
count = tags.count()
if count == 0:
rprint('[yellow]No matching tags found[/yellow]', file=sys.stderr)
return 0
if dry_run:
rprint(f'[yellow]Would delete {count} tags (dry run)[/yellow]', file=sys.stderr)
for tag in tags:
rprint(f' {tag.name}', file=sys.stderr)
return 0
if not yes:
rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
return 1
    # Perform deletion (queryset.delete() returns the total rows removed, including
    # through-table/cascaded rows, so report the tag count computed above instead)
    tags.delete()
    rprint(f'[green]Deleted {count} tags[/green]', file=sys.stderr)
return 0
# =============================================================================
# CLI Commands
# =============================================================================
@click.group()
def main():
"""Manage Tag records."""
pass
@main.command('create')
@click.argument('names', nargs=-1)
def create_cmd(names: tuple):
"""Create Tags from names."""
sys.exit(create_tags(names))
@main.command('list')
@click.option('--name', help='Filter by exact name')
@click.option('--name__icontains', help='Filter by name contains')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(name: Optional[str], name__icontains: Optional[str], limit: Optional[int]):
"""List Tags as JSONL."""
sys.exit(list_tags(name=name, name__icontains=name__icontains, limit=limit))
@main.command('update')
@click.option('--name', '-n', help='Set new name')
def update_cmd(name: Optional[str]):
"""Update Tags from stdin JSONL."""
sys.exit(update_tags(name=name))
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
"""Delete Tags from stdin JSONL."""
sys.exit(delete_tags(yes=yes, dry_run=dry_run))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,46 @@
"""
Shared CLI utilities for ArchiveBox commands.
This module contains common utilities used across multiple CLI commands,
extracted to avoid code duplication.
"""
__package__ = 'archivebox.cli'
from typing import Optional
def apply_filters(queryset, filter_kwargs: dict, limit: Optional[int] = None):
"""
Apply Django-style filters from CLI kwargs to a QuerySet.
Supports: --status=queued, --url__icontains=example, --id__in=uuid1,uuid2
Args:
queryset: Django QuerySet to filter
filter_kwargs: Dict of filter key-value pairs from CLI
limit: Optional limit on results
Returns:
Filtered QuerySet
Example:
queryset = Snapshot.objects.all()
filter_kwargs = {'status': 'queued', 'url__icontains': 'example.com'}
filtered = apply_filters(queryset, filter_kwargs, limit=10)
"""
filters = {}
for key, value in filter_kwargs.items():
if value is None or key in ('limit', 'offset'):
continue
# Handle CSV lists for __in filters
if key.endswith('__in') and isinstance(value, str):
value = [v.strip() for v in value.split(',')]
filters[key] = value
if filters:
queryset = queryset.filter(**filters)
if limit:
queryset = queryset[:limit]
return queryset
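# A hedged usage sketch of the __in CSV handling above (the uuid values are placeholders):
#   apply_filters(Snapshot.objects.all(), {'status': None, 'id__in': 'uuid1,uuid2'}, limit=5)
# 'status' is skipped because it is None, 'id__in' is split into ['uuid1', 'uuid2'],
# and the limit slice is applied last.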

View File

@@ -1,17 +1,18 @@
#!/usr/bin/env python3
"""
Tests for CLI piping workflow: crawl | snapshot | extract
Tests for CLI piping workflow: crawl | snapshot | archiveresult | run
This module tests the JSONL-based piping between CLI commands as described in:
https://github.com/ArchiveBox/ArchiveBox/issues/1363
Workflows tested:
archivebox crawl URL -> Crawl JSONL
archivebox snapshot -> Snapshot JSONL (accepts Crawl or URL input)
archivebox extract -> ArchiveResult JSONL (accepts Snapshot input)
archivebox crawl create URL -> Crawl JSONL
archivebox snapshot create -> Snapshot JSONL (accepts Crawl or URL input)
archivebox archiveresult create -> ArchiveResult JSONL (accepts Snapshot input)
archivebox run -> Process queued records (accepts any JSONL)
Pipeline:
archivebox crawl URL | archivebox snapshot | archivebox extract
archivebox crawl create URL | archivebox snapshot create | archivebox archiveresult create | archivebox run
Each command should:
- Accept URLs, IDs, or JSONL as input (args or stdin)
@@ -154,13 +155,13 @@ class TestJSONLParsing(unittest.TestCase):
class TestJSONLOutput(unittest.TestCase):
"""Test JSONL output formatting."""
def test_crawl_to_jsonl(self):
"""Crawl model should serialize to JSONL correctly."""
def test_crawl_to_json(self):
"""Crawl model should serialize to JSON correctly."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Create a mock crawl with to_jsonl method configured
# Create a mock crawl with to_json method configured
mock_crawl = MagicMock()
mock_crawl.to_jsonl.return_value = {
mock_crawl.to_json.return_value = {
'type': TYPE_CRAWL,
'schema_version': '0.9.0',
'id': 'test-crawl-uuid',
@@ -172,7 +173,7 @@ class TestJSONLOutput(unittest.TestCase):
'created_at': None,
}
result = mock_crawl.to_jsonl()
result = mock_crawl.to_json()
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'test-crawl-uuid')
self.assertEqual(result['urls'], 'https://example.com')
@@ -351,8 +352,8 @@ class TestSnapshotCommand(unittest.TestCase):
# using real Snapshot instances.
class TestExtractCommand(unittest.TestCase):
"""Unit tests for archivebox extract command."""
class TestArchiveResultCommand(unittest.TestCase):
"""Unit tests for archivebox archiveresult command."""
def setUp(self):
"""Set up test environment."""
@@ -363,8 +364,8 @@ class TestExtractCommand(unittest.TestCase):
"""Clean up test environment."""
shutil.rmtree(self.test_dir, ignore_errors=True)
def test_extract_accepts_snapshot_id(self):
"""extract should accept snapshot IDs as input."""
def test_archiveresult_accepts_snapshot_id(self):
"""archiveresult should accept snapshot IDs as input."""
from archivebox.misc.jsonl import read_args_or_stdin
uuid = '01234567-89ab-cdef-0123-456789abcdef'
@@ -374,8 +375,8 @@ class TestExtractCommand(unittest.TestCase):
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], uuid)
def test_extract_accepts_jsonl_snapshot(self):
"""extract should accept JSONL Snapshot records."""
def test_archiveresult_accepts_jsonl_snapshot(self):
"""archiveresult should accept JSONL Snapshot records."""
from archivebox.misc.jsonl import read_args_or_stdin, TYPE_SNAPSHOT
stdin = StringIO('{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}\n')
@@ -387,8 +388,8 @@ class TestExtractCommand(unittest.TestCase):
self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
self.assertEqual(records[0]['id'], 'abc123')
def test_extract_gathers_snapshot_ids(self):
"""extract should gather snapshot IDs from various input formats."""
def test_archiveresult_gathers_snapshot_ids(self):
"""archiveresult should gather snapshot IDs from various input formats."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
records = [
@@ -529,7 +530,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create crawl with multiple URLs (as newline-separated string)
urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com'
crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})
self.assertIsNotNone(crawl)
self.assertIsNotNone(crawl.id)
@@ -543,7 +544,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
self.assertIn('https://test-crawl-2.example.com', urls_list)
# Verify output format
output = crawl.to_jsonl()
output = crawl.to_json()
self.assertEqual(output['type'], TYPE_CRAWL)
self.assertIn('id', output)
self.assertEqual(output['urls'], urls)
@@ -566,8 +567,8 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 1: Create crawl (simulating 'archivebox crawl')
urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com'
crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
crawl_output = crawl.to_jsonl()
crawl = Crawl.from_json({'urls': urls}, overrides={'created_by_id': created_by_id})
crawl_output = crawl.to_json()
# Step 2: Parse crawl output as snapshot input
stdin = StringIO(json.dumps(crawl_output) + '\n')
@@ -581,7 +582,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 3: Create snapshots from crawl URLs
created_snapshots = []
for url in crawl.get_urls_list():
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
snapshot = Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
@@ -589,7 +590,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Verify snapshot output
for snapshot in created_snapshots:
output = snapshot.to_jsonl()
output = snapshot.to_json()
self.assertEqual(output['type'], TYPE_SNAPSHOT)
self.assertIn(output['url'], [
'https://crawl-to-snap-1.example.com',
@@ -619,13 +620,13 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Create snapshot
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl(records[0], overrides=overrides)
snapshot = Snapshot.from_json(records[0], overrides=overrides)
self.assertIsNotNone(snapshot.id)
self.assertEqual(snapshot.url, url)
# Verify output format
output = snapshot.to_jsonl()
output = snapshot.to_json()
self.assertEqual(output['type'], TYPE_SNAPSHOT)
self.assertIn('id', output)
self.assertEqual(output['url'], url)
@@ -647,8 +648,8 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# Step 1: Create snapshot (simulating 'archivebox snapshot')
url = 'https://test-extract-1.example.com'
overrides = {'created_by_id': created_by_id}
snapshot = Snapshot.from_jsonl({'url': url}, overrides=overrides)
snapshot_output = snapshot.to_jsonl()
snapshot = Snapshot.from_json({'url': url}, overrides=overrides)
snapshot_output = snapshot.to_json()
# Step 2: Parse snapshot output as extract input
stdin = StringIO(json.dumps(snapshot_output) + '\n')
@@ -686,8 +687,8 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
# === archivebox crawl https://example.com ===
url = 'https://test-pipeline-full.example.com'
crawl = Crawl.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
crawl_jsonl = json.dumps(crawl.to_jsonl())
crawl = Crawl.from_json({'url': url}, overrides={'created_by_id': created_by_id})
crawl_jsonl = json.dumps(crawl.to_json())
# === | archivebox snapshot ===
stdin = StringIO(crawl_jsonl + '\n')
@@ -705,7 +706,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
if crawl_id:
db_crawl = Crawl.objects.get(id=crawl_id)
for crawl_url in db_crawl.get_urls_list():
snapshot = Snapshot.from_jsonl({'url': crawl_url}, overrides={'created_by_id': created_by_id})
snapshot = Snapshot.from_json({'url': crawl_url}, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
@@ -713,7 +714,7 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
self.assertEqual(created_snapshots[0].url, url)
# === | archivebox extract ===
snapshot_jsonl_lines = [json.dumps(s.to_jsonl()) for s in created_snapshots]
snapshot_jsonl_lines = [json.dumps(s.to_json()) for s in created_snapshots]
stdin = StringIO('\n'.join(snapshot_jsonl_lines) + '\n')
stdin.isatty = lambda: False
@@ -757,12 +758,12 @@ class TestDepthWorkflows(unittest.TestCase):
# Create crawl with depth 0
url = 'https://depth0-test.example.com'
crawl = Crawl.from_jsonl({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
crawl = Crawl.from_json({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
self.assertEqual(crawl.max_depth, 0)
# Create snapshot
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
snapshot = Snapshot.from_json({'url': url}, overrides={'created_by_id': created_by_id})
self.assertEqual(snapshot.url, url)
def test_depth_metadata_in_crawl(self):
@@ -773,7 +774,7 @@ class TestDepthWorkflows(unittest.TestCase):
created_by_id = get_or_create_system_user_pk()
# Create crawl with depth
crawl = Crawl.from_jsonl(
crawl = Crawl.from_json(
{'url': 'https://depth-meta-test.example.com', 'max_depth': 2},
overrides={'created_by_id': created_by_id}
)
@@ -781,7 +782,7 @@ class TestDepthWorkflows(unittest.TestCase):
self.assertEqual(crawl.max_depth, 2)
# Verify in JSONL output
output = crawl.to_jsonl()
output = crawl.to_json()
self.assertEqual(output['max_depth'], 2)
@@ -956,5 +957,129 @@ class TestEdgeCases(unittest.TestCase):
self.assertEqual(urls[2], 'https://url3.com')
# =============================================================================
# Pass-Through Behavior Tests
# =============================================================================
class TestPassThroughBehavior(unittest.TestCase):
"""Test pass-through behavior in CLI commands."""
def test_crawl_passes_through_other_types(self):
"""crawl create should pass through records with other types."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Input: a Tag record (not a Crawl or URL)
tag_record = {'type': 'Tag', 'id': 'test-tag', 'name': 'example'}
url_record = {'url': 'https://example.com'}
# Mock stdin with both records
stdin = StringIO(
json.dumps(tag_record) + '\n' +
json.dumps(url_record)
)
stdin.isatty = lambda: False
# The Tag should be passed through, the URL should create a Crawl
        # (This only exercises the input parsing that feeds the pass-through logic)
from archivebox.misc.jsonl import read_args_or_stdin
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 2)
# First record is a Tag (other type)
self.assertEqual(records[0]['type'], 'Tag')
# Second record has a URL
self.assertIn('url', records[1])
def test_snapshot_passes_through_crawl(self):
"""snapshot create should pass through Crawl records."""
from archivebox.misc.jsonl import TYPE_CRAWL, TYPE_SNAPSHOT
crawl_record = {
'type': TYPE_CRAWL,
'id': 'test-crawl',
'urls': 'https://example.com',
}
# Crawl records should be passed through AND create snapshots
# This tests the accumulation behavior
self.assertEqual(crawl_record['type'], TYPE_CRAWL)
self.assertIn('urls', crawl_record)
def test_archiveresult_passes_through_snapshot(self):
"""archiveresult create should pass through Snapshot records."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
snapshot_record = {
'type': TYPE_SNAPSHOT,
'id': 'test-snapshot',
'url': 'https://example.com',
}
# Snapshot records should be passed through
self.assertEqual(snapshot_record['type'], TYPE_SNAPSHOT)
self.assertIn('url', snapshot_record)
def test_run_passes_through_unknown_types(self):
"""run should pass through records with unknown types."""
unknown_record = {'type': 'Unknown', 'id': 'test', 'data': 'value'}
# Unknown types should be passed through unchanged
self.assertEqual(unknown_record['type'], 'Unknown')
self.assertIn('data', unknown_record)
class TestPipelineAccumulation(unittest.TestCase):
"""Test that pipelines accumulate records correctly."""
def test_full_pipeline_output_types(self):
"""Full pipeline should output all record types."""
from archivebox.misc.jsonl import TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
# Simulated pipeline output after: crawl | snapshot | archiveresult | run
# Should contain Crawl, Snapshot, and ArchiveResult records
pipeline_output = [
{'type': TYPE_CRAWL, 'id': 'c1', 'urls': 'https://example.com'},
{'type': TYPE_SNAPSHOT, 'id': 's1', 'url': 'https://example.com'},
{'type': TYPE_ARCHIVERESULT, 'id': 'ar1', 'plugin': 'title'},
]
types = {r['type'] for r in pipeline_output}
self.assertIn(TYPE_CRAWL, types)
self.assertIn(TYPE_SNAPSHOT, types)
self.assertIn(TYPE_ARCHIVERESULT, types)
def test_pipeline_preserves_ids(self):
"""Pipeline should preserve record IDs through all stages."""
records = [
{'type': 'Crawl', 'id': 'c1', 'urls': 'https://example.com'},
{'type': 'Snapshot', 'id': 's1', 'url': 'https://example.com'},
]
# All records should have IDs
for record in records:
self.assertIn('id', record)
self.assertTrue(record['id'])
def test_jq_transform_pattern(self):
"""Test pattern for jq transforms in pipeline."""
# Simulated: archiveresult list --status=failed | jq 'del(.id) | .status = "queued"'
failed_record = {
'type': 'ArchiveResult',
'id': 'ar1',
'status': 'failed',
'plugin': 'wget',
}
# Transform: delete id, set status to queued
transformed = {
'type': failed_record['type'],
'status': 'queued',
'plugin': failed_record['plugin'],
}
self.assertNotIn('id', transformed)
self.assertEqual(transformed['status'], 'queued')
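# A hedged end-to-end sketch of the jq pattern above (shell); jq -c keeps the output as
# one compact JSON object per line so it stays valid JSONL for the next stage:
#   archivebox archiveresult list --status=failed \
#     | jq -c 'del(.id) | .status = "queued"' \
#     | archivebox run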
if __name__ == '__main__':
unittest.main()