diff --git a/README.md b/README.md index 40598258..8f5db3cd 100644 --- a/README.md +++ b/README.md @@ -690,7 +690,7 @@ echo 'any text with urls in it' | archivebox a See the [Usage: CLI](https://github.com/ArchiveBox/ArchiveBox/wiki/Usage#CLI-Usage) page for documentation and examples. -It also includes a built-in scheduled import feature with `archivebox schedule` and browser bookmarklet, so you can pull in URLs from RSS feeds, websites, or the filesystem regularly/on-demand. +It also includes a built-in scheduled import feature with `archivebox schedule`, handled by the same orchestrator that powers `archivebox server`, so you can pull in URLs from RSS feeds and websites regularly without a separate cron container.
diff --git a/archivebox/api/tests.py b/archivebox/api/tests.py index adaf49da..ee566a63 100644 --- a/archivebox/api/tests.py +++ b/archivebox/api/tests.py @@ -1,30 +1,41 @@ -__package__ = 'archivebox.api' +import os +import django +from io import StringIO +from types import SimpleNamespace -# from django.test import TestCase -# from ninja.testing import TestClient +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.settings') +django.setup() -# from .routes_cli import router +from django.contrib.auth.models import User +from django.test import TestCase -# class ArchiveBoxCLIAPITestCase(TestCase): -# def setUp(self): -# self.client = TestClient(router) +from archivebox.api.v1_cli import ScheduleCommandSchema, cli_schedule +from archivebox.crawls.models import CrawlSchedule -# def test_add_endpoint(self): -# response = self.client.post("/add", json={"urls": ["http://example.com"], "tag": "testTag1,testTag2"}) -# self.assertEqual(response.status_code, 200) -# self.assertTrue(response.json()["success"]) -# def test_remove_endpoint(self): -# response = self.client.post("/remove", json={"filter_patterns": ["http://example.com"]}) -# self.assertEqual(response.status_code, 200) -# self.assertTrue(response.json()["success"]) +class CLIScheduleAPITests(TestCase): + def setUp(self): + self.user = User.objects.create_user( + username='api-user', + password='testpass123', + email='api@example.com', + ) -# def test_update_endpoint(self): -# response = self.client.post("/update", json={}) -# self.assertEqual(response.status_code, 200) -# self.assertTrue(response.json()["success"]) + def test_schedule_api_creates_schedule(self): + request = SimpleNamespace( + user=self.user, + stdout=StringIO(), + stderr=StringIO(), + ) + args = ScheduleCommandSchema( + every='daily', + import_path='https://example.com/feed.xml', + quiet=True, + ) -# def test_list_all_endpoint(self): -# response = self.client.post("/list_all", json={}) -# self.assertEqual(response.status_code, 200) -# self.assertTrue(response.json()["success"]) + response = cli_schedule(request, args) + + self.assertTrue(response['success']) + self.assertEqual(response['result_format'], 'json') + self.assertEqual(CrawlSchedule.objects.count(), 1) + self.assertEqual(len(response['result']['created_schedule_ids']), 1) diff --git a/archivebox/api/v1_cli.py b/archivebox/api/v1_cli.py index 943063ce..2e84a02d 100644 --- a/archivebox/api/v1_cli.py +++ b/archivebox/api/v1_cli.py @@ -74,6 +74,10 @@ class UpdateCommandSchema(Schema): class ScheduleCommandSchema(Schema): import_path: Optional[str] = None add: bool = False + show: bool = False + foreground: bool = False + run_all: bool = False + quiet: bool = False every: Optional[str] = None tag: str = '' depth: int = 0 @@ -172,6 +176,9 @@ def cli_schedule(request, args: ScheduleCommandSchema): import_path=args.import_path, add=args.add, show=args.show, + foreground=args.foreground, + run_all=args.run_all, + quiet=args.quiet, clear=args.clear, every=args.every, tag=args.tag, @@ -184,6 +191,7 @@ def cli_schedule(request, args: ScheduleCommandSchema): "success": True, "errors": [], "result": result, + "result_format": "json", "stdout": ansi_to_html(request.stdout.getvalue().strip()), "stderr": ansi_to_html(request.stderr.getvalue().strip()), } @@ -230,19 +238,37 @@ def cli_search(request, args: ListCommandSchema): @router.post("/remove", response=CLICommandResponseSchema, summary='archivebox remove [args] [filter_patterns]') def cli_remove(request, args: RemoveCommandSchema): from archivebox.cli.archivebox_remove import remove + from archivebox.cli.archivebox_search import get_snapshots + from archivebox.core.models import Snapshot + + snapshots_to_remove = get_snapshots( + filter_patterns=args.filter_patterns, + filter_type=args.filter_type, + after=args.after, + before=args.before, + ) + removed_snapshot_ids = [str(snapshot_id) for snapshot_id in snapshots_to_remove.values_list('id', flat=True)] - result = remove( + remove( yes=True, # no way to interactively ask for confirmation via API, so we force yes delete=args.delete, + snapshots=snapshots_to_remove, before=args.before, after=args.after, filter_type=args.filter_type, filter_patterns=args.filter_patterns, ) + + result = { + "removed_count": len(removed_snapshot_ids), + "removed_snapshot_ids": removed_snapshot_ids, + "remaining_snapshots": Snapshot.objects.count(), + } return { "success": True, "errors": [], "result": result, + "result_format": "json", "stdout": ansi_to_html(request.stdout.getvalue().strip()), "stderr": ansi_to_html(request.stderr.getvalue().strip()), } diff --git a/archivebox/api/v1_core.py b/archivebox/api/v1_core.py index 60aa0387..73a0fbed 100644 --- a/archivebox/api/v1_core.py +++ b/archivebox/api/v1_core.py @@ -547,7 +547,7 @@ def tags_add_to_snapshot(request, data: TagSnapshotRequestSchema): raise HttpError(400, 'Either tag_name or tag_id is required') # Add the tag to the snapshot - snapshot.tags.add(tag) + snapshot.tags.add(tag.pk) return { 'success': True, @@ -586,7 +586,7 @@ def tags_remove_from_snapshot(request, data: TagSnapshotRequestSchema): raise HttpError(400, 'Either tag_name or tag_id is required') # Remove the tag from the snapshot - snapshot.tags.remove(tag) + snapshot.tags.remove(tag.pk) return { 'success': True, diff --git a/archivebox/api/v1_machine.py b/archivebox/api/v1_machine.py index 95a4a970..2f1e7098 100644 --- a/archivebox/api/v1_machine.py +++ b/archivebox/api/v1_machine.py @@ -106,6 +106,13 @@ def get_machines(request, filters: MachineFilterSchema = Query(...)): return filters.filter(Machine.objects.all()).distinct() +@router.get("/machine/current", response=MachineSchema, url_name="get_current_machine") +def get_current_machine(request): + """Get the current machine.""" + from archivebox.machine.models import Machine + return Machine.current() + + @router.get("/machine/{machine_id}", response=MachineSchema, url_name="get_machine") def get_machine(request, machine_id: str): """Get a specific machine by ID.""" @@ -114,13 +121,6 @@ def get_machine(request, machine_id: str): return Machine.objects.get(Q(id__startswith=machine_id) | Q(hostname__iexact=machine_id)) -@router.get("/machine/current", response=MachineSchema, url_name="get_current_machine") -def get_current_machine(request): - """Get the current machine.""" - from archivebox.machine.models import Machine - return Machine.current() - - # ============================================================================ @@ -133,18 +133,18 @@ def get_current_machine(request): def get_binaries(request, filters: BinaryFilterSchema = Query(...)): """List all binaries.""" from archivebox.machine.models import Binary - return filters.filter(Binary.objects.all().select_related('machine', 'dependency')).distinct() + return filters.filter(Binary.objects.all().select_related('machine')).distinct() @router.get("/binary/{binary_id}", response=BinarySchema, url_name="get_binary") def get_binary(request, binary_id: str): """Get a specific binary by ID.""" from archivebox.machine.models import Binary - return Binary.objects.select_related('machine', 'dependency').get(id__startswith=binary_id) + return Binary.objects.select_related('machine').get(id__startswith=binary_id) @router.get("/binary/by-name/{name}", response=List[BinarySchema], url_name="get_binaries_by_name") def get_binaries_by_name(request, name: str): """Get all binaries with the given name.""" from archivebox.machine.models import Binary - return list(Binary.objects.filter(name__iexact=name).select_related('machine', 'dependency')) + return list(Binary.objects.filter(name__iexact=name).select_related('machine')) diff --git a/archivebox/cli/__init__.py b/archivebox/cli/__init__.py index 4289b011..b0c84f56 100644 --- a/archivebox/cli/__init__.py +++ b/archivebox/cli/__init__.py @@ -157,6 +157,16 @@ def cli(ctx, help=False): if subcommand in ArchiveBoxGroup.archive_commands or subcommand in ArchiveBoxGroup.model_commands: # print('SETUP DJANGO AND CHECK DATA FOLDER') try: + if subcommand == 'server': + run_in_debug = '--reload' in sys.argv or os.environ.get('DEBUG') in ('1', 'true', 'True', 'TRUE', 'yes') + if run_in_debug: + os.environ['ARCHIVEBOX_RUNSERVER'] = '1' + if '--reload' in sys.argv: + os.environ['ARCHIVEBOX_AUTORELOAD'] = '1' + os.environ['ARCHIVEBOX_ORCHESTRATOR_MANAGED_BY_WATCHER'] = '1' + from archivebox.config.common import STORAGE_CONFIG + os.environ['ARCHIVEBOX_RUNSERVER_PIDFILE'] = str(STORAGE_CONFIG.TMP_DIR / 'runserver.pid') + from archivebox.config.django import setup_django from archivebox.misc.checks import check_data_folder setup_django() diff --git a/archivebox/cli/archivebox_init.py b/archivebox/cli/archivebox_init.py index 34b10faa..6b861e12 100755 --- a/archivebox/cli/archivebox_init.py +++ b/archivebox/cli/archivebox_init.py @@ -163,9 +163,19 @@ def init(force: bool=False, quick: bool=False, install: bool=False) -> None: (CONSTANTS.DEFAULT_LIB_DIR / 'bin').mkdir(parents=True, exist_ok=True) from archivebox.config.common import STORAGE_CONFIG + from archivebox.config.paths import get_or_create_working_tmp_dir, get_or_create_working_lib_dir STORAGE_CONFIG.TMP_DIR.mkdir(parents=True, exist_ok=True) STORAGE_CONFIG.LIB_DIR.mkdir(parents=True, exist_ok=True) (STORAGE_CONFIG.LIB_DIR / 'bin').mkdir(parents=True, exist_ok=True) + + working_tmp_dir = get_or_create_working_tmp_dir(autofix=True, quiet=True) + if working_tmp_dir: + working_tmp_dir.mkdir(parents=True, exist_ok=True) + + working_lib_dir = get_or_create_working_lib_dir(autofix=True, quiet=True) + if working_lib_dir: + working_lib_dir.mkdir(parents=True, exist_ok=True) + (working_lib_dir / 'bin').mkdir(parents=True, exist_ok=True) if install: from archivebox.cli.archivebox_install import install as install_method diff --git a/archivebox/cli/archivebox_schedule.py b/archivebox/cli/archivebox_schedule.py index 5e146358..9dd63abd 100644 --- a/archivebox/cli/archivebox_schedule.py +++ b/archivebox/cli/archivebox_schedule.py @@ -2,166 +2,171 @@ __package__ = 'archivebox.cli' -import sys -from pathlib import Path - import rich_click as click from rich import print from archivebox.misc.util import enforce_types, docstring -from archivebox.config import DATA_DIR, CONSTANTS from archivebox.config.common import ARCHIVING_CONFIG -from archivebox.config.permissions import USER - - -CRON_COMMENT = 'ArchiveBox' @enforce_types -def schedule(add: bool=False, - show: bool=False, - clear: bool=False, - foreground: bool=False, - run_all: bool=False, - quiet: bool=False, - every: str | None=None, - tag: str='', - depth: int | str=0, - overwrite: bool=False, - update: bool=not ARCHIVING_CONFIG.ONLY_NEW, - import_path: str | None=None, - out_dir: Path=DATA_DIR) -> None: - """Set ArchiveBox to regularly import URLs at specific times using cron""" - +def schedule(add: bool = False, + show: bool = False, + clear: bool = False, + foreground: bool = False, + run_all: bool = False, + quiet: bool = False, + every: str | None = None, + tag: str = '', + depth: int | str = 0, + overwrite: bool = False, + update: bool = not ARCHIVING_CONFIG.ONLY_NEW, + import_path: str | None = None): + """Manage database-backed scheduled crawls processed by the orchestrator.""" + + from django.utils import timezone + + from archivebox.base_models.models import get_or_create_system_user_pk + from archivebox.crawls.models import Crawl, CrawlSchedule + from archivebox.crawls.schedule_utils import validate_schedule + from archivebox.workers.orchestrator import Orchestrator + depth = int(depth) - - import shutil - from crontab import CronTab, CronSlices - from archivebox.misc.system import dedupe_cron_jobs - - # Find the archivebox binary path - ARCHIVEBOX_ABSPATH = shutil.which('archivebox') or sys.executable.replace('python', 'archivebox') + result: dict[str, object] = { + 'created_schedule_ids': [], + 'disabled_count': 0, + 'run_all_enqueued': 0, + 'active_schedule_ids': [], + } - Path(CONSTANTS.LOGS_DIR).mkdir(exist_ok=True) - - cron = CronTab(user=True) - cron = dedupe_cron_jobs(cron) + def _active_schedules(): + return CrawlSchedule.objects.filter(is_enabled=True).select_related('template').order_by('created_at') if clear: - print(cron.remove_all(comment=CRON_COMMENT)) - cron.write() - raise SystemExit(0) - - existing_jobs = list(cron.find_comment(CRON_COMMENT)) + disabled_count = CrawlSchedule.objects.filter(is_enabled=True).update( + is_enabled=False, + modified_at=timezone.now(), + ) + result['disabled_count'] = disabled_count + print(f'[green]\\[√] Disabled {disabled_count} scheduled crawl(s).[/green]') if every or add: - every = every or 'day' - quoted = lambda s: f'"{s}"' if (s and ' ' in str(s)) else str(s) - cmd = [ - 'cd', - quoted(out_dir), - '&&', - quoted(ARCHIVEBOX_ABSPATH), - *([ - 'add', - *(['--overwrite'] if overwrite else []), - *(['--update'] if update else []), - *([f'--tag={tag}'] if tag else []), - f'--depth={depth}', - f'"{import_path}"', - ] if import_path else ['update']), - '>>', - quoted(Path(CONSTANTS.LOGS_DIR) / 'schedule.log'), - '2>&1', - ] - new_job = cron.new(command=' '.join(cmd), comment=CRON_COMMENT) + schedule_str = (every or 'day').strip() + validate_schedule(schedule_str) - if every in ('minute', 'hour', 'day', 'month', 'year'): - set_every = getattr(new_job.every(), every) - set_every() - elif CronSlices.is_valid(every): - new_job.setall(every) + created_by_id = get_or_create_system_user_pk() + is_update_schedule = not import_path + template_urls = import_path or 'archivebox://update' + template_label = ( + f'Scheduled import: {template_urls}' + if import_path else + 'Scheduled ArchiveBox update' + )[:64] + template_notes = ( + f'Created by archivebox schedule for {template_urls}' + if import_path else + 'Created by archivebox schedule to queue recurring archivebox://update maintenance crawls.' + ) + + template = Crawl.objects.create( + urls=template_urls, + max_depth=0 if is_update_schedule else depth, + tags_str='' if is_update_schedule else tag, + label=template_label, + notes=template_notes, + created_by_id=created_by_id, + status=Crawl.StatusChoices.SEALED, + retry_at=None, + config={ + 'ONLY_NEW': not update, + 'OVERWRITE': overwrite, + 'DEPTH': 0 if is_update_schedule else depth, + 'SCHEDULE_KIND': 'update' if is_update_schedule else 'crawl', + }, + ) + crawl_schedule = CrawlSchedule.objects.create( + template=template, + schedule=schedule_str, + is_enabled=True, + label=template_label, + notes=template_notes, + created_by_id=created_by_id, + ) + result['created_schedule_ids'] = [str(crawl_schedule.id)] + + schedule_type = 'maintenance update' if is_update_schedule else 'crawl' + print(f'[green]\\[√] Created scheduled {schedule_type}.[/green]') + print(f' id={crawl_schedule.id}') + print(f' every={crawl_schedule.schedule}') + print(f' next_run={crawl_schedule.next_run_at.isoformat()}') + if import_path: + print(f' source={import_path}') + + schedules = list(_active_schedules()) + result['active_schedule_ids'] = [str(schedule.id) for schedule in schedules] + + if show: + if schedules: + print(f'[green]\\[*] Active scheduled crawls: {len(schedules)}[/green]') + for scheduled_crawl in schedules: + template = scheduled_crawl.template + print( + f' - id={scheduled_crawl.id} every={scheduled_crawl.schedule} ' + f'next_run={scheduled_crawl.next_run_at.isoformat()} ' + f'source={template.urls.splitlines()[0] if template.urls else ""}' + ) else: - print('[red]\\[X] Got invalid timeperiod for cron task.[/red]') - print(' It must be one of minute/hour/day/month') - print(' or a quoted cron-format schedule like:') - print(' archivebox init --every=day --depth=1 https://example.com/some/rss/feed.xml') - print(' archivebox init --every="0/5 * * * *" --depth=1 https://example.com/some/rss/feed.xml') - raise SystemExit(1) + print('[yellow]\\[*] No scheduled crawls are enabled.[/yellow]') - cron = dedupe_cron_jobs(cron) - print(cron) - cron.write() + if run_all: + enqueued = 0 + now = timezone.now() + for scheduled_crawl in schedules: + scheduled_crawl.enqueue(queued_at=now) + enqueued += 1 + result['run_all_enqueued'] = enqueued + print(f'[green]\\[*] Enqueued {enqueued} scheduled crawl(s) immediately.[/green]') + if enqueued and not Orchestrator.is_running(): + print('[yellow]\\[*] No orchestrator is running yet. Start `archivebox server` or `archivebox schedule --foreground` to process the queued crawls.[/yellow]') - total_runs = sum(j.frequency_per_year() for j in cron) - existing_jobs = list(cron.find_command('archivebox')) - - print() - print('[green]\\[√] Scheduled new ArchiveBox cron job for user: {} ({} jobs are active).[/green]'.format(USER, len(existing_jobs))) - print('\n'.join(f' > {cmd}' if str(cmd) == str(new_job) else f' {cmd}' for cmd in existing_jobs)) - if total_runs > 60 and not quiet: - print() - print('[yellow]\\[!] With the current cron config, ArchiveBox is estimated to run >{} times per year.[/yellow]'.format(total_runs)) - print(' Congrats on being an enthusiastic internet archiver! 👌') - print() - print(' [violet]Make sure you have enough storage space available to hold all the data.[/violet]') - print(' Using a compressed/deduped filesystem like ZFS is recommended if you plan on archiving a lot.') - print() - elif show: - if existing_jobs: - print('\n'.join(str(cmd) for cmd in existing_jobs)) + if foreground: + print('[green]\\[*] Starting global orchestrator in foreground mode. It will materialize scheduled crawls and process queued work.[/green]') + if Orchestrator.is_running(): + print('[yellow]\\[*] Orchestrator is already running.[/yellow]') else: - print('[red]\\[X] There are no ArchiveBox cron jobs scheduled for your user ({}).[/red]'.format(USER)) - print(' To schedule a new job, run:') - print(' archivebox schedule --every=[timeperiod] --depth=1 https://example.com/some/rss/feed.xml') - raise SystemExit(0) + orchestrator = Orchestrator(exit_on_idle=False) + orchestrator.runloop() - if foreground or run_all: - if not existing_jobs: - print('[red]\\[X] You must schedule some jobs first before running in foreground mode.[/red]') - print(' archivebox schedule --every=hour --depth=1 https://example.com/some/rss/feed.xml') - raise SystemExit(1) + if quiet: + return result - print('[green]\\[*] Running {} ArchiveBox jobs in foreground task scheduler...[/green]'.format(len(existing_jobs))) - if run_all: - try: - for job in existing_jobs: - sys.stdout.write(f' > {job.command.split("/archivebox ")[0].split(" && ")[0]}\n') - sys.stdout.write(f' > {job.command.split("/archivebox ")[-1].split(" >> ")[0]}') - sys.stdout.flush() - job.run() - sys.stdout.write(f'\r √ {job.command.split("/archivebox ")[-1]}\n') - except KeyboardInterrupt: - print('\n[green]\\[√] Stopped.[/green] (Ctrl+C)') - raise SystemExit(1) + if not any((every, add, show, clear, foreground, run_all)): + if schedules: + print('[green]\\[*] Active scheduled crawls:[/green]') + for scheduled_crawl in schedules: + print(f' - {scheduled_crawl.id} every={scheduled_crawl.schedule} next_run={scheduled_crawl.next_run_at.isoformat()}') + else: + print('[yellow]\\[*] No scheduled crawls are enabled.[/yellow]') - if foreground: - try: - for job in existing_jobs: - print(f' > {job.command.split("/archivebox ")[-1].split(" >> ")[0]}') - for result in cron.run_scheduler(): - print(result) - except KeyboardInterrupt: - print('\n[green]\\[√] Stopped.[/green] (Ctrl+C)') - raise SystemExit(1) + return result @click.command() -@click.option('--quiet', '-q', is_flag=True, help="Don't warn about storage space") -@click.option('--add', is_flag=True, help='Add a new scheduled ArchiveBox update job to cron') -@click.option('--every', type=str, help='Run ArchiveBox once every [timeperiod] (hour/day/month/year or cron format e.g. "0 0 * * *")') -@click.option('--tag', '-t', default='', help='Tag the added URLs with the provided tags e.g. --tag=tag1,tag2,tag3') -@click.option('--depth', type=click.Choice(['0', '1']), default='0', help='Depth to archive to [0] or 1') -@click.option('--overwrite', is_flag=True, help='Re-archive any URLs that have been previously archived, overwriting existing Snapshots') -@click.option('--update', is_flag=True, help='Re-pull any URLs that have been previously added, as needed to fill missing ArchiveResults') -@click.option('--clear', is_flag=True, help='Stop all ArchiveBox scheduled runs (remove cron jobs)') -@click.option('--show', is_flag=True, help='Print a list of currently active ArchiveBox cron jobs') -@click.option('--foreground', '-f', is_flag=True, help='Launch ArchiveBox scheduler as a long-running foreground task instead of using cron') -@click.option('--run-all', is_flag=True, help='Run all the scheduled jobs once immediately, independent of their configured schedules') +@click.option('--quiet', '-q', is_flag=True, help="Return structured results without extra summary output") +@click.option('--add', is_flag=True, help='Create a new scheduled crawl') +@click.option('--every', type=str, help='Run on an alias like daily/weekly/monthly or a cron expression such as "0 */6 * * *"') +@click.option('--tag', '-t', default='', help='Comma-separated tags to apply to scheduled crawl snapshots') +@click.option('--depth', type=click.Choice([str(i) for i in range(5)]), default='0', help='Recursively archive linked pages up to N hops away') +@click.option('--overwrite', is_flag=True, help='Overwrite existing data if URLs have been archived previously') +@click.option('--update', is_flag=True, help='Retry previously failed/skipped URLs when scheduled crawls run') +@click.option('--clear', is_flag=True, help='Disable all currently enabled schedules') +@click.option('--show', is_flag=True, help='Print all currently enabled schedules') +@click.option('--foreground', '-f', is_flag=True, help='Run the global orchestrator in the foreground (no crontab required)') +@click.option('--run-all', is_flag=True, help='Enqueue all enabled schedules immediately and process them once') @click.argument('import_path', required=False) @docstring(schedule.__doc__) def main(**kwargs): - """Set ArchiveBox to regularly import URLs at specific times using cron""" + """Manage database-backed scheduled crawls processed by the orchestrator.""" schedule(**kwargs) diff --git a/archivebox/cli/archivebox_server.py b/archivebox/cli/archivebox_server.py index afc4542a..6e6401cd 100644 --- a/archivebox/cli/archivebox_server.py +++ b/archivebox/cli/archivebox_server.py @@ -39,6 +39,27 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,), if debug or reload: SHELL_CONFIG.DEBUG = True + if run_in_debug: + os.environ['ARCHIVEBOX_RUNSERVER'] = '1' + if reload: + os.environ['ARCHIVEBOX_AUTORELOAD'] = '1' + os.environ['ARCHIVEBOX_ORCHESTRATOR_MANAGED_BY_WATCHER'] = '1' + from archivebox.config.common import STORAGE_CONFIG + pidfile = str(STORAGE_CONFIG.TMP_DIR / 'runserver.pid') + os.environ['ARCHIVEBOX_RUNSERVER_PIDFILE'] = pidfile + + from django.utils.autoreload import DJANGO_AUTORELOAD_ENV + is_reloader_child = os.environ.get(DJANGO_AUTORELOAD_ENV) == 'true' + if not is_reloader_child: + env = os.environ.copy() + env['ARCHIVEBOX_ORCHESTRATOR_WATCHER'] = '1' + subprocess.Popen( + [sys.executable, '-m', 'archivebox', 'manage', 'orchestrator_watch', f'--pidfile={pidfile}'], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + from django.contrib.auth.models import User if not User.objects.filter(is_superuser=True).exclude(username='system').exists(): @@ -63,26 +84,6 @@ def server(runserver_args: Iterable[str]=(SERVER_CONFIG.BIND_ADDR,), pass if run_in_debug: - os.environ['ARCHIVEBOX_RUNSERVER'] = '1' - if reload: - os.environ['ARCHIVEBOX_AUTORELOAD'] = '1' - os.environ['ARCHIVEBOX_ORCHESTRATOR_MANAGED_BY_WATCHER'] = '1' - from archivebox.config.common import STORAGE_CONFIG - pidfile = str(STORAGE_CONFIG.TMP_DIR / 'runserver.pid') - os.environ['ARCHIVEBOX_RUNSERVER_PIDFILE'] = pidfile - - from django.utils.autoreload import DJANGO_AUTORELOAD_ENV - is_reloader_child = os.environ.get(DJANGO_AUTORELOAD_ENV) == 'true' - if not is_reloader_child: - env = os.environ.copy() - env['ARCHIVEBOX_ORCHESTRATOR_WATCHER'] = '1' - subprocess.Popen( - [sys.executable, '-m', 'archivebox', 'manage', 'orchestrator_watch', f'--pidfile={pidfile}'], - env=env, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - from django.core.management import call_command print('[green][+] Starting ArchiveBox webserver in DEBUG mode...[/green]') print(f' [blink][green]>[/green][/blink] Starting ArchiveBox webserver on [deep_sky_blue4][link=http://{host}:{port}]http://{host}:{port}[/link][/deep_sky_blue4]') diff --git a/archivebox/cli/archivebox_status.py b/archivebox/cli/archivebox_status.py index e8e91b2d..c0622f0d 100644 --- a/archivebox/cli/archivebox_status.py +++ b/archivebox/cli/archivebox_status.py @@ -31,39 +31,61 @@ def status(out_dir: Path=DATA_DIR) -> None: print(f' Index size: {size} across {num_files} files') print() - links = Snapshot.objects.all() - num_sql_links = links.count() + links = list(Snapshot.objects.all()) + num_sql_links = len(links) num_link_details = sum(1 for link in parse_json_links_details(out_dir=out_dir)) print(f' > SQL Main Index: {num_sql_links} links'.ljust(36), f'(found in {CONSTANTS.SQL_INDEX_FILENAME})') print(f' > JSON Link Details: {num_link_details} links'.ljust(36), f'(found in {ARCHIVE_DIR.name}/*/index.json)') print() print('[green]\\[*] Scanning archive data directories...[/green]') - print(f'[yellow] {ARCHIVE_DIR}/*[/yellow]') - num_bytes, num_dirs, num_files = get_dir_size(ARCHIVE_DIR) + users_dir = out_dir / 'users' + scan_roots = [root for root in (ARCHIVE_DIR, users_dir) if root.exists()] + scan_roots_display = ', '.join(str(root) for root in scan_roots) if scan_roots else str(ARCHIVE_DIR) + print(f'[yellow] {scan_roots_display}[/yellow]') + num_bytes = num_dirs = num_files = 0 + for root in scan_roots: + root_bytes, root_dirs, root_files = get_dir_size(root) + num_bytes += root_bytes + num_dirs += root_dirs + num_files += root_files size = printable_filesize(num_bytes) print(f' Size: {size} across {num_files} files in {num_dirs} directories') # Use DB as source of truth for snapshot status - num_indexed = links.count() - num_archived = links.filter(status='archived').count() or links.exclude(downloaded_at=None).count() - num_unarchived = links.filter(status='queued').count() or links.filter(downloaded_at=None).count() + num_indexed = len(links) + num_archived = sum(1 for snapshot in links if snapshot.is_archived) + num_unarchived = max(num_indexed - num_archived, 0) print(f' > indexed: {num_indexed}'.ljust(36), '(total snapshots in DB)') print(f' > archived: {num_archived}'.ljust(36), '(snapshots with archived content)') print(f' > unarchived: {num_unarchived}'.ljust(36), '(snapshots pending archiving)') - # Count directories on filesystem - num_present = 0 - orphaned_dirs = [] - if ARCHIVE_DIR.exists(): - for entry in ARCHIVE_DIR.iterdir(): - if entry.is_dir(): - num_present += 1 - if not links.filter(timestamp=entry.name).exists(): - orphaned_dirs.append(str(entry)) + # Count snapshot directories on filesystem across both legacy and current layouts. + expected_snapshot_dirs = { + str(Path(snapshot.output_dir).resolve()) + for snapshot in links + if Path(snapshot.output_dir).exists() + } + discovered_snapshot_dirs = set() - num_valid = min(num_present, num_indexed) # approximate + if ARCHIVE_DIR.exists(): + discovered_snapshot_dirs.update( + str(entry.resolve()) + for entry in ARCHIVE_DIR.iterdir() + if entry.is_dir() + ) + + if users_dir.exists(): + discovered_snapshot_dirs.update( + str(entry.resolve()) + for entry in users_dir.glob('*/snapshots/*/*/*') + if entry.is_dir() + ) + + orphaned_dirs = sorted(discovered_snapshot_dirs - expected_snapshot_dirs) + num_present = len(discovered_snapshot_dirs) + num_valid = len(discovered_snapshot_dirs & expected_snapshot_dirs) print() - print(f' > present: {num_present}'.ljust(36), '(directories in archive/)') + print(f' > present: {num_present}'.ljust(36), '(snapshot directories on disk)') print(f' > [green]valid:[/green] {num_valid}'.ljust(36), ' (directories with matching DB entry)') num_orphaned = len(orphaned_dirs) @@ -95,7 +117,14 @@ def status(out_dir: Path=DATA_DIR) -> None: print(' [green]archivebox manage createsuperuser[/green]') print() - for snapshot in links.order_by('-downloaded_at')[:10]: + recent_snapshots = sorted( + links, + key=lambda snapshot: ( + snapshot.downloaded_at or snapshot.modified_at or snapshot.created_at + ), + reverse=True, + )[:10] + for snapshot in recent_snapshots: if not snapshot.downloaded_at: continue print( diff --git a/archivebox/config/constants.py b/archivebox/config/constants.py index c1f6ae44..2a4ceb49 100644 --- a/archivebox/config/constants.py +++ b/archivebox/config/constants.py @@ -80,7 +80,6 @@ class ConstantsDict(Mapping): ARCHIVE_DIR_NAME: str = 'archive' SOURCES_DIR_NAME: str = 'sources' PERSONAS_DIR_NAME: str = 'personas' - CRONTABS_DIR_NAME: str = 'crontabs' CACHE_DIR_NAME: str = 'cache' LOGS_DIR_NAME: str = 'logs' CUSTOM_PLUGINS_DIR_NAME: str = 'custom_plugins' @@ -177,7 +176,6 @@ class ConstantsDict(Mapping): PERSONAS_DIR_NAME, CUSTOM_TEMPLATES_DIR_NAME, CUSTOM_PLUGINS_DIR_NAME, - CRONTABS_DIR_NAME, "invalid", "users", "machine", diff --git a/archivebox/config/paths.py b/archivebox/config/paths.py index 74d50c86..8a4d81fe 100644 --- a/archivebox/config/paths.py +++ b/archivebox/config/paths.py @@ -141,6 +141,11 @@ def create_and_chown_dir(dir_path: Path) -> None: os.system(f'chown {ARCHIVEBOX_USER} "{dir_path}" 2>/dev/null') os.system(f'chown {ARCHIVEBOX_USER} "{dir_path}"/* 2>/dev/null &') + +def tmp_dir_socket_path_is_short_enough(dir_path: Path) -> bool: + socket_file = dir_path.absolute().resolve() / 'supervisord.sock' + return len(f'file://{socket_file}') <= 96 + @cache def get_or_create_working_tmp_dir(autofix=True, quiet=True): from archivebox import CONSTANTS @@ -158,6 +163,7 @@ def get_or_create_working_tmp_dir(autofix=True, quiet=True): Path(tempfile.gettempdir()) / 'archivebox' / get_collection_id()[:4], # /var/folders/qy/6tpfrpx100j1t4l312nz683m0000gn/T/archivebox/abc5d Path(tempfile.gettempdir()) / 'abx' / get_collection_id()[:4], # /var/folders/qy/6tpfrpx100j1t4l312nz683m0000gn/T/abx/abc5 ] + fallback_candidate = None for candidate in CANDIDATES: try: create_and_chown_dir(candidate) @@ -167,6 +173,19 @@ def get_or_create_working_tmp_dir(autofix=True, quiet=True): if autofix and STORAGE_CONFIG.TMP_DIR != candidate: STORAGE_CONFIG.update_in_place(TMP_DIR=candidate) return candidate + try: + if fallback_candidate is None and candidate.exists() and dir_is_writable(candidate) and tmp_dir_socket_path_is_short_enough(candidate): + fallback_candidate = candidate + except Exception: + pass + + # Some sandboxed environments disallow AF_UNIX binds entirely. + # Fall back to the shortest writable path so read-only CLI commands can still run, + # and let later permission checks surface the missing socket support if needed. + if fallback_candidate: + if autofix and STORAGE_CONFIG.TMP_DIR != fallback_candidate: + STORAGE_CONFIG.update_in_place(TMP_DIR=fallback_candidate) + return fallback_candidate if not quiet: raise OSError(f'ArchiveBox is unable to find a writable TMP_DIR, tried {CANDIDATES}!') @@ -205,6 +224,11 @@ def get_or_create_working_lib_dir(autofix=True, quiet=False): def get_data_locations(): from archivebox.config import CONSTANTS from archivebox.config.common import STORAGE_CONFIG + + try: + tmp_dir = get_or_create_working_tmp_dir(autofix=True, quiet=True) or STORAGE_CONFIG.TMP_DIR + except Exception: + tmp_dir = STORAGE_CONFIG.TMP_DIR return benedict({ "DATA_DIR": { @@ -246,9 +270,9 @@ def get_data_locations(): "is_valid": os.path.isdir(CONSTANTS.LOGS_DIR) and os.access(CONSTANTS.LOGS_DIR, os.R_OK) and os.access(CONSTANTS.LOGS_DIR, os.W_OK), # read + write }, 'TMP_DIR': { - 'path': STORAGE_CONFIG.TMP_DIR.resolve(), + 'path': tmp_dir.resolve(), 'enabled': True, - 'is_valid': os.path.isdir(STORAGE_CONFIG.TMP_DIR) and os.access(STORAGE_CONFIG.TMP_DIR, os.R_OK) and os.access(STORAGE_CONFIG.TMP_DIR, os.W_OK), # read + write + 'is_valid': os.path.isdir(tmp_dir) and os.access(tmp_dir, os.R_OK) and os.access(tmp_dir, os.W_OK), # read + write }, # "CACHE_DIR": { # "path": CACHE_DIR.resolve(), @@ -262,6 +286,13 @@ def get_code_locations(): from archivebox.config import CONSTANTS from archivebox.config.common import STORAGE_CONFIG + try: + lib_dir = get_or_create_working_lib_dir(autofix=True, quiet=True) or STORAGE_CONFIG.LIB_DIR + except Exception: + lib_dir = STORAGE_CONFIG.LIB_DIR + + lib_bin_dir = lib_dir / 'bin' + return benedict({ 'PACKAGE_DIR': { 'path': (PACKAGE_DIR).resolve(), @@ -284,14 +315,14 @@ def get_code_locations(): 'is_valid': os.path.isdir(CONSTANTS.USER_PLUGINS_DIR) and os.access(CONSTANTS.USER_PLUGINS_DIR, os.R_OK), # read }, 'LIB_DIR': { - 'path': STORAGE_CONFIG.LIB_DIR.resolve(), + 'path': lib_dir.resolve(), 'enabled': True, - 'is_valid': os.path.isdir(STORAGE_CONFIG.LIB_DIR) and os.access(STORAGE_CONFIG.LIB_DIR, os.R_OK) and os.access(STORAGE_CONFIG.LIB_DIR, os.W_OK), # read + write + 'is_valid': os.path.isdir(lib_dir) and os.access(lib_dir, os.R_OK) and os.access(lib_dir, os.W_OK), # read + write }, 'LIB_BIN_DIR': { - 'path': STORAGE_CONFIG.LIB_BIN_DIR.resolve(), + 'path': lib_bin_dir.resolve(), 'enabled': True, - 'is_valid': os.path.isdir(STORAGE_CONFIG.LIB_BIN_DIR) and os.access(STORAGE_CONFIG.LIB_BIN_DIR, os.R_OK) and os.access(STORAGE_CONFIG.LIB_BIN_DIR, os.W_OK), # read + write + 'is_valid': os.path.isdir(lib_bin_dir) and os.access(lib_bin_dir, os.R_OK) and os.access(lib_bin_dir, os.W_OK), # read + write }, }) @@ -409,4 +440,3 @@ def get_code_locations(): # print(f'[red]:cross_mark: ERROR: SYSTEM_TMP_DIR {run_dir} is not writable by archivebox user {ARCHIVEBOX_USER}:{ARCHIVEBOX_GROUP}[/red]', file=sys.stderr) # return run_dir - diff --git a/archivebox/core/forms.py b/archivebox/core/forms.py index 2ab7539e..a1a83ed7 100644 --- a/archivebox/core/forms.py +++ b/archivebox/core/forms.py @@ -5,6 +5,7 @@ from django import forms from archivebox.misc.util import URL_REGEX from taggit.utils import edit_string_for_tags, parse_tags from archivebox.base_models.admin import KeyValueWidget +from archivebox.crawls.schedule_utils import validate_schedule DEPTH_CHOICES = ( ('0', 'depth = 0 (archive just these URLs)'), @@ -197,6 +198,18 @@ class AddLinkForm(forms.Form): return cleaned_data + def clean_schedule(self): + schedule = (self.cleaned_data.get('schedule') or '').strip() + if not schedule: + return '' + + try: + validate_schedule(schedule) + except ValueError as err: + raise forms.ValidationError(str(err)) + + return schedule + class TagWidgetMixin: def format_value(self, value): if value is not None and not isinstance(value, str): diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 193e13be..f9c6cc5f 100755 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -2963,6 +2963,8 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi for file_path in plugin_dir.rglob('*'): if not file_path.is_file(): continue + if '.hooks' in file_path.parts: + continue if file_path.name in exclude_names: continue diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index a002d6dc..d7d54d64 100755 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -18,6 +18,7 @@ from rich import print from archivebox.config import CONSTANTS from archivebox.base_models.models import ModelWithUUID, ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, get_or_create_system_user_pk from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine +from archivebox.crawls.schedule_utils import next_run_for_schedule, validate_schedule if TYPE_CHECKING: from archivebox.core.models import Snapshot, ArchiveResult @@ -51,12 +52,50 @@ class CrawlSchedule(ModelWithUUID, ModelWithNotes): return reverse_lazy('api-1:get_any', args=[self.id]) def save(self, *args, **kwargs): + self.schedule = (self.schedule or '').strip() + validate_schedule(self.schedule) self.label = self.label or (self.template.label if self.template else '') super().save(*args, **kwargs) if self.template: self.template.schedule = self self.template.save() + @property + def last_run_at(self): + latest_crawl = self.crawl_set.order_by('-created_at').first() + if latest_crawl: + return latest_crawl.created_at + if self.template: + return self.template.created_at + return self.created_at + + @property + def next_run_at(self): + return next_run_for_schedule(self.schedule, self.last_run_at) + + def is_due(self, now=None) -> bool: + now = now or timezone.now() + return self.is_enabled and self.next_run_at <= now + + def enqueue(self, queued_at=None) -> 'Crawl': + queued_at = queued_at or timezone.now() + template = self.template + label = template.label or self.label + + return Crawl.objects.create( + urls=template.urls, + config=template.config or {}, + max_depth=template.max_depth, + tags_str=template.tags_str, + persona_id=template.persona_id, + label=label, + notes=template.notes, + schedule=self, + status=Crawl.StatusChoices.QUEUED, + retry_at=queued_at, + created_by=template.created_by, + ) + class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWithStateMachine): id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True) @@ -204,6 +243,15 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith if url.strip() and not url.strip().startswith('#') ] + def get_system_task(self) -> str | None: + urls = self.get_urls_list() + if len(urls) != 1: + return None + system_url = urls[0].strip().lower() + if system_url.startswith('archivebox://'): + return system_url + return None + def add_url(self, entry: dict) -> bool: """ @@ -345,6 +393,13 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith def get_runtime_config(): return get_config(crawl=self) + system_task = self.get_system_task() + if system_task == 'archivebox://update': + from archivebox.cli.archivebox_update import process_all_db_snapshots + + process_all_db_snapshots() + return None + machine = Machine.current() declared_binary_names: set[str] = set() @@ -446,6 +501,12 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith print(f'[cyan]📝 Processing {len(records)} records from {hook.name}[/cyan]') for record in records[:3]: print(f' Record: type={record.get("type")}, keys={list(record.keys())[:5]}') + if system_task: + records = [ + record + for record in records + if record.get('type') in ('Binary', 'Machine') + ] overrides = {'crawl': self} stats = process_hook_records(records, overrides=overrides) if stats: @@ -519,6 +580,18 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith install_declared_binaries(declared_binary_names) # Create snapshots from all URLs in self.urls + if system_task: + leaked_snapshots = self.snapshot_set.all() + if leaked_snapshots.exists(): + leaked_count = leaked_snapshots.count() + leaked_snapshots.delete() + print(f'[yellow]⚠️ Removed {leaked_count} leaked snapshot(s) created during system crawl {system_task}[/yellow]') + with open(debug_log, 'a') as f: + f.write(f'Skipping snapshot creation for system crawl: {system_task}\n') + f.write(f'=== Crawl.run() complete ===\n\n') + f.flush() + return None + with open(debug_log, 'a') as f: f.write(f'Creating snapshots from URLs...\n') f.flush() diff --git a/archivebox/crawls/schedule_utils.py b/archivebox/crawls/schedule_utils.py new file mode 100644 index 00000000..1df66ac9 --- /dev/null +++ b/archivebox/crawls/schedule_utils.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from datetime import datetime + +from croniter import croniter + + +SCHEDULE_ALIASES: dict[str, str] = { + "minute": "* * * * *", + "minutely": "* * * * *", + "hour": "0 * * * *", + "hourly": "0 * * * *", + "day": "0 0 * * *", + "daily": "0 0 * * *", + "week": "0 0 * * 0", + "weekly": "0 0 * * 0", + "month": "0 0 1 * *", + "monthly": "0 0 1 * *", + "year": "0 0 1 1 *", + "yearly": "0 0 1 1 *", +} + + +def normalize_schedule(schedule: str) -> str: + normalized = (schedule or "").strip() + if not normalized: + raise ValueError("Schedule cannot be empty.") + + return SCHEDULE_ALIASES.get(normalized.lower(), normalized) + + +def validate_schedule(schedule: str) -> str: + normalized = normalize_schedule(schedule) + if not croniter.is_valid(normalized): + raise ValueError( + "Invalid schedule. Use an alias like daily/weekly/monthly or a cron expression such as '0 */6 * * *'." + ) + return normalized + + +def next_run_for_schedule(schedule: str, after: datetime) -> datetime: + normalized = validate_schedule(schedule) + return croniter(normalized, after).get_next(datetime) diff --git a/archivebox/machine/models.py b/archivebox/machine/models.py index 9629b3aa..f92ac02b 100755 --- a/archivebox/machine/models.py +++ b/archivebox/machine/models.py @@ -1376,22 +1376,54 @@ class Process(models.Model): @property def pid_file(self) -> Path: """Path to PID file for this process.""" - return Path(self.pwd) / 'process.pid' if self.pwd else None + runtime_dir = self.runtime_dir + return runtime_dir / 'process.pid' if runtime_dir else None @property def cmd_file(self) -> Path: """Path to cmd.sh script for this process.""" - return Path(self.pwd) / 'cmd.sh' if self.pwd else None + runtime_dir = self.runtime_dir + return runtime_dir / 'cmd.sh' if runtime_dir else None @property def stdout_file(self) -> Path: """Path to stdout log.""" - return Path(self.pwd) / 'stdout.log' if self.pwd else None + runtime_dir = self.runtime_dir + return runtime_dir / 'stdout.log' if runtime_dir else None @property def stderr_file(self) -> Path: """Path to stderr log.""" - return Path(self.pwd) / 'stderr.log' if self.pwd else None + runtime_dir = self.runtime_dir + return runtime_dir / 'stderr.log' if runtime_dir else None + + @property + def hook_script_name(self) -> str | None: + """Best-effort hook filename extracted from the process command.""" + if self.process_type != self.TypeChoices.HOOK or not self.cmd: + return None + + for arg in self.cmd: + arg = str(arg) + if arg.startswith('-'): + continue + candidate = Path(arg).name + if candidate.startswith('on_') and Path(candidate).suffix in {'.py', '.js', '.sh'}: + return candidate + + return None + + @property + def runtime_dir(self) -> Path | None: + """Directory where this process stores runtime logs/pid/cmd metadata.""" + if not self.pwd: + return None + + base_dir = Path(self.pwd) + hook_name = self.hook_script_name + if hook_name: + return base_dir / '.hooks' / hook_name + return base_dir def tail_stdout(self, lines: int = 50, follow: bool = False): """ @@ -1518,6 +1550,7 @@ class Process(models.Model): def _write_pid_file(self) -> None: """Write PID file with mtime set to process start time.""" if self.pid and self.started_at and self.pid_file: + self.pid_file.parent.mkdir(parents=True, exist_ok=True) # Write PID to file self.pid_file.write_text(str(self.pid)) # Set mtime to process start time for validation @@ -1530,6 +1563,7 @@ class Process(models.Model): def _write_cmd_file(self) -> None: """Write cmd.sh script for debugging/validation.""" if self.cmd and self.cmd_file: + self.cmd_file.parent.mkdir(parents=True, exist_ok=True) # Escape shell arguments (quote if contains space, ", or $) def escape(arg: str) -> str: return f'"{arg.replace(chr(34), chr(92)+chr(34))}"' if any(c in arg for c in ' "$') else arg @@ -1544,16 +1578,19 @@ class Process(models.Model): def ensure_log_files(self) -> None: """Ensure stdout/stderr log files exist for this process.""" - if not self.pwd: + runtime_dir = self.runtime_dir + if not runtime_dir: return try: - Path(self.pwd).mkdir(parents=True, exist_ok=True) + runtime_dir.mkdir(parents=True, exist_ok=True) except OSError: return try: if self.stdout_file: + self.stdout_file.parent.mkdir(parents=True, exist_ok=True) self.stdout_file.touch(exist_ok=True) if self.stderr_file: + self.stderr_file.parent.mkdir(parents=True, exist_ok=True) self.stderr_file.touch(exist_ok=True) except OSError: return @@ -1602,14 +1639,15 @@ class Process(models.Model): # Use provided cwd or default to pwd working_dir = cwd or self.pwd - # Ensure output directory exists - Path(self.pwd).mkdir(parents=True, exist_ok=True) - # Write cmd.sh for debugging self._write_cmd_file() stdout_path = self.stdout_file stderr_path = self.stderr_file + if stdout_path: + stdout_path.parent.mkdir(parents=True, exist_ok=True) + if stderr_path: + stderr_path.parent.mkdir(parents=True, exist_ok=True) with open(stdout_path, 'a') as out, open(stderr_path, 'a') as err: proc = subprocess.Popen( diff --git a/archivebox/misc/checks.py b/archivebox/misc/checks.py index 09929d36..bf97e838 100644 --- a/archivebox/misc/checks.py +++ b/archivebox/misc/checks.py @@ -131,6 +131,7 @@ def check_data_dir_permissions(): from archivebox import DATA_DIR from archivebox.misc.logging import STDERR from archivebox.config.permissions import ARCHIVEBOX_USER, ARCHIVEBOX_GROUP, DEFAULT_PUID, DEFAULT_PGID, IS_ROOT, USER + from archivebox.config.paths import get_or_create_working_tmp_dir, get_or_create_working_lib_dir data_dir_stat = Path(DATA_DIR).stat() data_dir_uid, data_dir_gid = data_dir_stat.st_uid, data_dir_stat.st_gid @@ -156,11 +157,21 @@ def check_data_dir_permissions(): from archivebox.config.common import STORAGE_CONFIG + try: + tmp_dir = get_or_create_working_tmp_dir(autofix=True, quiet=True) or STORAGE_CONFIG.TMP_DIR + except Exception: + tmp_dir = STORAGE_CONFIG.TMP_DIR + + try: + lib_dir = get_or_create_working_lib_dir(autofix=True, quiet=True) or STORAGE_CONFIG.LIB_DIR + except Exception: + lib_dir = STORAGE_CONFIG.LIB_DIR + # Check /tmp dir permissions - check_tmp_dir(STORAGE_CONFIG.TMP_DIR, throw=False, must_exist=True) + check_tmp_dir(tmp_dir, throw=False, must_exist=True) # Check /lib dir permissions - check_lib_dir(STORAGE_CONFIG.LIB_DIR, throw=False, must_exist=True) + check_lib_dir(lib_dir, throw=False, must_exist=True) os.umask(0o777 - int(STORAGE_CONFIG.DIR_OUTPUT_PERMISSIONS, base=8)) # noqa: F821 diff --git a/archivebox/misc/logging_util.py b/archivebox/misc/logging_util.py index a3ad4566..7e5b707c 100644 --- a/archivebox/misc/logging_util.py +++ b/archivebox/misc/logging_util.py @@ -426,14 +426,15 @@ def log_removal_started(snapshots, yes: bool, delete: bool): except (KeyboardInterrupt, EOFError, AssertionError): raise SystemExit(0) -def log_removal_finished(all_links: int, to_remove: int): - if all_links == 0: +def log_removal_finished(remaining_links: int, removed_links: int): + if remaining_links == 0 and removed_links == 0: print() print('[red1][X] No matching links found.[/]') else: + total_before = remaining_links + removed_links print() - print(f'[red1][√] Removed {to_remove} out of {all_links} links from the archive index.[/]') - print(f' Index now contains {all_links - to_remove} links.') + print(f'[red1][√] Removed {removed_links} out of {total_before} links from the archive index.[/]') + print(f' Index now contains {remaining_links} links.') ### Search Indexing Stage diff --git a/archivebox/misc/system.py b/archivebox/misc/system.py index 695d0ac6..a1a55d9b 100644 --- a/archivebox/misc/system.py +++ b/archivebox/misc/system.py @@ -10,7 +10,6 @@ from pathlib import Path from typing import Optional, Union, Set, Tuple from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired -from crontab import CronTab from atomicwrites import atomic_write as lib_atomic_write from archivebox.config.common import STORAGE_CONFIG @@ -170,28 +169,6 @@ def get_dir_size(path: Union[str, Path], recursive: bool=True, pattern: Optional pass return num_bytes, num_dirs, num_files - -CRON_COMMENT = 'archivebox_schedule' - - -@enforce_types -def dedupe_cron_jobs(cron: CronTab) -> CronTab: - deduped: Set[Tuple[str, str]] = set() - - for job in list(cron): - unique_tuple = (str(job.slices), str(job.command)) - if unique_tuple not in deduped: - deduped.add(unique_tuple) - cron.remove(job) - - for schedule, command in deduped: - job = cron.new(command=command, comment=CRON_COMMENT) - job.setall(schedule) - job.enable() - - return cron - - class suppress_output(object): """ A context manager for doing a "deep suppression" of stdout and stderr in diff --git a/archivebox/tests/test_cli_install.py b/archivebox/tests/test_cli_install.py index 6578575c..d839772f 100644 --- a/archivebox/tests/test_cli_install.py +++ b/archivebox/tests/test_cli_install.py @@ -7,6 +7,7 @@ Verify install detects and records binary dependencies in DB. import os import subprocess import sqlite3 +from pathlib import Path from .fixtures import * @@ -94,24 +95,41 @@ def test_install_shows_binary_status(tmp_path, process): assert len(output) > 50 -def test_install_updates_binary_table(tmp_path, process, disable_extractors_dict): - """Test that install command runs successfully. - - Binary records are created lazily when binaries are first used, not during install. - """ +def test_install_updates_binary_table(tmp_path, process): + """Test that install completes and only mutates dependency state.""" os.chdir(tmp_path) + env = os.environ.copy() + tmp_short = Path('/tmp') / f'abx-install-{tmp_path.name}' + tmp_short.mkdir(parents=True, exist_ok=True) + env.update({ + 'TMP_DIR': str(tmp_short), + 'ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS': 'true', + }) - # Run install - it should complete without errors or timeout (which is expected) - # The install command starts the orchestrator which runs continuously - try: - result = subprocess.run( - ['archivebox', 'install'], - capture_output=True, - timeout=30, - env=disable_extractors_dict, - ) - # If it completes, should be successful - assert result.returncode == 0 - except subprocess.TimeoutExpired: - # Timeout is expected since orchestrator runs continuously - pass + result = subprocess.run( + ['archivebox', 'install'], + capture_output=True, + text=True, + timeout=420, + env=env, + ) + + output = result.stdout + result.stderr + assert result.returncode == 0, output + + conn = sqlite3.connect("index.sqlite3") + c = conn.cursor() + + binary_counts = dict(c.execute( + "SELECT status, COUNT(*) FROM machine_binary GROUP BY status" + ).fetchall()) + snapshot_count = c.execute("SELECT COUNT(*) FROM core_snapshot").fetchone()[0] + sealed_crawls = c.execute( + "SELECT COUNT(*) FROM crawls_crawl WHERE status='sealed'" + ).fetchone()[0] + conn.close() + + assert sealed_crawls >= 1 + assert snapshot_count == 0 + assert binary_counts.get('queued', 0) == 0 + assert binary_counts.get('installed', 0) > 0 diff --git a/archivebox/tests/test_cli_remove.py b/archivebox/tests/test_cli_remove.py index 10d1d192..7fa66209 100644 --- a/archivebox/tests/test_cli_remove.py +++ b/archivebox/tests/test_cli_remove.py @@ -99,6 +99,8 @@ def test_remove_yes_flag_skips_confirmation(tmp_path, process, disable_extractor ) assert result.returncode == 0 + output = result.stdout.decode("utf-8") + result.stderr.decode("utf-8") + assert "Index now contains 0 links." in output def test_remove_multiple_snapshots(tmp_path, process, disable_extractors_dict): @@ -173,6 +175,30 @@ def test_remove_nonexistent_url_fails_gracefully(tmp_path, process, disable_extr assert result.returncode != 0 or 'not found' in result.stdout.lower() or 'no matches' in result.stdout.lower() +def test_remove_reports_remaining_link_count_correctly(tmp_path, process, disable_extractors_dict): + """Test remove reports the remaining snapshot count after deletion.""" + os.chdir(tmp_path) + + for url in ['https://example.com', 'https://example.org']: + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', url], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + result = subprocess.run( + ['archivebox', 'remove', 'https://example.org', '--yes'], + capture_output=True, + env=disable_extractors_dict, + check=True, + ) + + output = result.stdout.decode("utf-8") + result.stderr.decode("utf-8") + assert "Removed 1 out of 2 links" in output + assert "Index now contains 1 links." in output + + def test_remove_after_flag(tmp_path, process, disable_extractors_dict): """Test remove --after flag removes snapshots after date.""" os.chdir(tmp_path) diff --git a/archivebox/tests/test_cli_schedule.py b/archivebox/tests/test_cli_schedule.py index ed6f2f5a..47e32c98 100644 --- a/archivebox/tests/test_cli_schedule.py +++ b/archivebox/tests/test_cli_schedule.py @@ -1,56 +1,62 @@ #!/usr/bin/env python3 -""" -Tests for archivebox schedule command. -Verify schedule creates scheduled crawl records. -""" +"""CLI-specific tests for archivebox schedule.""" import os -import subprocess import sqlite3 +import subprocess -from .fixtures import * +from .fixtures import process, disable_extractors_dict -def test_schedule_creates_scheduled_crawl(tmp_path, process, disable_extractors_dict): - """Test that schedule command creates a scheduled crawl.""" +def test_schedule_run_all_enqueues_scheduled_crawl(tmp_path, process, disable_extractors_dict): os.chdir(tmp_path) - result = subprocess.run( - ['archivebox', 'schedule', '--every=day', '--depth=0', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - timeout=30, - ) - - # Should complete (creating schedule or showing usage) - assert result.returncode in [0, 1, 2] - - -def test_schedule_with_every_flag(tmp_path, process, disable_extractors_dict): - """Test schedule with --every flag.""" - os.chdir(tmp_path) - - result = subprocess.run( - ['archivebox', 'schedule', '--every=week', '--depth=0', 'https://example.com'], - capture_output=True, - env=disable_extractors_dict, - timeout=30, - ) - - assert result.returncode in [0, 1, 2] - - -def test_schedule_list_shows_schedules(tmp_path, process): - """Test that schedule can list existing schedules.""" - os.chdir(tmp_path) - - # Try to list schedules - result = subprocess.run( - ['archivebox', 'schedule', '--list'], + subprocess.run( + ['archivebox', 'schedule', '--every=daily', '--depth=0', 'https://example.com'], capture_output=True, text=True, - timeout=30, + check=True, ) - # Should show schedules or empty list - assert result.returncode in [0, 1, 2] + result = subprocess.run( + ['archivebox', 'schedule', '--run-all'], + capture_output=True, + text=True, + env=disable_extractors_dict, + ) + + assert result.returncode == 0 + assert 'Enqueued 1 scheduled crawl' in result.stdout + + conn = sqlite3.connect(tmp_path / "index.sqlite3") + try: + crawl_count = conn.execute("SELECT COUNT(*) FROM crawls_crawl").fetchone()[0] + queued_count = conn.execute("SELECT COUNT(*) FROM crawls_crawl WHERE status = 'queued'").fetchone()[0] + finally: + conn.close() + + assert crawl_count >= 2 + assert queued_count >= 1 + + +def test_schedule_without_import_path_creates_maintenance_schedule(tmp_path, process): + os.chdir(tmp_path) + + result = subprocess.run( + ['archivebox', 'schedule', '--every=day'], + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + assert 'Created scheduled maintenance update' in result.stdout + + conn = sqlite3.connect(tmp_path / "index.sqlite3") + try: + row = conn.execute( + "SELECT urls, status FROM crawls_crawl ORDER BY created_at DESC LIMIT 1" + ).fetchone() + finally: + conn.close() + + assert row == ('archivebox://update', 'sealed') diff --git a/archivebox/tests/test_cli_status.py b/archivebox/tests/test_cli_status.py index 0baac241..97538f5f 100644 --- a/archivebox/tests/test_cli_status.py +++ b/archivebox/tests/test_cli_status.py @@ -7,10 +7,25 @@ Verify status reports accurate collection state from DB and filesystem. import os import subprocess import sqlite3 +from pathlib import Path from .fixtures import * +def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None: + candidates = {snapshot_id} + if len(snapshot_id) == 32: + candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}") + elif len(snapshot_id) == 36 and '-' in snapshot_id: + candidates.add(snapshot_id.replace('-', '')) + + for needle in candidates: + for path in data_dir.rglob(needle): + if path.is_dir(): + return path + return None + + def test_status_runs_successfully(tmp_path, process): """Test that status command runs without error.""" os.chdir(tmp_path) @@ -117,6 +132,37 @@ def test_status_detects_orphaned_directories(tmp_path, process, disable_extracto assert 'orphan' in result.stdout.lower() or '1' in result.stdout +def test_status_counts_new_snapshot_output_dirs_as_archived(tmp_path, process, disable_extractors_dict): + """Test status reads archived/present counts from the current snapshot output layout.""" + os.chdir(tmp_path) + env = disable_extractors_dict.copy() + env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true" + + subprocess.run( + ['archivebox', 'add', '--index-only', '--depth=0', 'https://example.com'], + capture_output=True, + env=env, + check=True, + ) + + conn = sqlite3.connect("index.sqlite3") + c = conn.cursor() + snapshot_id = c.execute("SELECT id FROM core_snapshot WHERE url = ?", ('https://example.com',)).fetchone()[0] + conn.close() + + snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id)) + assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}" + title_dir = snapshot_dir / "title" + title_dir.mkdir(parents=True, exist_ok=True) + (title_dir / "title.txt").write_text("Example Domain") + + result = subprocess.run(['archivebox', 'status'], capture_output=True, text=True, env=env) + + assert result.returncode == 0, result.stdout + result.stderr + assert 'archived: 1' in result.stdout + assert 'present: 1' in result.stdout + + def test_status_shows_user_info(tmp_path, process): """Test status shows user/login information.""" os.chdir(tmp_path) diff --git a/archivebox/tests/test_cli_version.py b/archivebox/tests/test_cli_version.py index 99bb5051..46382e27 100644 --- a/archivebox/tests/test_cli_version.py +++ b/archivebox/tests/test_cli_version.py @@ -5,12 +5,63 @@ Verify version output and system information reporting. """ import os +import re +import sys +import tempfile import subprocess -import sqlite3 +from pathlib import Path from .fixtures import * +def _archivebox_cli() -> str: + cli = Path(sys.executable).with_name("archivebox") + return str(cli if cli.exists() else "archivebox") + + +def _run_real_cli( + args: list[str], + cwd: Path, + *, + home_dir: Path, + timeout: int = 180, + extra_env: dict[str, str] | None = None, +) -> subprocess.CompletedProcess[str]: + env = os.environ.copy() + env.pop("DATA_DIR", None) + env["HOME"] = str(home_dir) + env["USE_COLOR"] = "False" + env["SHOW_PROGRESS"] = "False" + if extra_env: + env.update(extra_env) + return subprocess.run( + [_archivebox_cli(), *args], + capture_output=True, + text=True, + cwd=cwd, + env=env, + timeout=timeout, + ) + + +def _make_deep_collection_dir(tmp_path: Path) -> Path: + deep_dir = tmp_path / "deep-collection" + for idx in range(6): + deep_dir /= f"segment-{idx}-1234567890abcdef" + deep_dir.mkdir(parents=True) + return deep_dir + + +def _extract_location_path(output: str, key: str) -> Path: + for line in output.splitlines(): + if key not in line: + continue + columns = [column for column in re.split(r"\s{2,}", line.strip()) if column] + if len(columns) >= 5 and columns[1] == key: + return Path(os.path.expanduser(columns[-1])) + raise AssertionError(f"Did not find a {key} location line in output:\n{output}") + + def test_version_quiet_outputs_version_number(tmp_path): """Test that version --quiet outputs just the version number.""" os.chdir(tmp_path) @@ -66,3 +117,32 @@ def test_version_in_uninitialized_dir_still_works(tmp_path): # Should still output version assert result.returncode == 0 assert len(result.stdout.strip()) > 0 + + +def test_version_auto_selects_short_tmp_dir_for_deep_collection_path(tmp_path): + """Test the real CLI init/version flow auto-selects a short TMP_DIR outside deep collections.""" + data_dir = _make_deep_collection_dir(tmp_path) + default_tmp_dir = data_dir / "tmp" + extra_env = {"ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS": "true"} + + with tempfile.TemporaryDirectory(prefix="abx-home-") as home_tmp: + home_dir = Path(home_tmp) + + init_result = _run_real_cli(["init", "--quick"], cwd=data_dir, home_dir=home_dir, extra_env=extra_env) + assert init_result.returncode == 0, init_result.stdout + init_result.stderr + + version_result = _run_real_cli(["version"], cwd=data_dir, home_dir=home_dir, extra_env=extra_env) + output = version_result.stdout + version_result.stderr + + assert version_result.returncode == 0, output + assert "ArchiveBox" in output + assert "TMP_DIR" in output + assert "Error with configured TMP_DIR" not in output + + reported_tmp_dir = _extract_location_path(output, "TMP_DIR") + if not reported_tmp_dir.is_absolute(): + reported_tmp_dir = (data_dir / reported_tmp_dir).resolve() + + assert reported_tmp_dir.exists() + assert not reported_tmp_dir.is_relative_to(default_tmp_dir) + assert len(f"file://{reported_tmp_dir / 'supervisord.sock'}") <= 96 diff --git a/archivebox/tests/test_process_runtime_paths.py b/archivebox/tests/test_process_runtime_paths.py new file mode 100644 index 00000000..da781b4d --- /dev/null +++ b/archivebox/tests/test_process_runtime_paths.py @@ -0,0 +1,38 @@ +import os +import unittest +from pathlib import Path + + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.settings') + + +from archivebox.machine.models import Process + + +class TestProcessRuntimePaths(unittest.TestCase): + def test_hook_processes_use_isolated_runtime_dir(self): + process = Process( + process_type=Process.TypeChoices.HOOK, + pwd='/tmp/archive/example/chrome', + cmd=['node', '/plugins/chrome/on_Snapshot__11_chrome_wait.js', '--url=https://example.com'], + ) + + expected_dir = Path('/tmp/archive/example/chrome/.hooks/on_Snapshot__11_chrome_wait.js') + self.assertEqual(process.runtime_dir, expected_dir) + self.assertEqual(process.stdout_file, expected_dir / 'stdout.log') + self.assertEqual(process.stderr_file, expected_dir / 'stderr.log') + self.assertEqual(process.pid_file, expected_dir / 'process.pid') + + def test_non_hook_processes_keep_runtime_files_in_pwd(self): + process = Process( + process_type=Process.TypeChoices.WORKER, + pwd='/tmp/archive/example', + cmd=['archivebox', 'run', '--snapshot-id', '123'], + ) + + expected_dir = Path('/tmp/archive/example') + self.assertEqual(process.runtime_dir, expected_dir) + self.assertEqual(process.stdout_file, expected_dir / 'stdout.log') + self.assertEqual(process.stderr_file, expected_dir / 'stderr.log') + self.assertEqual(process.pid_file, expected_dir / 'process.pid') + diff --git a/archivebox/tests/test_schedule.py b/archivebox/tests/test_schedule.py index 45e2d222..9ec5166a 100644 --- a/archivebox/tests/test_schedule.py +++ b/archivebox/tests/test_schedule.py @@ -1,44 +1,102 @@ #!/usr/bin/env python3 -"""Integration tests for archivebox schedule command.""" +"""Integration tests for the database-backed archivebox schedule command.""" import os +import sqlite3 import subprocess import pytest -from .fixtures import process, disable_extractors_dict +from .fixtures import process -def test_schedule_show_lists_jobs(tmp_path, process): - """Test that --show lists current scheduled jobs.""" +def _fetchone(tmp_path, query): + conn = sqlite3.connect(tmp_path / "index.sqlite3") + try: + return conn.execute(query).fetchone() + finally: + conn.close() + + +def test_schedule_creates_enabled_db_schedule(tmp_path, process): os.chdir(tmp_path) + result = subprocess.run( + ['archivebox', 'schedule', '--every=daily', '--depth=1', 'https://example.com/feed.xml'], + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + + schedule_row = _fetchone( + tmp_path, + "SELECT schedule, is_enabled, label FROM crawls_crawlschedule ORDER BY created_at DESC LIMIT 1", + ) + crawl_row = _fetchone( + tmp_path, + "SELECT urls, status, max_depth FROM crawls_crawl ORDER BY created_at DESC LIMIT 1", + ) + + assert schedule_row == ('daily', 1, 'Scheduled import: https://example.com/feed.xml') + assert crawl_row == ('https://example.com/feed.xml', 'sealed', 1) + + +def test_schedule_show_lists_enabled_schedules(tmp_path, process): + os.chdir(tmp_path) + + subprocess.run( + ['archivebox', 'schedule', '--every=weekly', 'https://example.com/feed.xml'], + capture_output=True, + text=True, + check=True, + ) + result = subprocess.run( ['archivebox', 'schedule', '--show'], capture_output=True, text=True, ) - # Should either show jobs or indicate no jobs - assert 'no' in result.stdout.lower() or 'archivebox' in result.stdout.lower() or result.returncode == 0 + assert result.returncode == 0 + assert 'Active scheduled crawls' in result.stdout + assert 'https://example.com/feed.xml' in result.stdout + assert 'weekly' in result.stdout -def test_schedule_clear_removes_jobs(tmp_path, process): - """Test that --clear removes scheduled jobs.""" +def test_schedule_clear_disables_existing_schedules(tmp_path, process): os.chdir(tmp_path) + subprocess.run( + ['archivebox', 'schedule', '--every=daily', 'https://example.com/feed.xml'], + capture_output=True, + text=True, + check=True, + ) + result = subprocess.run( ['archivebox', 'schedule', '--clear'], capture_output=True, text=True, ) - # Should complete successfully (may have no jobs to clear) assert result.returncode == 0 + assert 'Disabled 1 scheduled crawl' in result.stdout + + disabled_count = _fetchone( + tmp_path, + "SELECT COUNT(*) FROM crawls_crawlschedule WHERE is_enabled = 0", + )[0] + enabled_count = _fetchone( + tmp_path, + "SELECT COUNT(*) FROM crawls_crawlschedule WHERE is_enabled = 1", + )[0] + + assert disabled_count == 1 + assert enabled_count == 0 def test_schedule_every_requires_valid_period(tmp_path, process): - """Test that --every requires valid time period.""" os.chdir(tmp_path) result = subprocess.run( @@ -47,15 +105,12 @@ def test_schedule_every_requires_valid_period(tmp_path, process): text=True, ) - # Should fail with invalid period - assert result.returncode != 0 or 'invalid' in result.stdout.lower() + assert result.returncode != 0 + assert 'Invalid schedule' in result.stderr or 'Invalid schedule' in result.stdout class TestScheduleCLI: - """Test the CLI interface for schedule command.""" - def test_cli_help(self, tmp_path, process): - """Test that --help works for schedule command.""" os.chdir(tmp_path) result = subprocess.run( @@ -68,7 +123,7 @@ class TestScheduleCLI: assert '--every' in result.stdout assert '--show' in result.stdout assert '--clear' in result.stdout - assert '--depth' in result.stdout + assert '--run-all' in result.stdout if __name__ == '__main__': diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index c83d4a55..d969acc9 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -336,6 +336,7 @@ class Orchestrator: queue_sizes = {} self._enforce_hard_timeouts() + self._materialize_due_schedules() # Check Binary queue machine = Machine.current() @@ -399,6 +400,24 @@ class Orchestrator: return queue_sizes + def _should_process_schedules(self) -> bool: + return (not self.exit_on_idle) and (self.crawl_id is None) + + def _materialize_due_schedules(self) -> None: + if not self._should_process_schedules(): + return + + from archivebox.crawls.models import CrawlSchedule + + now = timezone.now() + due_schedules = CrawlSchedule.objects.filter(is_enabled=True).select_related('template', 'template__created_by') + + for schedule in due_schedules: + if not schedule.is_due(now): + continue + + schedule.enqueue(queued_at=now) + def _enforce_hard_timeouts(self) -> None: """Force-kill and seal hooks/archiveresults/snapshots that exceed hard limits.""" import time diff --git a/archivebox/workers/tests/test_scheduled_crawls.py b/archivebox/workers/tests/test_scheduled_crawls.py new file mode 100644 index 00000000..e0db1c77 --- /dev/null +++ b/archivebox/workers/tests/test_scheduled_crawls.py @@ -0,0 +1,65 @@ +from datetime import timedelta + +from django.contrib.auth import get_user_model +from django.test import TestCase +from django.utils import timezone + +from archivebox.crawls.models import Crawl, CrawlSchedule +from archivebox.workers.orchestrator import Orchestrator + + +class TestScheduledCrawlMaterialization(TestCase): + def setUp(self): + self.user = get_user_model().objects.create_user( + username='schedule-user', + password='password', + ) + + def _create_due_schedule(self) -> CrawlSchedule: + template = Crawl.objects.create( + urls='https://example.com/feed.xml', + max_depth=1, + tags_str='scheduled', + label='Scheduled Feed', + notes='template', + created_by=self.user, + status=Crawl.StatusChoices.SEALED, + retry_at=None, + ) + schedule = CrawlSchedule.objects.create( + template=template, + schedule='daily', + is_enabled=True, + label='Scheduled Feed', + notes='template', + created_by=self.user, + ) + past = timezone.now() - timedelta(days=2) + Crawl.objects.filter(pk=template.pk).update(created_at=past, modified_at=past) + template.refresh_from_db() + schedule.refresh_from_db() + return schedule + + def test_global_orchestrator_materializes_due_schedule(self): + schedule = self._create_due_schedule() + + orchestrator = Orchestrator(exit_on_idle=False) + orchestrator._materialize_due_schedules() + + scheduled_crawls = Crawl.objects.filter(schedule=schedule).order_by('created_at') + self.assertEqual(scheduled_crawls.count(), 2) + + queued_crawl = scheduled_crawls.last() + self.assertEqual(queued_crawl.status, Crawl.StatusChoices.QUEUED) + self.assertEqual(queued_crawl.urls, 'https://example.com/feed.xml') + self.assertEqual(queued_crawl.max_depth, 1) + self.assertEqual(queued_crawl.tags_str, 'scheduled') + + def test_one_shot_orchestrator_does_not_materialize_due_schedule(self): + schedule = self._create_due_schedule() + + Orchestrator(exit_on_idle=True)._materialize_due_schedules() + self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1) + + Orchestrator(exit_on_idle=False, crawl_id=str(schedule.template_id))._materialize_due_schedules() + self.assertEqual(Crawl.objects.filter(schedule=schedule).count(), 1) diff --git a/archivebox/workers/tests/test_snapshot_worker.py b/archivebox/workers/tests/test_snapshot_worker.py new file mode 100644 index 00000000..4233e69c --- /dev/null +++ b/archivebox/workers/tests/test_snapshot_worker.py @@ -0,0 +1,75 @@ +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import patch + +from django.test import SimpleTestCase + +from archivebox.workers.worker import SnapshotWorker + + +class TestSnapshotWorkerRetryForegroundHooks(SimpleTestCase): + def _make_worker(self): + worker = SnapshotWorker.__new__(SnapshotWorker) + worker.pid = 12345 + worker.snapshot = SimpleNamespace( + status='started', + refresh_from_db=lambda: None, + ) + worker._snapshot_exceeded_hard_timeout = lambda: False + worker._seal_snapshot_due_to_timeout = lambda: None + worker._run_hook = lambda *args, **kwargs: SimpleNamespace() + worker._wait_for_hook = lambda *args, **kwargs: None + return worker + + @patch('archivebox.workers.worker.log_worker_event') + def test_retry_skips_successful_hook_with_only_inline_output(self, mock_log): + worker = self._make_worker() + archive_result = SimpleNamespace( + status='succeeded', + output_files={}, + output_str='scrolled 600px', + output_json=None, + refresh_from_db=lambda: None, + ) + + worker._retry_failed_empty_foreground_hooks( + [(Path('/tmp/on_Snapshot__45_infiniscroll.js'), archive_result)], + config={}, + ) + + mock_log.assert_not_called() + + @patch('archivebox.workers.worker.log_worker_event') + def test_retry_replays_failed_hook_with_no_outputs(self, mock_log): + worker = self._make_worker() + run_calls = [] + wait_calls = [] + + def run_hook(*args, **kwargs): + run_calls.append((args, kwargs)) + return SimpleNamespace() + + def wait_for_hook(process, archive_result): + wait_calls.append((process, archive_result)) + archive_result.status = 'succeeded' + archive_result.output_files = {'singlefile.html': {}} + + archive_result = SimpleNamespace( + status='failed', + output_files={}, + output_str='', + output_json=None, + refresh_from_db=lambda: None, + ) + + worker._run_hook = run_hook + worker._wait_for_hook = wait_for_hook + + worker._retry_failed_empty_foreground_hooks( + [(Path('/tmp/on_Snapshot__50_singlefile.py'), archive_result)], + config={}, + ) + + assert len(run_calls) == 1 + assert len(wait_calls) == 1 + mock_log.assert_called_once() diff --git a/archivebox/workers/worker.py b/archivebox/workers/worker.py index e7ceba52..37a920b7 100644 --- a/archivebox/workers/worker.py +++ b/archivebox/workers/worker.py @@ -776,7 +776,7 @@ class SnapshotWorker(Worker): def runloop(self) -> None: """Execute all hooks sequentially.""" - from archivebox.hooks import discover_hooks, is_background_hook + from archivebox.hooks import discover_hooks, is_background_hook, is_finite_background_hook from archivebox.core.models import ArchiveResult, Snapshot from archivebox.config.configset import get_config @@ -797,7 +797,7 @@ class SnapshotWorker(Worker): hooks = sorted(hooks, key=lambda h: h.name) # Sort by name (includes step prefix) foreground_hooks: list[tuple[Path, ArchiveResult]] = [] - launched_background_hooks = False + launched_finite_background_hooks = False # Execute each hook sequentially for hook_path in hooks: @@ -835,7 +835,8 @@ class SnapshotWorker(Worker): process = self._run_hook(hook_path, ar, config) if is_background: - launched_background_hooks = True + if is_finite_background_hook(hook_name): + launched_finite_background_hooks = True # Track but don't wait self.background_processes[hook_name] = process log_worker_event( @@ -860,7 +861,7 @@ class SnapshotWorker(Worker): # All hooks launched (or completed) - terminate bg hooks and seal self._finalize_background_hooks() - if launched_background_hooks: + if launched_finite_background_hooks: self._retry_failed_empty_foreground_hooks(foreground_hooks, config) if self.snapshot.status != Snapshot.StatusChoices.SEALED: # This triggers enter_sealed() which calls cleanup() and checks parent crawl sealing @@ -961,9 +962,13 @@ class SnapshotWorker(Worker): window before giving up. """ import time - from archivebox.core.models import Snapshot + from archivebox.core.models import ArchiveResult, Snapshot retry_delays = (0.0, 0.25, 0.5, 1.0) + retryable_statuses = { + ArchiveResult.StatusChoices.FAILED, + ArchiveResult.StatusChoices.SKIPPED, + } for hook_path, ar in hooks: for attempt, delay in enumerate(retry_delays, start=1): @@ -975,7 +980,9 @@ class SnapshotWorker(Worker): return ar.refresh_from_db() - if ar.output_files: + if ar.status not in retryable_statuses: + break + if ar.output_files or ar.output_str or ar.output_json: break if delay: diff --git a/bin/docker_entrypoint.sh b/bin/docker_entrypoint.sh index b9e10297..efae35aa 100755 --- a/bin/docker_entrypoint.sh +++ b/bin/docker_entrypoint.sh @@ -188,19 +188,6 @@ else fi fi -# symlink etc crontabs into place -mkdir -p "$DATA_DIR"/crontabs -if ! test -L /var/spool/cron/crontabs; then - # move files from old location into new data dir location - for existing_file in /var/spool/cron/crontabs/*; do - mv "$existing_file" "$DATA_DIR/crontabs/" - done - # replace old system path with symlink to data dir location - rm -Rf /var/spool/cron/crontabs - ln -sf "$DATA_DIR/crontabs" /var/spool/cron/crontabs -fi -chown -R $PUID "$DATA_DIR"/crontabs - # set DBUS_SYSTEM_BUS_ADDRESS & DBUS_SESSION_BUS_ADDRESS # (dbus is not actually needed, it makes chrome log fewer warnings but isn't worth making our docker images bigger) # service dbus start >/dev/null 2>&1 & diff --git a/docker-compose.yml b/docker-compose.yml index 6b00c59c..975f5064 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,32 +47,12 @@ services: ######## Optional Addons: tweak examples below as needed for your specific use case ######## - ### This optional container runs scheduled jobs in the background (and retries failed ones). To add a new job: + ### `archivebox server` now runs the orchestrator itself, so scheduled crawls and queued UI/API jobs + # are processed by the main container without needing a separate scheduler sidecar. To add a new job: # $ docker compose run archivebox schedule --add --every=day --depth=1 'https://example.com/some/rss/feed.xml' - # then restart the scheduler container to apply any changes to the scheduled task list: - # $ docker compose restart archivebox_scheduler + # the running server orchestrator will pick it up automatically at the next due time. # https://github.com/ArchiveBox/ArchiveBox/wiki/Scheduled-Archiving - archivebox_scheduler: - - image: archivebox/archivebox:latest - command: schedule --foreground --update --every=day - environment: - # - PUID=911 # set to your host user's UID & GID if you encounter permissions issues - # - PGID=911 - - TIMEOUT=120 # use a higher timeout than the main container to give slow tasks more time when retrying - - SEARCH_BACKEND_ENGINE=sonic # tells ArchiveBox to use sonic container below for fast full-text search - - SEARCH_BACKEND_HOST_NAME=sonic - - SEARCH_BACKEND_PASSWORD=SomeSecretPassword - # For other config it's better to set using `docker compose run archivebox config --set SOME_KEY=someval` instead of setting here - # ... - # For more info, see: https://github.com/ArchiveBox/ArchiveBox/wiki/Docker#configuration - volumes: - - ./data:/data - # cpus: 2 # uncomment / edit these values to limit scheduler container resource consumption - # mem_limit: 2048m - # restart: always - ### This runs the optional Sonic full-text search backend (much faster than default rg backend). # If Sonic is ever started after not running for a while, update its full-text index by running: diff --git a/docs b/docs index 79a8c9bc..a9e347fa 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit 79a8c9bc4ef236f80a099201508c2a89347c1b4c +Subproject commit a9e347fac6fb37f7c5194379aca8aca44839f446 diff --git a/etc/crontabs/archivebox b/etc/crontabs/archivebox deleted file mode 100644 index fbb0acd3..00000000 --- a/etc/crontabs/archivebox +++ /dev/null @@ -1,8 +0,0 @@ -# DO NOT EDIT THIS FILE - edit the master and reinstall. -# (/tmp/tmpe3dawo9u installed on Tue Jun 13 23:21:48 2023) -# (Cron version -- $Id: crontab.c,v 2.13 1994/01/17 03:20:37 vixie Exp $) - -@daily cd /data && /usr/local/bin/archivebox add --depth=0 "https://example.com/3" >> /data/logs/schedule.log 2>&1 # archivebox_schedule -@daily cd /data && /usr/local/bin/archivebox add --depth=0 "https://example.com/2" >> /data/logs/schedule.log 2>&1 # archivebox_schedule -@daily cd /data && /usr/local/bin/archivebox add --depth=0 "https://example.com" >> /data/logs/schedule.log 2>&1 # archivebox_schedule -@daily cd /data && /usr/local/bin/archivebox add --depth=0 "update" >> /data/logs/schedule.log 2>&1 # archivebox_schedule diff --git a/pyproject.toml b/pyproject.toml index 9d89da6d..4c85e91f 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,8 +61,7 @@ dependencies = [ "platformdirs>=4.3.6", # for: finding a xdg-config dir to store tmp/lib files in "py-machineid>=0.6.0", # for: machine/detect.py calculating unique machine guid "atomicwrites==1.4.1", # for: config file writes, index.json file writes, etc. (TODO: remove this deprecated lib in favor of archivebox.filestore.util/os.rename/os.replace) - "python-crontab>=3.2.0", # for: archivebox schedule (TODO: remove this in favor of our own custom archivebox scheduler) - "croniter>=3.0.3", # for: archivebox schedule (TODO: remove this in favor of our own custom archivebox scheduler) + "croniter>=3.0.3", # for: archivebox schedule alias/cron parsing ### Base Types "pydantic>=2.8.0", # for: archivebox.api (django-ninja), archivebox.config (pydantic-settings), and archivebox.index.schema (pydantic) "pydantic-settings>=2.5.2", # for: archivebox.config diff --git a/uv.lock b/uv.lock index 008fab44..89bae3b1 100644 --- a/uv.lock +++ b/uv.lock @@ -110,7 +110,6 @@ dependencies = [ { name = "pydantic-settings", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pytest-django", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "python-benedict", extra = ["io", "parse"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "python-crontab", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "python-statemachine", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "requests", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "rich", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -212,7 +211,6 @@ requires-dist = [ { name = "pydantic-settings", specifier = ">=2.5.2" }, { name = "pytest-django", specifier = ">=4.11.1" }, { name = "python-benedict", extras = ["io", "parse"], specifier = ">=0.33.2" }, - { name = "python-crontab", specifier = ">=3.2.0" }, { name = "python-ldap", marker = "extra == 'ldap'", specifier = ">=3.4.3" }, { name = "python-statemachine", specifier = ">=2.3.6" }, { name = "requests", specifier = ">=2.32.3" }, @@ -2027,15 +2025,6 @@ parse = [ { name = "python-dateutil", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] -[[package]] -name = "python-crontab" -version = "3.3.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/99/7f/c54fb7e70b59844526aa4ae321e927a167678660ab51dda979955eafb89a/python_crontab-3.3.0.tar.gz", hash = "sha256:007c8aee68dddf3e04ec4dce0fac124b93bd68be7470fc95d2a9617a15de291b", size = 57626, upload-time = "2025-07-13T20:05:35.535Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/47/42/bb4afa5b088f64092036221843fc989b7db9d9d302494c1f8b024ee78a46/python_crontab-3.3.0-py3-none-any.whl", hash = "sha256:739a778b1a771379b75654e53fd4df58e5c63a9279a63b5dfe44c0fcc3ee7884", size = 27533, upload-time = "2025-07-13T20:05:34.266Z" }, -] - [[package]] name = "python-dateutil" version = "2.9.0.post0"