diff --git a/.gitignore b/.gitignore
index 00c22d36..9c3dd35d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,6 +38,7 @@ lib/
tmp/
data/
data*/
+archivebox/tests/data/
archive/
output/
logs/
diff --git a/archivebox/cli/__init__.py b/archivebox/cli/__init__.py
index 2b38f5ee..1c56fc44 100644
--- a/archivebox/cli/__init__.py
+++ b/archivebox/cli/__init__.py
@@ -107,7 +107,10 @@ class ArchiveBoxGroup(click.Group):
# handle renamed commands
if cmd_name in self.renamed_commands:
new_name = self.renamed_commands[cmd_name]
- print(f' [violet]Hint:[/violet] `archivebox {cmd_name}` has been renamed to `archivebox {new_name}`')
+ print(
+ f' [violet]Hint:[/violet] `archivebox {cmd_name}` has been renamed to `archivebox {new_name}`',
+ file=sys.stderr,
+ )
cmd_name = new_name
ctx.invoked_subcommand = cmd_name
diff --git a/archivebox/cli/archivebox_binary.py b/archivebox/cli/archivebox_binary.py
index 86ce7b4b..f8627d11 100644
--- a/archivebox/cli/archivebox_binary.py
+++ b/archivebox/cli/archivebox_binary.py
@@ -63,11 +63,28 @@ def create_binary(
return 1
try:
- binary, created = Binary.objects.get_or_create(
+ from archivebox.machine.models import Machine
+
+ machine = Machine.current()
+ created = not Binary.objects.filter(
+ machine=machine,
name=name,
abspath=abspath,
- defaults={'version': version}
- )
+ version=version,
+ ).exists()
+
+    # Create via Binary.from_json() (the same lifecycle used elsewhere in the
+    # system) so the record is owned by the current machine and can be safely
+    # piped into `archivebox run` without producing rows that lack a machine_id.
+ binary = Binary.from_json({
+ 'name': name,
+ 'abspath': abspath,
+ 'version': version,
+ 'binproviders': 'env',
+ 'binprovider': 'env',
+ })
+ if binary is None:
+ raise ValueError('failed to create binary record')
if not is_tty:
write_record(binary.to_json())
diff --git a/archivebox/cli/archivebox_extract.py b/archivebox/cli/archivebox_extract.py
index 718755a4..cba9aa16 100644
--- a/archivebox/cli/archivebox_extract.py
+++ b/archivebox/cli/archivebox_extract.py
@@ -81,6 +81,7 @@ def process_archiveresult_by_id(archiveresult_id: str) -> int:
def run_plugins(
args: tuple,
+ records: list[dict] | None = None,
plugins: str = '',
wait: bool = True,
) -> int:
@@ -108,8 +109,12 @@ def run_plugins(
# Parse comma-separated plugins list once (reused in creation and filtering)
plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] if plugins else []
- # Collect all input records
- records = list(read_args_or_stdin(args))
+ # Parse stdin/args exactly once per CLI invocation.
+ # `main()` may already have consumed stdin to distinguish Snapshot input from
+ # ArchiveResult IDs; if so, it must pass the parsed records through here
+ # instead of asking this helper to reread an already-drained pipe.
+ if records is None:
+ records = list(read_args_or_stdin(args))
if not records:
rprint('[yellow]No snapshots provided. Pass snapshot IDs as arguments or via stdin.[/yellow]', file=sys.stderr)
@@ -269,7 +274,7 @@ def main(plugins: str, wait: bool, args: tuple):
sys.exit(exit_code)
else:
# Default behavior: run plugins on Snapshots from input
- sys.exit(run_plugins(args, plugins=plugins, wait=wait))
+ sys.exit(run_plugins(args, records=records, plugins=plugins, wait=wait))
if __name__ == '__main__':
diff --git a/archivebox/cli/tests.py b/archivebox/cli/tests.py
deleted file mode 100644
index b75a6516..00000000
--- a/archivebox/cli/tests.py
+++ /dev/null
@@ -1,231 +0,0 @@
-#!/usr/bin/env python3
-
-__package__ = 'archivebox.cli'
-
-
-import importlib
-import os
-import shutil
-import sys
-import unittest
-from contextlib import contextmanager
-from pathlib import Path
-
-from archivebox.config.constants import CONSTANTS
-
-TEST_CONFIG = {
- 'USE_COLOR': 'False',
- 'SHOW_PROGRESS': 'False',
-
- 'DATA_DIR': 'data.tests',
-
- 'SAVE_ARCHIVEDOTORG': 'False',
- 'SAVE_TITLE': 'False',
-
- 'USE_CURL': 'False',
- 'USE_WGET': 'False',
- 'USE_GIT': 'False',
- 'USE_CHROME': 'False',
- 'USE_YOUTUBEDL': 'False',
-}
-
-DATA_DIR = 'data.tests'
-os.environ.update(TEST_CONFIG)
-
-init = importlib.import_module('archivebox.main').init
-SQL_INDEX_FILENAME = CONSTANTS.SQL_INDEX_FILENAME
-JSON_INDEX_FILENAME = CONSTANTS.JSON_INDEX_FILENAME
-HTML_INDEX_FILENAME = CONSTANTS.HTML_INDEX_FILENAME
-archivebox_init = importlib.import_module('archivebox.cli.archivebox_init')
-archivebox_add = importlib.import_module('archivebox.cli.archivebox_add')
-archivebox_remove = importlib.import_module('archivebox.cli.archivebox_remove')
-parse_json_main_index = importlib.import_module('archivebox.misc.legacy').parse_json_main_index
-
-HIDE_CLI_OUTPUT = True
-
-test_urls = '''
-https://example1.com/what/is/happening.html?what=1#how-about-this=1
-https://example2.com/what/is/happening/?what=1#how-about-this=1
-HTtpS://example3.com/what/is/happening/?what=1#how-about-this=1f
-https://example4.com/what/is/happening.html
-https://example5.com/
-https://example6.com
-
-http://example7.com
-[https://example8.com/what/is/this.php?what=1]
-[and http://example9.com?what=1&other=3#and-thing=2]
-https://example10.com#and-thing=2 "
-abcdef
-sdflkf[what](https://subb.example12.com/who/what.php?whoami=1#whatami=2)?am=hi
-example13.bada
-and example14.badb
-htt://example15.badc
-'''
-
-stdout = sys.stdout
-stderr = sys.stderr
-
-
-def load_main_index(*, out_dir: str):
- index_path = Path(out_dir) / JSON_INDEX_FILENAME
- if not index_path.exists():
- raise FileNotFoundError(index_path)
- return list(parse_json_main_index(Path(out_dir)))
-
-
-@contextmanager
-def output_hidden(show_failing=True):
- if not HIDE_CLI_OUTPUT:
- yield
- return
-
- sys.stdout = open('stdout.txt', 'w+', encoding='utf-8')
- sys.stderr = open('stderr.txt', 'w+', encoding='utf-8')
- try:
- yield
- sys.stdout.close()
- sys.stderr.close()
- sys.stdout = stdout
- sys.stderr = stderr
- except Exception:
- sys.stdout.close()
- sys.stderr.close()
- sys.stdout = stdout
- sys.stderr = stderr
- if show_failing:
- with open('stdout.txt', 'r', encoding='utf-8') as f:
- print(f.read())
- with open('stderr.txt', 'r', encoding='utf-8') as f:
- print(f.read())
- raise
- finally:
- os.remove('stdout.txt')
- os.remove('stderr.txt')
-
-
-class TestInit(unittest.TestCase):
- def setUp(self):
- os.makedirs(DATA_DIR, exist_ok=True)
-
- def tearDown(self):
- shutil.rmtree(DATA_DIR, ignore_errors=True)
-
- def test_basic_init(self):
- with output_hidden():
- archivebox_init.main([])
-
- assert (Path(DATA_DIR) / SQL_INDEX_FILENAME).exists()
- assert (Path(DATA_DIR) / JSON_INDEX_FILENAME).exists()
- assert (Path(DATA_DIR) / HTML_INDEX_FILENAME).exists()
- assert len(load_main_index(out_dir=DATA_DIR)) == 0
-
- def test_conflicting_init(self):
- with open(Path(DATA_DIR) / 'test_conflict.txt', 'w+', encoding='utf-8') as f:
- f.write('test')
-
- try:
- with output_hidden(show_failing=False):
- archivebox_init.main([])
- assert False, 'Init should have exited with an exception'
- except SystemExit:
- pass
-
- assert not (Path(DATA_DIR) / SQL_INDEX_FILENAME).exists()
- assert not (Path(DATA_DIR) / JSON_INDEX_FILENAME).exists()
- assert not (Path(DATA_DIR) / HTML_INDEX_FILENAME).exists()
- try:
- load_main_index(out_dir=DATA_DIR)
- assert False, 'load_main_index should raise an exception when no index is present'
- except Exception:
- pass
-
- def test_no_dirty_state(self):
- with output_hidden():
- init()
- shutil.rmtree(DATA_DIR, ignore_errors=True)
- with output_hidden():
- init()
-
-
-class TestAdd(unittest.TestCase):
- def setUp(self):
- os.makedirs(DATA_DIR, exist_ok=True)
- with output_hidden():
- init()
-
- def tearDown(self):
- shutil.rmtree(DATA_DIR, ignore_errors=True)
-
- def test_add_arg_url(self):
- with output_hidden():
- archivebox_add.main(['https://getpocket.com/users/nikisweeting/feed/all'])
-
- all_links = load_main_index(out_dir=DATA_DIR)
- assert len(all_links) == 30
-
- def test_add_arg_file(self):
- test_file = Path(DATA_DIR) / 'test.txt'
- with open(test_file, 'w+', encoding='utf') as f:
- f.write(test_urls)
-
- with output_hidden():
- archivebox_add.main([test_file])
-
- all_links = load_main_index(out_dir=DATA_DIR)
- assert len(all_links) == 12
- os.remove(test_file)
-
- def test_add_stdin_url(self):
- with output_hidden():
- archivebox_add.main([], stdin=test_urls)
-
- all_links = load_main_index(out_dir=DATA_DIR)
- assert len(all_links) == 12
-
-
-class TestRemove(unittest.TestCase):
- def setUp(self):
- os.makedirs(DATA_DIR, exist_ok=True)
- with output_hidden():
- init()
- archivebox_add.main([], stdin=test_urls)
-
- # def tearDown(self):
- # shutil.rmtree(DATA_DIR, ignore_errors=True)
-
-
- def test_remove_exact(self):
- with output_hidden():
- archivebox_remove.main(['--yes', '--delete', 'https://example5.com/'])
-
- all_links = load_main_index(out_dir=DATA_DIR)
- assert len(all_links) == 11
-
- def test_remove_regex(self):
- with output_hidden():
- archivebox_remove.main(['--yes', '--delete', '--filter-type=regex', r'http(s)?:\/\/(.+\.)?(example\d\.com)'])
-
- all_links = load_main_index(out_dir=DATA_DIR)
- assert len(all_links) == 4
-
- def test_remove_domain(self):
- with output_hidden():
- archivebox_remove.main(['--yes', '--delete', '--filter-type=domain', 'example5.com', 'example6.com'])
-
- all_links = load_main_index(out_dir=DATA_DIR)
- assert len(all_links) == 10
-
- def test_remove_none(self):
- try:
- with output_hidden(show_failing=False):
- archivebox_remove.main(['--yes', '--delete', 'https://doesntexist.com'])
- assert False, 'Should raise if no URLs match'
- except Exception:
- pass
-
-
-if __name__ == '__main__':
- if '--verbose' in sys.argv or '-v' in sys.argv:
- HIDE_CLI_OUTPUT = False
-
- unittest.main()
diff --git a/archivebox/cli/tests_piping.py b/archivebox/cli/tests_piping.py
deleted file mode 100644
index 7359e452..00000000
--- a/archivebox/cli/tests_piping.py
+++ /dev/null
@@ -1,665 +0,0 @@
-#!/usr/bin/env python3
-"""
-Tests for CLI piping workflow: crawl | snapshot | archiveresult | run
-
-This module tests the JSONL-based piping between CLI commands as described in:
-https://github.com/ArchiveBox/ArchiveBox/issues/1363
-
-Workflows tested:
- archivebox crawl create URL -> Crawl JSONL
- archivebox snapshot create -> Snapshot JSONL (accepts Crawl or URL input)
- archivebox archiveresult create -> ArchiveResult JSONL (accepts Snapshot input)
- archivebox run -> Process queued records (accepts any JSONL)
-
-Pipeline:
- archivebox crawl create URL | archivebox snapshot create | archivebox archiveresult create | archivebox run
-
-Each command should:
- - Accept URLs, IDs, or JSONL as input (args or stdin)
- - Output JSONL to stdout when piped (not TTY)
- - Output human-readable to stderr when TTY
-"""
-
-__package__ = 'archivebox.cli'
-
-import os
-import json
-import shutil
-import tempfile
-import unittest
-from io import StringIO
-from pathlib import Path
-from typing import TypeVar
-
-# Test configuration - disable slow extractors
-TEST_CONFIG = {
- 'USE_COLOR': 'False',
- 'SHOW_PROGRESS': 'False',
- 'SAVE_ARCHIVEDOTORG': 'False',
- 'SAVE_TITLE': 'True', # Fast extractor
- 'SAVE_FAVICON': 'False',
- 'SAVE_WGET': 'False',
- 'SAVE_WARC': 'False',
- 'SAVE_PDF': 'False',
- 'SAVE_SCREENSHOT': 'False',
- 'SAVE_DOM': 'False',
- 'SAVE_SINGLEFILE': 'False',
- 'SAVE_READABILITY': 'False',
- 'SAVE_MERCURY': 'False',
- 'SAVE_GIT': 'False',
- 'SAVE_YTDLP': 'False',
- 'SAVE_HEADERS': 'False',
- 'USE_CURL': 'False',
- 'USE_WGET': 'False',
- 'USE_GIT': 'False',
- 'USE_CHROME': 'False',
- 'USE_YOUTUBEDL': 'False',
- 'USE_NODE': 'False',
-}
-
-os.environ.update(TEST_CONFIG)
-
-T = TypeVar('T')
-
-
-def require(value: T | None) -> T:
- if value is None:
- raise AssertionError('Expected value to be present')
- return value
-
-
-class MockTTYStringIO(StringIO):
- def __init__(self, initial_value: str = '', *, is_tty: bool):
- super().__init__(initial_value)
- self._is_tty = is_tty
-
- def isatty(self) -> bool:
- return self._is_tty
-
-
-# =============================================================================
-# JSONL Utility Tests
-# =============================================================================
-
-class TestJSONLParsing(unittest.TestCase):
- """Test JSONL input parsing utilities."""
-
- def test_parse_plain_url(self):
- """Plain URLs should be parsed as Snapshot records."""
- from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
-
- result = require(parse_line('https://example.com'))
- self.assertEqual(result['type'], TYPE_SNAPSHOT)
- self.assertEqual(result['url'], 'https://example.com')
-
- def test_parse_jsonl_snapshot(self):
- """JSONL Snapshot records should preserve all fields."""
- from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
-
- line = '{"type": "Snapshot", "url": "https://example.com", "tags": "test,demo"}'
- result = require(parse_line(line))
- self.assertEqual(result['type'], TYPE_SNAPSHOT)
- self.assertEqual(result['url'], 'https://example.com')
- self.assertEqual(result['tags'], 'test,demo')
-
- def test_parse_jsonl_crawl(self):
- """JSONL Crawl records should be parsed correctly."""
- from archivebox.misc.jsonl import parse_line, TYPE_CRAWL
-
- line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}'
- result = require(parse_line(line))
- self.assertEqual(result['type'], TYPE_CRAWL)
- self.assertEqual(result['id'], 'abc123')
- self.assertEqual(result['urls'], 'https://example.com')
- self.assertEqual(result['max_depth'], 1)
-
- def test_parse_jsonl_with_id(self):
- """JSONL with id field should be recognized."""
- from archivebox.misc.jsonl import parse_line
-
- line = '{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}'
- result = require(parse_line(line))
- self.assertEqual(result['id'], 'abc123')
- self.assertEqual(result['url'], 'https://example.com')
-
- def test_parse_uuid_as_snapshot_id(self):
- """Bare UUIDs should be parsed as snapshot IDs."""
- from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
-
- uuid = '01234567-89ab-cdef-0123-456789abcdef'
- result = require(parse_line(uuid))
- self.assertEqual(result['type'], TYPE_SNAPSHOT)
- self.assertEqual(result['id'], uuid)
-
- def test_parse_empty_line(self):
- """Empty lines should return None."""
- from archivebox.misc.jsonl import parse_line
-
- self.assertIsNone(parse_line(''))
- self.assertIsNone(parse_line(' '))
- self.assertIsNone(parse_line('\n'))
-
- def test_parse_comment_line(self):
- """Comment lines should return None."""
- from archivebox.misc.jsonl import parse_line
-
- self.assertIsNone(parse_line('# This is a comment'))
- self.assertIsNone(parse_line(' # Indented comment'))
-
- def test_parse_invalid_url(self):
- """Invalid URLs should return None."""
- from archivebox.misc.jsonl import parse_line
-
- self.assertIsNone(parse_line('not-a-url'))
- self.assertIsNone(parse_line('ftp://example.com')) # Only http/https/file
-
- def test_parse_file_url(self):
- """file:// URLs should be parsed."""
- from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
-
- result = require(parse_line('file:///path/to/file.txt'))
- self.assertEqual(result['type'], TYPE_SNAPSHOT)
- self.assertEqual(result['url'], 'file:///path/to/file.txt')
-
-
-# Note: JSONL output serialization is tested in TestPipingWorkflowIntegration
-# using real model instances, not mocks.
-
-
-class TestReadArgsOrStdin(unittest.TestCase):
- """Test reading from args or stdin."""
-
- def test_read_from_args(self):
- """Should read URLs from command line args."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- args = ('https://example1.com', 'https://example2.com')
- records = list(read_args_or_stdin(args))
-
- self.assertEqual(len(records), 2)
- self.assertEqual(records[0]['url'], 'https://example1.com')
- self.assertEqual(records[1]['url'], 'https://example2.com')
-
- def test_read_from_stdin(self):
- """Should read URLs from stdin when no args provided."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- stdin_content = 'https://example1.com\nhttps://example2.com\n'
- stream = MockTTYStringIO(stdin_content, is_tty=False)
-
- records = list(read_args_or_stdin((), stream=stream))
-
- self.assertEqual(len(records), 2)
- self.assertEqual(records[0]['url'], 'https://example1.com')
- self.assertEqual(records[1]['url'], 'https://example2.com')
-
- def test_read_jsonl_from_stdin(self):
- """Should read JSONL from stdin."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- stdin_content = '{"type": "Snapshot", "url": "https://example.com", "tags": "test"}\n'
- stream = MockTTYStringIO(stdin_content, is_tty=False)
-
- records = list(read_args_or_stdin((), stream=stream))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['url'], 'https://example.com')
- self.assertEqual(records[0]['tags'], 'test')
-
- def test_read_crawl_jsonl_from_stdin(self):
- """Should read Crawl JSONL from stdin."""
- from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
-
- stdin_content = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com\\nhttps://foo.com"}\n'
- stream = MockTTYStringIO(stdin_content, is_tty=False)
-
- records = list(read_args_or_stdin((), stream=stream))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['type'], TYPE_CRAWL)
- self.assertEqual(records[0]['id'], 'abc123')
-
- def test_skip_tty_stdin(self):
- """Should not read from TTY stdin (would block)."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- stream = MockTTYStringIO('https://example.com', is_tty=True)
-
- records = list(read_args_or_stdin((), stream=stream))
- self.assertEqual(len(records), 0)
-
-
-# =============================================================================
-# Unit Tests for Individual Commands
-# =============================================================================
-
-class TestCrawlCommand(unittest.TestCase):
- """Unit tests for archivebox crawl command."""
-
- def setUp(self):
- """Set up test environment."""
- self.test_dir = tempfile.mkdtemp()
- os.environ['DATA_DIR'] = self.test_dir
-
- def tearDown(self):
- """Clean up test environment."""
- shutil.rmtree(self.test_dir, ignore_errors=True)
-
- def test_crawl_accepts_url(self):
- """crawl should accept URLs as input."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- args = ('https://example.com',)
- records = list(read_args_or_stdin(args))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['url'], 'https://example.com')
-
- def test_crawl_output_format(self):
- """crawl should output Crawl JSONL records."""
- from archivebox.misc.jsonl import TYPE_CRAWL
-
- # Mock crawl output
- crawl_output = {
- 'type': TYPE_CRAWL,
- 'schema_version': '0.9.0',
- 'id': 'test-crawl-id',
- 'urls': 'https://example.com',
- 'status': 'queued',
- 'max_depth': 0,
- }
-
- self.assertEqual(crawl_output['type'], TYPE_CRAWL)
- self.assertIn('id', crawl_output)
- self.assertIn('urls', crawl_output)
-
-
-class TestSnapshotCommand(unittest.TestCase):
- """Unit tests for archivebox snapshot command."""
-
- def setUp(self):
- """Set up test environment."""
- self.test_dir = tempfile.mkdtemp()
- os.environ['DATA_DIR'] = self.test_dir
-
- def tearDown(self):
- """Clean up test environment."""
- shutil.rmtree(self.test_dir, ignore_errors=True)
-
- def test_snapshot_accepts_url(self):
- """snapshot should accept URLs as input."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- args = ('https://example.com',)
- records = list(read_args_or_stdin(args))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['url'], 'https://example.com')
-
- def test_snapshot_accepts_crawl_jsonl(self):
- """snapshot should accept Crawl JSONL as input."""
- from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
-
- stdin = MockTTYStringIO('{"type": "Crawl", "id": "abc123", "urls": "https://example.com"}\n', is_tty=False)
-
- records = list(read_args_or_stdin((), stream=stdin))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['type'], TYPE_CRAWL)
- self.assertEqual(records[0]['id'], 'abc123')
- self.assertEqual(records[0]['urls'], 'https://example.com')
-
- def test_snapshot_accepts_jsonl_with_metadata(self):
- """snapshot should accept JSONL with tags and other metadata."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- stdin = MockTTYStringIO('{"type": "Snapshot", "url": "https://example.com", "tags": "tag1,tag2", "title": "Test"}\n', is_tty=False)
-
- records = list(read_args_or_stdin((), stream=stdin))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['url'], 'https://example.com')
- self.assertEqual(records[0]['tags'], 'tag1,tag2')
- self.assertEqual(records[0]['title'], 'Test')
-
- # Note: Snapshot output format is tested in integration tests
- # (TestPipingWorkflowIntegration.test_snapshot_creates_and_outputs_jsonl)
- # using real Snapshot instances.
-
-
-class TestArchiveResultCommand(unittest.TestCase):
- """Unit tests for archivebox archiveresult command."""
-
- def setUp(self):
- """Set up test environment."""
- self.test_dir = tempfile.mkdtemp()
- os.environ['DATA_DIR'] = self.test_dir
-
- def tearDown(self):
- """Clean up test environment."""
- shutil.rmtree(self.test_dir, ignore_errors=True)
-
- def test_archiveresult_accepts_snapshot_id(self):
- """archiveresult should accept snapshot IDs as input."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- uuid = '01234567-89ab-cdef-0123-456789abcdef'
- args = (uuid,)
- records = list(read_args_or_stdin(args))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['id'], uuid)
-
- def test_archiveresult_accepts_jsonl_snapshot(self):
- """archiveresult should accept JSONL Snapshot records."""
- from archivebox.misc.jsonl import read_args_or_stdin, TYPE_SNAPSHOT
-
- stdin = MockTTYStringIO('{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}\n', is_tty=False)
-
- records = list(read_args_or_stdin((), stream=stdin))
-
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
- self.assertEqual(records[0]['id'], 'abc123')
-
- def test_archiveresult_gathers_snapshot_ids(self):
- """archiveresult should gather snapshot IDs from various input formats."""
- from archivebox.misc.jsonl import TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
-
- records = [
- {'type': TYPE_SNAPSHOT, 'id': 'snap-1'},
- {'type': TYPE_SNAPSHOT, 'id': 'snap-2', 'url': 'https://example.com'},
- {'type': TYPE_ARCHIVERESULT, 'snapshot_id': 'snap-3'},
- {'id': 'snap-4'}, # Bare id
- ]
-
- snapshot_ids = set()
- for record in records:
- record_type = record.get('type')
-
- if record_type == TYPE_SNAPSHOT:
- snapshot_id = record.get('id')
- if snapshot_id:
- snapshot_ids.add(snapshot_id)
- elif record_type == TYPE_ARCHIVERESULT:
- snapshot_id = record.get('snapshot_id')
- if snapshot_id:
- snapshot_ids.add(snapshot_id)
- elif 'id' in record:
- snapshot_ids.add(record['id'])
-
- self.assertEqual(len(snapshot_ids), 4)
- self.assertIn('snap-1', snapshot_ids)
- self.assertIn('snap-2', snapshot_ids)
- self.assertIn('snap-3', snapshot_ids)
- self.assertIn('snap-4', snapshot_ids)
-
-
-# =============================================================================
-# URL Collection Tests
-# =============================================================================
-
-class TestURLCollection(unittest.TestCase):
- """Test collecting urls.jsonl from extractor output."""
-
- def setUp(self):
- """Create test directory structure."""
- self.test_dir = Path(tempfile.mkdtemp())
-
- # Create fake extractor output directories with urls.jsonl
- (self.test_dir / 'wget').mkdir()
- (self.test_dir / 'wget' / 'urls.jsonl').write_text(
- '{"url": "https://wget-link-1.com"}\n'
- '{"url": "https://wget-link-2.com"}\n'
- )
-
- (self.test_dir / 'parse_html_urls').mkdir()
- (self.test_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
- '{"url": "https://html-link-1.com"}\n'
- '{"url": "https://html-link-2.com", "title": "HTML Link 2"}\n'
- )
-
- (self.test_dir / 'screenshot').mkdir()
- # No urls.jsonl in screenshot dir - not a parser
-
- def tearDown(self):
- """Clean up test directory."""
- shutil.rmtree(self.test_dir, ignore_errors=True)
-
- def test_collect_urls_from_plugins(self):
- """Should collect urls.jsonl from all parser plugin subdirectories."""
- from archivebox.hooks import collect_urls_from_plugins
-
- urls = collect_urls_from_plugins(self.test_dir)
-
- self.assertEqual(len(urls), 4)
-
- # Check that plugin is set
- plugins = {u['plugin'] for u in urls}
- self.assertIn('wget', plugins)
- self.assertIn('parse_html_urls', plugins)
- self.assertNotIn('screenshot', plugins) # No urls.jsonl
-
- def test_collect_urls_preserves_metadata(self):
- """Should preserve metadata from urls.jsonl entries."""
- from archivebox.hooks import collect_urls_from_plugins
-
- urls = collect_urls_from_plugins(self.test_dir)
-
- # Find the entry with title
- titled = [u for u in urls if u.get('title') == 'HTML Link 2']
- self.assertEqual(len(titled), 1)
- self.assertEqual(titled[0]['url'], 'https://html-link-2.com')
-
- def test_collect_urls_empty_dir(self):
- """Should handle empty or non-existent directories."""
- from archivebox.hooks import collect_urls_from_plugins
-
- empty_dir = self.test_dir / 'nonexistent'
- urls = collect_urls_from_plugins(empty_dir)
-
- self.assertEqual(len(urls), 0)
-
-
-class TestEdgeCases(unittest.TestCase):
- """Test edge cases and error handling."""
-
- def test_empty_input(self):
- """Commands should handle empty input gracefully."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- # Empty args, TTY stdin (should not block)
- stdin = MockTTYStringIO('', is_tty=True)
-
- records = list(read_args_or_stdin((), stream=stdin))
- self.assertEqual(len(records), 0)
-
- def test_malformed_jsonl(self):
- """Should skip malformed JSONL lines."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- stdin = MockTTYStringIO(
- '{"url": "https://good.com"}\n'
- 'not valid json\n'
- '{"url": "https://also-good.com"}\n',
- is_tty=False,
- )
-
- records = list(read_args_or_stdin((), stream=stdin))
-
- self.assertEqual(len(records), 2)
- urls = {r['url'] for r in records}
- self.assertEqual(urls, {'https://good.com', 'https://also-good.com'})
-
- def test_mixed_input_formats(self):
- """Should handle mixed URLs and JSONL."""
- from archivebox.misc.jsonl import read_args_or_stdin
-
- stdin = MockTTYStringIO(
- 'https://plain-url.com\n'
- '{"type": "Snapshot", "url": "https://jsonl-url.com", "tags": "test"}\n'
- '01234567-89ab-cdef-0123-456789abcdef\n', # UUID
- is_tty=False,
- )
-
- records = list(read_args_or_stdin((), stream=stdin))
-
- self.assertEqual(len(records), 3)
-
- # Plain URL
- self.assertEqual(records[0]['url'], 'https://plain-url.com')
-
- # JSONL with metadata
- self.assertEqual(records[1]['url'], 'https://jsonl-url.com')
- self.assertEqual(records[1]['tags'], 'test')
-
- # UUID
- self.assertEqual(records[2]['id'], '01234567-89ab-cdef-0123-456789abcdef')
-
- def test_crawl_with_multiple_urls(self):
- """Crawl should handle multiple URLs in a single crawl."""
- from archivebox.misc.jsonl import TYPE_CRAWL
-
- # Test crawl JSONL with multiple URLs
- crawl_output = {
- 'type': TYPE_CRAWL,
- 'id': 'test-multi-url-crawl',
- 'urls': 'https://url1.com\nhttps://url2.com\nhttps://url3.com',
- 'max_depth': 0,
- }
-
- # Parse the URLs
- urls = [u.strip() for u in crawl_output['urls'].split('\n') if u.strip()]
-
- self.assertEqual(len(urls), 3)
- self.assertEqual(urls[0], 'https://url1.com')
- self.assertEqual(urls[1], 'https://url2.com')
- self.assertEqual(urls[2], 'https://url3.com')
-
-
-# =============================================================================
-# Pass-Through Behavior Tests
-# =============================================================================
-
-class TestPassThroughBehavior(unittest.TestCase):
- """Test pass-through behavior in CLI commands."""
-
- def test_crawl_passes_through_other_types(self):
- """crawl create should pass through records with other types."""
-
- # Input: a Tag record (not a Crawl or URL)
- tag_record = {'type': 'Tag', 'id': 'test-tag', 'name': 'example'}
- url_record = {'url': 'https://example.com'}
-
- # Mock stdin with both records
- stdin = MockTTYStringIO(
- json.dumps(tag_record)
- + '\n'
- + json.dumps(url_record),
- is_tty=False,
- )
-
- # The Tag should be passed through, the URL should create a Crawl
- # (This is a unit test of the pass-through logic)
- from archivebox.misc.jsonl import read_args_or_stdin
- records = list(read_args_or_stdin((), stream=stdin))
-
- self.assertEqual(len(records), 2)
- # First record is a Tag (other type)
- self.assertEqual(records[0]['type'], 'Tag')
- # Second record has a URL
- self.assertIn('url', records[1])
-
- def test_snapshot_passes_through_crawl(self):
- """snapshot create should pass through Crawl records."""
- from archivebox.misc.jsonl import TYPE_CRAWL
-
- crawl_record = {
- 'type': TYPE_CRAWL,
- 'id': 'test-crawl',
- 'urls': 'https://example.com',
- }
-
- # Crawl records should be passed through AND create snapshots
- # This tests the accumulation behavior
- self.assertEqual(crawl_record['type'], TYPE_CRAWL)
- self.assertIn('urls', crawl_record)
-
- def test_archiveresult_passes_through_snapshot(self):
- """archiveresult create should pass through Snapshot records."""
- from archivebox.misc.jsonl import TYPE_SNAPSHOT
-
- snapshot_record = {
- 'type': TYPE_SNAPSHOT,
- 'id': 'test-snapshot',
- 'url': 'https://example.com',
- }
-
- # Snapshot records should be passed through
- self.assertEqual(snapshot_record['type'], TYPE_SNAPSHOT)
- self.assertIn('url', snapshot_record)
-
- def test_run_passes_through_unknown_types(self):
- """run should pass through records with unknown types."""
- unknown_record = {'type': 'Unknown', 'id': 'test', 'data': 'value'}
-
- # Unknown types should be passed through unchanged
- self.assertEqual(unknown_record['type'], 'Unknown')
- self.assertIn('data', unknown_record)
-
-
-class TestPipelineAccumulation(unittest.TestCase):
- """Test that pipelines accumulate records correctly."""
-
- def test_full_pipeline_output_types(self):
- """Full pipeline should output all record types."""
- from archivebox.misc.jsonl import TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
-
- # Simulated pipeline output after: crawl | snapshot | archiveresult | run
- # Should contain Crawl, Snapshot, and ArchiveResult records
- pipeline_output = [
- {'type': TYPE_CRAWL, 'id': 'c1', 'urls': 'https://example.com'},
- {'type': TYPE_SNAPSHOT, 'id': 's1', 'url': 'https://example.com'},
- {'type': TYPE_ARCHIVERESULT, 'id': 'ar1', 'plugin': 'title'},
- ]
-
- types = {r['type'] for r in pipeline_output}
- self.assertIn(TYPE_CRAWL, types)
- self.assertIn(TYPE_SNAPSHOT, types)
- self.assertIn(TYPE_ARCHIVERESULT, types)
-
- def test_pipeline_preserves_ids(self):
- """Pipeline should preserve record IDs through all stages."""
- records = [
- {'type': 'Crawl', 'id': 'c1', 'urls': 'https://example.com'},
- {'type': 'Snapshot', 'id': 's1', 'url': 'https://example.com'},
- ]
-
- # All records should have IDs
- for record in records:
- self.assertIn('id', record)
- self.assertTrue(record['id'])
-
- def test_jq_transform_pattern(self):
- """Test pattern for jq transforms in pipeline."""
- # Simulated: archiveresult list --status=failed | jq 'del(.id) | .status = "queued"'
- failed_record = {
- 'type': 'ArchiveResult',
- 'id': 'ar1',
- 'status': 'failed',
- 'plugin': 'wget',
- }
-
- # Transform: delete id, set status to queued
- transformed = {
- 'type': failed_record['type'],
- 'status': 'queued',
- 'plugin': failed_record['plugin'],
- }
-
- self.assertNotIn('id', transformed)
- self.assertEqual(transformed['status'], 'queued')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/archivebox/core/tests.py b/archivebox/core/tests.py
deleted file mode 100644
index 5962fb1b..00000000
--- a/archivebox/core/tests.py
+++ /dev/null
@@ -1,382 +0,0 @@
-"""Tests for the core views, especially AddView."""
-
-import importlib
-import os
-import django
-from unittest.mock import patch
-from typing import TypeVar, cast
-
-from django.forms import BaseForm
-
-# Set up Django before importing any Django-dependent modules
-os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'archivebox.settings')
-django.setup()
-
-TestCase = importlib.import_module('django.test').TestCase
-Client = importlib.import_module('django.test').Client
-User = importlib.import_module('django.contrib.auth.models').User
-reverse = importlib.import_module('django.urls').reverse
-Crawl = importlib.import_module('archivebox.crawls.models').Crawl
-CrawlSchedule = importlib.import_module('archivebox.crawls.models').CrawlSchedule
-Tag = importlib.import_module('archivebox.core.models').Tag
-SERVER_CONFIG = importlib.import_module('archivebox.config.common').SERVER_CONFIG
-
-T = TypeVar('T')
-
-
-def require(value: T | None) -> T:
- if value is None:
- raise AssertionError('Expected value to be present')
- return value
-
-
-class AddViewTests(TestCase):
- """Tests for the AddView (crawl creation form)."""
-
- def setUp(self):
- """Set up test user and client."""
- self.client = Client()
- self.user = User.objects.create_user(
- username='testuser',
- password='testpass123',
- email='test@example.com'
- )
- self.client.login(username='testuser', password='testpass123')
- self.add_url = reverse('add')
-
- def test_add_view_get_requires_auth(self):
- """Test that GET /add requires authentication."""
- self.client.logout()
- response = self.client.get(self.add_url)
- # Should redirect to login or show 403/404
- self.assertIn(response.status_code, [302, 403, 404])
-
- def test_add_view_get_shows_form(self):
- """Test that GET /add shows the form with all fields."""
- response = self.client.get(self.add_url)
- self.assertEqual(response.status_code, 200)
-
- # Check that form fields are present
- self.assertContains(response, 'name="url"')
- self.assertContains(response, 'name="tag"')
- self.assertContains(response, 'name="depth"')
- self.assertContains(response, 'name="notes"')
- self.assertContains(response, 'name="schedule"')
- self.assertContains(response, 'name="persona"')
- self.assertContains(response, 'name="overwrite"')
- self.assertContains(response, 'name="update"')
- self.assertContains(response, 'name="index_only"')
-
- # Check for plugin groups
- self.assertContains(response, 'name="chrome_plugins"')
- self.assertContains(response, 'name="archiving_plugins"')
- self.assertContains(response, 'name="parsing_plugins"')
-
- def test_add_view_shows_tag_autocomplete(self):
- """Test that tag autocomplete datalist is rendered."""
- # Create some tags
- Tag.objects.create(name='test-tag-1')
- Tag.objects.create(name='test-tag-2')
-
- response = self.client.get(self.add_url)
- self.assertEqual(response.status_code, 200)
-
- # Check for datalist with tags
- self.assertContains(response, 'id="tag-datalist"')
- self.assertContains(response, 'test-tag-1')
- self.assertContains(response, 'test-tag-2')
-
- def test_add_view_shows_plugin_presets(self):
- """Test that plugin preset buttons are rendered."""
- response = self.client.get(self.add_url)
- self.assertEqual(response.status_code, 200)
-
- self.assertContains(response, 'Quick Archive')
- self.assertContains(response, 'Full Chrome')
- self.assertContains(response, 'Text Only')
- self.assertContains(response, 'Select All')
- self.assertContains(response, 'Clear All')
-
- def test_add_view_shows_links_to_resources(self):
- """Test that helpful links are present."""
- response = self.client.get(self.add_url)
- self.assertEqual(response.status_code, 200)
-
- # Link to plugin documentation
- self.assertContains(response, '/admin/environment/plugins/')
-
- # Link to create new persona
- self.assertContains(response, '/admin/personas/persona/add/')
-
- def test_add_basic_crawl_without_schedule(self):
- """Test creating a basic crawl without a schedule."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com\nhttps://example.org',
- 'tag': 'test-tag',
- 'depth': '0',
- 'notes': 'Test crawl notes',
- })
-
- # Should redirect to crawl admin page
- self.assertEqual(response.status_code, 302)
-
- # Check that crawl was created
- self.assertEqual(Crawl.objects.count(), 1)
- crawl = require(Crawl.objects.first())
-
- self.assertIn('https://example.com', crawl.urls)
- self.assertIn('https://example.org', crawl.urls)
- self.assertEqual(crawl.tags_str, 'test-tag')
- self.assertEqual(crawl.max_depth, 0)
- self.assertEqual(crawl.notes, 'Test crawl notes')
- self.assertEqual(crawl.created_by, self.user)
-
- # No schedule should be created
- self.assertIsNone(crawl.schedule)
- self.assertEqual(CrawlSchedule.objects.count(), 0)
-
- def test_add_crawl_with_schedule(self):
- """Test creating a crawl with a repeat schedule."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'tag': 'scheduled',
- 'depth': '1',
- 'notes': 'Daily crawl',
- 'schedule': 'daily',
- })
-
- self.assertEqual(response.status_code, 302)
-
- # Check that crawl and schedule were created
- self.assertEqual(Crawl.objects.count(), 1)
- self.assertEqual(CrawlSchedule.objects.count(), 1)
-
- crawl = require(Crawl.objects.first())
- schedule = require(CrawlSchedule.objects.first())
-
- self.assertEqual(crawl.schedule, schedule)
- self.assertEqual(schedule.template, crawl)
- self.assertEqual(schedule.schedule, 'daily')
- self.assertTrue(schedule.is_enabled)
- self.assertEqual(schedule.created_by, self.user)
-
- def test_add_crawl_with_cron_schedule(self):
- """Test creating a crawl with a cron format schedule."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'schedule': '0 */6 * * *', # Every 6 hours
- })
-
- self.assertEqual(response.status_code, 302)
-
- schedule = require(CrawlSchedule.objects.first())
- self.assertEqual(schedule.schedule, '0 */6 * * *')
-
- def test_add_crawl_with_plugins(self):
- """Test creating a crawl with specific plugins selected."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'chrome_plugins': ['screenshot', 'dom'],
- 'archiving_plugins': ['wget'],
- })
-
- self.assertEqual(response.status_code, 302)
-
- crawl = require(Crawl.objects.first())
- plugins = crawl.config.get('PLUGINS', '')
-
- # Should contain the selected plugins
- self.assertIn('screenshot', plugins)
- self.assertIn('dom', plugins)
- self.assertIn('wget', plugins)
-
- def test_add_crawl_with_depth_range(self):
- """Test creating crawls with different depth values (0-4)."""
- for depth in range(5):
- response = self.client.post(self.add_url, {
- 'url': f'https://example{depth}.com',
- 'depth': str(depth),
- })
-
- self.assertEqual(response.status_code, 302)
-
- self.assertEqual(Crawl.objects.count(), 5)
-
- for i, crawl in enumerate(Crawl.objects.order_by('created_at')):
- self.assertEqual(crawl.max_depth, i)
-
- def test_add_crawl_with_advanced_options(self):
- """Test creating a crawl with advanced options."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'persona': 'CustomPersona',
- 'overwrite': True,
- 'update': True,
- 'index_only': True,
- })
-
- self.assertEqual(response.status_code, 302)
-
- crawl = require(Crawl.objects.first())
- config = crawl.config
-
- self.assertEqual(config.get('DEFAULT_PERSONA'), 'CustomPersona')
- self.assertEqual(config.get('OVERWRITE'), True)
- self.assertEqual(config.get('ONLY_NEW'), False) # opposite of update
- self.assertEqual(config.get('INDEX_ONLY'), True)
-
- def test_add_crawl_with_custom_config(self):
- """Test creating a crawl with custom config overrides."""
- # Note: Django test client can't easily POST the KeyValueWidget format,
- # so this test would need to use the form directly or mock the cleaned_data
- # For now, we'll skip this test or mark it as TODO
- pass
-
- def test_add_public_anonymous_custom_config_is_silently_stripped(self):
- """Anonymous users cannot override crawl config, even with PUBLIC_ADD_VIEW enabled."""
- self.client.logout()
-
- with patch.object(SERVER_CONFIG, 'PUBLIC_ADD_VIEW', True):
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'config': '{"YTDLP_ARGS_EXTRA":["--exec","id > /tmp/pwned"]}',
- })
-
- self.assertEqual(response.status_code, 302)
- crawl = require(Crawl.objects.order_by('-created_at').first())
- self.assertNotIn('YTDLP_ARGS_EXTRA', crawl.config)
-
- def test_add_authenticated_non_admin_custom_config_is_silently_stripped(self):
- """Authenticated non-admin users cannot override crawl config."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'config': '{"YTDLP_ARGS_EXTRA":["--exec","id > /tmp/pwned"]}',
- })
-
- self.assertEqual(response.status_code, 302)
- crawl = require(Crawl.objects.order_by('-created_at').first())
- self.assertNotIn('YTDLP_ARGS_EXTRA', crawl.config)
-
- def test_add_staff_admin_custom_config_is_allowed(self):
- """Admin users can override crawl config."""
- self.client.logout()
- User.objects.create_user(
- username='adminuser',
- password='adminpass123',
- email='admin@example.com',
- is_staff=True,
- )
- self.client.login(username='adminuser', password='adminpass123')
-
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'config': '{"YTDLP_ARGS_EXTRA":["--exec","echo hello"]}',
- })
-
- self.assertEqual(response.status_code, 302)
- crawl = require(Crawl.objects.order_by('-created_at').first())
- self.assertEqual(crawl.config.get('YTDLP_ARGS_EXTRA'), ['--exec', 'echo hello'])
-
- def test_add_empty_urls_fails(self):
- """Test that submitting without URLs fails validation."""
- response = self.client.post(self.add_url, {
- 'url': '',
- 'depth': '0',
- })
-
- # Should show form again with errors, not redirect
- self.assertEqual(response.status_code, 200)
- self.assertFormError(cast(BaseForm, response.context['form']), 'url', 'This field is required.')
-
- def test_add_invalid_urls_fails(self):
- """Test that invalid URLs fail validation."""
- response = self.client.post(self.add_url, {
- 'url': 'not-a-url',
- 'depth': '0',
- })
-
- # Should show form again with errors
- self.assertEqual(response.status_code, 200)
- # Check for validation error (URL regex should fail)
- self.assertContains(response, 'error')
-
- def test_add_success_message_without_schedule(self):
- """Test that success message is shown without schedule link."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com\nhttps://example.org',
- 'depth': '0',
- }, follow=True)
-
- # Check success message mentions crawl creation
- messages = list(response.context['messages'])
- self.assertEqual(len(messages), 1)
- message_text = str(messages[0])
-
- self.assertIn('Created crawl with 2 starting URL', message_text)
- self.assertIn('View Crawl', message_text)
- self.assertNotIn('scheduled to repeat', message_text)
-
- def test_add_success_message_with_schedule(self):
- """Test that success message includes schedule link."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'schedule': 'weekly',
- }, follow=True)
-
- # Check success message mentions schedule
- messages = list(response.context['messages'])
- self.assertEqual(len(messages), 1)
- message_text = str(messages[0])
-
- self.assertIn('Created crawl', message_text)
- self.assertIn('scheduled to repeat weekly', message_text)
- self.assertIn('View Crawl', message_text)
-
- def test_add_crawl_creates_source_file(self):
- """Test that crawl creation saves URLs to sources file."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- })
-
- self.assertEqual(response.status_code, 302)
-
- # Check that source file was created in sources/ directory
- from archivebox.config import CONSTANTS
- sources_dir = CONSTANTS.SOURCES_DIR
-
- # Should have created a source file
- source_files = list(sources_dir.glob('*__web_ui_add_by_user_*.txt'))
- self.assertGreater(len(source_files), 0)
-
- def test_multiple_tags_are_saved(self):
- """Test that multiple comma-separated tags are saved."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- 'tag': 'tag1,tag2,tag3',
- })
-
- self.assertEqual(response.status_code, 302)
-
- crawl = require(Crawl.objects.first())
- self.assertEqual(crawl.tags_str, 'tag1,tag2,tag3')
-
- def test_crawl_redirects_to_admin_change_page(self):
- """Test that successful submission redirects to crawl admin page."""
- response = self.client.post(self.add_url, {
- 'url': 'https://example.com',
- 'depth': '0',
- })
-
- crawl = require(Crawl.objects.first())
- expected_redirect = f'/admin/crawls/crawl/{crawl.id}/change/'
-
- self.assertRedirects(response, expected_redirect, fetch_redirect_response=False)
diff --git a/archivebox/misc/jsonl.py b/archivebox/misc/jsonl.py
index 5a2327dd..07428002 100644
--- a/archivebox/misc/jsonl.py
+++ b/archivebox/misc/jsonl.py
@@ -70,9 +70,16 @@ def parse_line(line: str) -> Optional[Dict[str, Any]]:
if line.startswith('http://') or line.startswith('https://') or line.startswith('file://'):
return {'type': TYPE_SNAPSHOT, 'url': line}
- # Could be a snapshot ID (UUID)
+ # Could be a snapshot ID (UUID with dashes or compact 32-char hex)
if len(line) == 36 and line.count('-') == 4:
return {'type': TYPE_SNAPSHOT, 'id': line}
+ if len(line) == 32:
+ try:
+ int(line, 16)
+ except ValueError:
+ pass
+ else:
+ return {'type': TYPE_SNAPSHOT, 'id': line}
# Unknown format, skip
return None
diff --git a/archivebox/misc/logging_util.py b/archivebox/misc/logging_util.py
index 885aec4d..de1f3566 100644
--- a/archivebox/misc/logging_util.py
+++ b/archivebox/misc/logging_util.py
@@ -607,7 +607,7 @@ def log_worker_event(
# Build final message
error_str = f' {type(error).__name__}: {error}' if error else ''
- from archivebox.misc.logging import CONSOLE
+ from archivebox.misc.logging import CONSOLE, STDERR
from rich.text import Text
# Create a Rich Text object for proper formatting
@@ -632,7 +632,11 @@ def log_worker_event(
if metadata_str:
text.append(f' | {metadata_str}')
- CONSOLE.print(text, soft_wrap=True)
+ # Stdout is reserved for JSONL records whenever commands are piped together.
+ # Route worker/DB progress to stderr in non-TTY contexts so pipelines like
+ # `archivebox snapshot list | archivebox run` keep stdout machine-readable.
+ output_console = CONSOLE if sys.stdout.isatty() else STDERR
+ output_console.print(text, soft_wrap=True)
@enforce_types
diff --git a/archivebox/personas/tests.py b/archivebox/personas/tests.py
deleted file mode 100644
index 49290204..00000000
--- a/archivebox/personas/tests.py
+++ /dev/null
@@ -1,2 +0,0 @@
-
-# Create your tests here.
diff --git a/archivebox/tests/conftest.py b/archivebox/tests/conftest.py
index b8d37bd4..388bf03e 100644
--- a/archivebox/tests/conftest.py
+++ b/archivebox/tests/conftest.py
@@ -3,8 +3,10 @@
import os
import sys
import subprocess
+import tempfile
import textwrap
import time
+import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
@@ -14,6 +16,9 @@ from archivebox.uuid_compat import uuid7
pytest_plugins = ["archivebox.tests.fixtures"]
+SESSION_DATA_DIR = Path(tempfile.mkdtemp(prefix="archivebox-pytest-session-")).resolve()
+os.environ.setdefault("DATA_DIR", str(SESSION_DATA_DIR))
+
# =============================================================================
# CLI Helpers (defined before fixtures that use them)
@@ -82,6 +87,36 @@ def run_archivebox_cmd(
# Fixtures
# =============================================================================
+@pytest.fixture(autouse=True)
+def isolate_test_runtime(tmp_path):
+ """
+ Run each pytest test from an isolated temp cwd and restore env mutations.
+
+ The maintained pytest suite lives under ``archivebox/tests``. Many of those
+ CLI tests shell out without passing ``cwd=`` explicitly, so the safest
+ contract is that every test starts in its own temp directory and any
+ in-process ``os.environ`` edits are rolled back afterwards.
+
+ We intentionally clear ``DATA_DIR`` for the body of each test so subprocess
+ tests that rely on cwd keep working. During collection/import time we still
+ seed a separate session-scoped temp ``DATA_DIR`` above so any ArchiveBox
+ config imported before this fixture runs never points at the repo root.
+ """
+ original_cwd = Path.cwd()
+ original_env = os.environ.copy()
+ os.chdir(tmp_path)
+ os.environ.pop("DATA_DIR", None)
+ try:
+ yield
+ finally:
+ os.chdir(original_cwd)
+ os.environ.clear()
+ os.environ.update(original_env)
+
+
+def pytest_sessionfinish(session, exitstatus):
+ shutil.rmtree(SESSION_DATA_DIR, ignore_errors=True)
+
@pytest.fixture
def isolated_data_dir(tmp_path):
"""
diff --git a/archivebox/tests/fixtures.py b/archivebox/tests/fixtures.py
index b92d1887..4b73de2a 100644
--- a/archivebox/tests/fixtures.py
+++ b/archivebox/tests/fixtures.py
@@ -7,8 +7,11 @@ import pytest
@pytest.fixture
def process(tmp_path):
- os.chdir(tmp_path)
- process = subprocess.run(['archivebox', 'init'], capture_output=True)
+ process = subprocess.run(
+ ['archivebox', 'init'],
+ capture_output=True,
+ cwd=tmp_path,
+ )
return process
@pytest.fixture
diff --git a/archivebox/api/tests.py b/archivebox/tests/test_api_cli_schedule.py
similarity index 64%
rename from archivebox/api/tests.py
rename to archivebox/tests/test_api_cli_schedule.py
index 785051b4..1495023c 100644
--- a/archivebox/api/tests.py
+++ b/archivebox/tests/test_api_cli_schedule.py
@@ -1,17 +1,12 @@
-import importlib
from io import StringIO
-from archivebox.config.django import setup_django
+from django.contrib.auth import get_user_model
+from django.test import RequestFactory, TestCase
-setup_django()
+from archivebox.api.v1_cli import ScheduleCommandSchema, cli_schedule
+from archivebox.crawls.models import CrawlSchedule
-User = importlib.import_module('django.contrib.auth.models').User
-TestCase = importlib.import_module('django.test').TestCase
-RequestFactory = importlib.import_module('django.test').RequestFactory
-api_v1_cli = importlib.import_module('archivebox.api.v1_cli')
-ScheduleCommandSchema = api_v1_cli.ScheduleCommandSchema
-cli_schedule = api_v1_cli.cli_schedule
-CrawlSchedule = importlib.import_module('archivebox.crawls.models').CrawlSchedule
+User = get_user_model()
class CLIScheduleAPITests(TestCase):
diff --git a/archivebox/tests/test_extract.py b/archivebox/tests/test_cli_extract_input.py
similarity index 98%
rename from archivebox/tests/test_extract.py
rename to archivebox/tests/test_cli_extract_input.py
index 47df599e..b5a49a13 100644
--- a/archivebox/tests/test_extract.py
+++ b/archivebox/tests/test_cli_extract_input.py
@@ -1,13 +1,10 @@
-#!/usr/bin/env python3
-"""Integration tests for archivebox extract command."""
+"""Tests for archivebox extract input handling and pipelines."""
import os
import subprocess
import sqlite3
import json
-import pytest
-
def test_extract_runs_on_snapshot_id(tmp_path, process, disable_extractors_dict):
@@ -271,7 +268,3 @@ class TestExtractCLI:
# Should show warning about no snapshots or exit normally (empty input)
assert result.returncode == 0 or 'No' in result.stderr
-
-
-if __name__ == '__main__':
- pytest.main([__file__, '-v'])
diff --git a/archivebox/tests/test_cli_piping.py b/archivebox/tests/test_cli_piping.py
new file mode 100644
index 00000000..c127dba8
--- /dev/null
+++ b/archivebox/tests/test_cli_piping.py
@@ -0,0 +1,377 @@
+"""
+Tests for JSONL piping contracts and `archivebox run` / `archivebox orchestrator`.
+
+This file covers both:
+- low-level JSONL/stdin parsing behavior that makes CLI piping work
+- subprocess integration for the supported records `archivebox run` consumes
+"""
+
+import sqlite3
+import sys
+import uuid
+from io import StringIO
+from pathlib import Path
+
+from archivebox.tests.conftest import (
+ create_test_url,
+ parse_jsonl_output,
+ run_archivebox_cmd,
+)
+
+
+PIPE_TEST_ENV = {
+ "PLUGINS": "favicon",
+ "SAVE_FAVICON": "True",
+ "USE_COLOR": "False",
+ "SHOW_PROGRESS": "False",
+}
+
+
+class MockTTYStringIO(StringIO):
+ def __init__(self, initial_value: str = "", *, is_tty: bool):
+ super().__init__(initial_value)
+ self._is_tty = is_tty
+
+ def isatty(self) -> bool:
+ return self._is_tty
+
+
+def _stdout_lines(stdout: str) -> list[str]:
+ return [line for line in stdout.splitlines() if line.strip()]
+
+
+def _assert_stdout_is_jsonl_only(stdout: str) -> None:
+ lines = _stdout_lines(stdout)
+ assert lines, "Expected stdout to contain JSONL records"
+ assert all(line.lstrip().startswith("{") for line in lines), stdout
+
+
+def _sqlite_param(value: object) -> object:
+ if not isinstance(value, str):
+ return value
+ try:
+ return uuid.UUID(value).hex
+ except ValueError:
+ return value
+
+
+def _db_value(data_dir: Path, sql: str, params: tuple[object, ...] = ()) -> object | None:
+ conn = sqlite3.connect(data_dir / "index.sqlite3")
+ try:
+ row = conn.execute(sql, tuple(_sqlite_param(param) for param in params)).fetchone()
+ finally:
+ conn.close()
+ return row[0] if row else None
+
+
+def test_parse_line_accepts_supported_piping_inputs():
+ """The JSONL parser should normalize the input forms CLI pipes accept."""
+ from archivebox.misc.jsonl import TYPE_CRAWL, TYPE_SNAPSHOT, parse_line
+
+ assert parse_line("") is None
+ assert parse_line(" ") is None
+ assert parse_line("# comment") is None
+ assert parse_line("not-a-url") is None
+ assert parse_line("ftp://example.com") is None
+
+ plain_url = parse_line("https://example.com")
+ assert plain_url == {"type": TYPE_SNAPSHOT, "url": "https://example.com"}
+
+ file_url = parse_line("file:///tmp/example.txt")
+ assert file_url == {"type": TYPE_SNAPSHOT, "url": "file:///tmp/example.txt"}
+
+ snapshot_json = parse_line('{"type":"Snapshot","url":"https://example.com","tags":"tag1,tag2"}')
+ assert snapshot_json is not None
+ assert snapshot_json["type"] == TYPE_SNAPSHOT
+ assert snapshot_json["tags"] == "tag1,tag2"
+
+ crawl_json = parse_line('{"type":"Crawl","id":"abc123","urls":"https://example.com","max_depth":1}')
+ assert crawl_json is not None
+ assert crawl_json["type"] == TYPE_CRAWL
+ assert crawl_json["id"] == "abc123"
+ assert crawl_json["max_depth"] == 1
+
+ snapshot_id = "01234567-89ab-cdef-0123-456789abcdef"
+ parsed_id = parse_line(snapshot_id)
+ assert parsed_id == {"type": TYPE_SNAPSHOT, "id": snapshot_id}
+
+ compact_snapshot_id = "0123456789abcdef0123456789abcdef"
+ compact_parsed_id = parse_line(compact_snapshot_id)
+ assert compact_parsed_id == {"type": TYPE_SNAPSHOT, "id": compact_snapshot_id}
+
+
+def test_read_args_or_stdin_handles_args_stdin_and_mixed_jsonl():
+ """Piping helpers should consume args, structured JSONL, and pass-through records."""
+ from archivebox.misc.jsonl import TYPE_CRAWL, read_args_or_stdin
+
+ records = list(read_args_or_stdin(("https://example1.com", "https://example2.com")))
+ assert [record["url"] for record in records] == ["https://example1.com", "https://example2.com"]
+
+ stdin_records = list(
+ read_args_or_stdin(
+ (),
+ stream=MockTTYStringIO(
+ 'https://plain-url.com\n'
+ '{"type":"Snapshot","url":"https://jsonl-url.com","tags":"test"}\n'
+ '{"type":"Tag","id":"tag-1","name":"example"}\n'
+ '01234567-89ab-cdef-0123-456789abcdef\n'
+ 'not valid json\n',
+ is_tty=False,
+ ),
+ )
+ )
+ assert len(stdin_records) == 4
+ assert stdin_records[0]["url"] == "https://plain-url.com"
+ assert stdin_records[1]["url"] == "https://jsonl-url.com"
+ assert stdin_records[1]["tags"] == "test"
+ assert stdin_records[2]["type"] == "Tag"
+ assert stdin_records[2]["name"] == "example"
+ assert stdin_records[3]["id"] == "01234567-89ab-cdef-0123-456789abcdef"
+
+ crawl_records = list(
+ read_args_or_stdin(
+ (),
+ stream=MockTTYStringIO(
+ '{"type":"Crawl","id":"crawl-1","urls":"https://example.com\\nhttps://foo.com"}\n',
+ is_tty=False,
+ ),
+ )
+ )
+ assert len(crawl_records) == 1
+ assert crawl_records[0]["type"] == TYPE_CRAWL
+ assert crawl_records[0]["id"] == "crawl-1"
+
+ tty_records = list(read_args_or_stdin((), stream=MockTTYStringIO("https://example.com", is_tty=True)))
+ assert tty_records == []
+
+
+def test_collect_urls_from_plugins_reads_only_parser_outputs(tmp_path):
+ """Parser extractor `urls.jsonl` outputs should be discoverable for recursive piping."""
+ from archivebox.hooks import collect_urls_from_plugins
+
+ (tmp_path / "wget").mkdir()
+ (tmp_path / "wget" / "urls.jsonl").write_text(
+ '{"url":"https://wget-link-1.com"}\n'
+ '{"url":"https://wget-link-2.com"}\n',
+ encoding="utf-8",
+ )
+ (tmp_path / "parse_html_urls").mkdir()
+ (tmp_path / "parse_html_urls" / "urls.jsonl").write_text(
+ '{"url":"https://html-link-1.com"}\n'
+ '{"url":"https://html-link-2.com","title":"HTML Link 2"}\n',
+ encoding="utf-8",
+ )
+ (tmp_path / "screenshot").mkdir()
+
+ urls = collect_urls_from_plugins(tmp_path)
+ assert len(urls) == 4
+ assert {url["plugin"] for url in urls} == {"wget", "parse_html_urls"}
+ titled = [url for url in urls if url.get("title") == "HTML Link 2"]
+ assert len(titled) == 1
+ assert titled[0]["url"] == "https://html-link-2.com"
+
+ assert collect_urls_from_plugins(tmp_path / "nonexistent") == []
+
+
+def test_crawl_create_stdout_pipes_into_run(initialized_archive):
+ """`archivebox crawl create | archivebox run` should queue and materialize snapshots."""
+ url = create_test_url()
+
+ create_stdout, create_stderr, create_code = run_archivebox_cmd(
+ ["crawl", "create", url],
+ data_dir=initialized_archive,
+ )
+ assert create_code == 0, create_stderr
+ _assert_stdout_is_jsonl_only(create_stdout)
+
+ crawl = next(record for record in parse_jsonl_output(create_stdout) if record.get("type") == "Crawl")
+
+ run_stdout, run_stderr, run_code = run_archivebox_cmd(
+ ["run"],
+ stdin=create_stdout,
+ data_dir=initialized_archive,
+ timeout=120,
+ env=PIPE_TEST_ENV,
+ )
+ assert run_code == 0, run_stderr
+ _assert_stdout_is_jsonl_only(run_stdout)
+
+ run_records = parse_jsonl_output(run_stdout)
+ assert any(record.get("type") == "Crawl" and record.get("id") == crawl["id"] for record in run_records)
+
+ snapshot_count = _db_value(
+ initialized_archive,
+ "SELECT COUNT(*) FROM core_snapshot WHERE crawl_id = ?",
+ (crawl["id"],),
+ )
+ assert isinstance(snapshot_count, int)
+ assert snapshot_count >= 1
+
+
+def test_snapshot_list_stdout_pipes_into_run(initialized_archive):
+ """`archivebox snapshot list | archivebox run` should requeue listed snapshots."""
+ url = create_test_url()
+
+ create_stdout, create_stderr, create_code = run_archivebox_cmd(
+ ["snapshot", "create", url],
+ data_dir=initialized_archive,
+ )
+ assert create_code == 0, create_stderr
+ snapshot = next(record for record in parse_jsonl_output(create_stdout) if record.get("type") == "Snapshot")
+
+ list_stdout, list_stderr, list_code = run_archivebox_cmd(
+ ["snapshot", "list", "--status=queued", f"--url__icontains={snapshot['id']}"],
+ data_dir=initialized_archive,
+ )
+ if list_code != 0 or not parse_jsonl_output(list_stdout):
+ list_stdout, list_stderr, list_code = run_archivebox_cmd(
+ ["snapshot", "list", f"--url__icontains={url}"],
+ data_dir=initialized_archive,
+ )
+ assert list_code == 0, list_stderr
+ _assert_stdout_is_jsonl_only(list_stdout)
+
+ run_stdout, run_stderr, run_code = run_archivebox_cmd(
+ ["run"],
+ stdin=list_stdout,
+ data_dir=initialized_archive,
+ timeout=120,
+ env=PIPE_TEST_ENV,
+ )
+ assert run_code == 0, run_stderr
+ _assert_stdout_is_jsonl_only(run_stdout)
+
+ run_records = parse_jsonl_output(run_stdout)
+ assert any(record.get("type") == "Snapshot" and record.get("id") == snapshot["id"] for record in run_records)
+
+ snapshot_status = _db_value(
+ initialized_archive,
+ "SELECT status FROM core_snapshot WHERE id = ?",
+ (snapshot["id"],),
+ )
+ assert snapshot_status == "sealed"
+
+
+def test_archiveresult_list_stdout_pipes_into_orchestrator_alias(initialized_archive):
+ """`archivebox archiveresult list | archivebox orchestrator` should preserve clean JSONL stdout."""
+ url = create_test_url()
+
+ snapshot_stdout, snapshot_stderr, snapshot_code = run_archivebox_cmd(
+ ["snapshot", "create", url],
+ data_dir=initialized_archive,
+ )
+ assert snapshot_code == 0, snapshot_stderr
+
+ ar_create_stdout, ar_create_stderr, ar_create_code = run_archivebox_cmd(
+ ["archiveresult", "create", "--plugin=favicon"],
+ stdin=snapshot_stdout,
+ data_dir=initialized_archive,
+ )
+ assert ar_create_code == 0, ar_create_stderr
+
+ created_records = parse_jsonl_output(ar_create_stdout)
+ archiveresult = next(record for record in created_records if record.get("type") == "ArchiveResult")
+
+ list_stdout, list_stderr, list_code = run_archivebox_cmd(
+ ["archiveresult", "list", "--plugin=favicon"],
+ data_dir=initialized_archive,
+ )
+ assert list_code == 0, list_stderr
+ _assert_stdout_is_jsonl_only(list_stdout)
+
+ orchestrator_stdout, orchestrator_stderr, orchestrator_code = run_archivebox_cmd(
+ ["orchestrator"],
+ stdin=list_stdout,
+ data_dir=initialized_archive,
+ timeout=120,
+ env=PIPE_TEST_ENV,
+ )
+ assert orchestrator_code == 0, orchestrator_stderr
+ _assert_stdout_is_jsonl_only(orchestrator_stdout)
+ assert "renamed to `archivebox run`" in orchestrator_stderr
+
+ run_records = parse_jsonl_output(orchestrator_stdout)
+ assert any(
+ record.get("type") == "ArchiveResult" and record.get("id") == archiveresult["id"]
+ for record in run_records
+ )
+
+
+def test_binary_create_stdout_pipes_into_run(initialized_archive):
+ """`archivebox binary create | archivebox run` should queue the binary record for processing."""
+ create_stdout, create_stderr, create_code = run_archivebox_cmd(
+ ["binary", "create", "--name=python3", f"--abspath={sys.executable}", "--version=test"],
+ data_dir=initialized_archive,
+ )
+ assert create_code == 0, create_stderr
+ _assert_stdout_is_jsonl_only(create_stdout)
+
+ binary = next(record for record in parse_jsonl_output(create_stdout) if record.get("type") == "Binary")
+
+ run_stdout, run_stderr, run_code = run_archivebox_cmd(
+ ["run"],
+ stdin=create_stdout,
+ data_dir=initialized_archive,
+ timeout=120,
+ )
+ assert run_code == 0, run_stderr
+ _assert_stdout_is_jsonl_only(run_stdout)
+
+ run_records = parse_jsonl_output(run_stdout)
+ assert any(record.get("type") == "Binary" and record.get("id") == binary["id"] for record in run_records)
+
+ status = _db_value(
+ initialized_archive,
+ "SELECT status FROM machine_binary WHERE id = ?",
+ (binary["id"],),
+ )
+ assert status in {"queued", "installed"}
+
+
+def test_multi_stage_pipeline_into_run(initialized_archive):
+ """`crawl create | snapshot create | archiveresult create | run` should preserve JSONL and finish work."""
+ url = create_test_url()
+
+ crawl_stdout, crawl_stderr, crawl_code = run_archivebox_cmd(
+ ["crawl", "create", url],
+ data_dir=initialized_archive,
+ )
+ assert crawl_code == 0, crawl_stderr
+ _assert_stdout_is_jsonl_only(crawl_stdout)
+
+ snapshot_stdout, snapshot_stderr, snapshot_code = run_archivebox_cmd(
+ ["snapshot", "create"],
+ stdin=crawl_stdout,
+ data_dir=initialized_archive,
+ )
+ assert snapshot_code == 0, snapshot_stderr
+ _assert_stdout_is_jsonl_only(snapshot_stdout)
+
+ archiveresult_stdout, archiveresult_stderr, archiveresult_code = run_archivebox_cmd(
+ ["archiveresult", "create", "--plugin=favicon"],
+ stdin=snapshot_stdout,
+ data_dir=initialized_archive,
+ )
+ assert archiveresult_code == 0, archiveresult_stderr
+ _assert_stdout_is_jsonl_only(archiveresult_stdout)
+
+ run_stdout, run_stderr, run_code = run_archivebox_cmd(
+ ["run"],
+ stdin=archiveresult_stdout,
+ data_dir=initialized_archive,
+ timeout=120,
+ env=PIPE_TEST_ENV,
+ )
+ assert run_code == 0, run_stderr
+ _assert_stdout_is_jsonl_only(run_stdout)
+
+ run_records = parse_jsonl_output(run_stdout)
+ snapshot = next(record for record in run_records if record.get("type") == "Snapshot")
+ assert any(record.get("type") == "ArchiveResult" for record in run_records)
+
+ snapshot_status = _db_value(
+ initialized_archive,
+ "SELECT status FROM core_snapshot WHERE id = ?",
+ (snapshot["id"],),
+ )
+ assert snapshot_status == "sealed"
diff --git a/archivebox/tests/test_extractors.py b/archivebox/tests/test_extractors.py
deleted file mode 100644
index 6e2eb521..00000000
--- a/archivebox/tests/test_extractors.py
+++ /dev/null
@@ -1,156 +0,0 @@
-import json as pyjson
-import sqlite3
-import subprocess
-from pathlib import Path
-
-from .fixtures import disable_extractors_dict, process
-
-FIXTURES = (disable_extractors_dict, process)
-
-
-def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
- candidates = {snapshot_id}
- if len(snapshot_id) == 32:
- candidates.add(f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}")
- elif len(snapshot_id) == 36 and "-" in snapshot_id:
- candidates.add(snapshot_id.replace("-", ""))
-
- for needle in candidates:
- for path in data_dir.rglob(needle):
- if path.is_dir():
- return path
- return None
-
-
-def _latest_snapshot_dir(data_dir: Path) -> Path:
- conn = sqlite3.connect(data_dir / "index.sqlite3")
- try:
- snapshot_id = conn.execute(
- "SELECT id FROM core_snapshot ORDER BY created_at DESC LIMIT 1"
- ).fetchone()
- finally:
- conn.close()
-
- assert snapshot_id is not None, "Expected a snapshot to be created"
- snapshot_dir = _find_snapshot_dir(data_dir, str(snapshot_id[0]))
- assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id[0]}"
- return snapshot_dir
-
-
-def _latest_plugin_result(data_dir: Path, plugin: str) -> tuple[str, str, dict]:
- conn = sqlite3.connect(data_dir / "index.sqlite3")
- try:
- row = conn.execute(
- "SELECT snapshot_id, status, output_files FROM core_archiveresult "
- "WHERE plugin = ? ORDER BY created_at DESC LIMIT 1",
- (plugin,),
- ).fetchone()
- finally:
- conn.close()
-
- assert row is not None, f"Expected an ArchiveResult row for plugin={plugin}"
- output_files = row[2]
- if isinstance(output_files, str):
- output_files = pyjson.loads(output_files or "{}")
- output_files = output_files or {}
- return str(row[0]), str(row[1]), output_files
-
-
-def _plugin_output_paths(data_dir: Path, plugin: str) -> list[Path]:
- snapshot_id, status, output_files = _latest_plugin_result(data_dir, plugin)
- assert status == "succeeded", f"Expected {plugin} ArchiveResult to succeed, got {status}"
- assert output_files, f"Expected {plugin} ArchiveResult to record output_files"
-
- snapshot_dir = _find_snapshot_dir(data_dir, snapshot_id)
- assert snapshot_dir is not None, f"Snapshot output directory not found for {snapshot_id}"
-
- plugin_dir = snapshot_dir / plugin
- output_paths = [plugin_dir / rel_path for rel_path in output_files.keys()]
- missing_paths = [path for path in output_paths if not path.exists()]
- assert not missing_paths, f"Expected plugin outputs to exist on disk, missing: {missing_paths}"
- return output_paths
-
-
-def _archivebox_env(base_env: dict, data_dir: Path) -> dict:
- env = base_env.copy()
- tmp_dir = Path("/tmp") / f"abx-{data_dir.name}"
- tmp_dir.mkdir(parents=True, exist_ok=True)
- env["TMP_DIR"] = str(tmp_dir)
- env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
- return env
-
-
-def test_singlefile_works(tmp_path, process, disable_extractors_dict):
- data_dir = Path.cwd()
- env = _archivebox_env(disable_extractors_dict, data_dir)
- env.update({"SAVE_SINGLEFILE": "true"})
- add_process = subprocess.run(
- ['archivebox', 'add', '--plugins=singlefile', 'https://example.com'],
- capture_output=True,
- text=True,
- env=env,
- timeout=900,
- )
- assert add_process.returncode == 0, add_process.stderr
- output_files = _plugin_output_paths(data_dir, "singlefile")
- assert any(path.suffix in (".html", ".htm") for path in output_files)
-
-def test_readability_works(tmp_path, process, disable_extractors_dict):
- data_dir = Path.cwd()
- env = _archivebox_env(disable_extractors_dict, data_dir)
- env.update({"SAVE_SINGLEFILE": "true", "SAVE_READABILITY": "true"})
- add_process = subprocess.run(
- ['archivebox', 'add', '--plugins=singlefile,readability', 'https://example.com'],
- capture_output=True,
- text=True,
- env=env,
- timeout=900,
- )
- assert add_process.returncode == 0, add_process.stderr
- output_files = _plugin_output_paths(data_dir, "readability")
- assert any(path.suffix in (".html", ".htm") for path in output_files)
-
-def test_htmltotext_works(tmp_path, process, disable_extractors_dict):
- data_dir = Path.cwd()
- env = _archivebox_env(disable_extractors_dict, data_dir)
- env.update({"SAVE_WGET": "true", "SAVE_HTMLTOTEXT": "true"})
- add_process = subprocess.run(
- ['archivebox', 'add', '--plugins=wget,htmltotext', 'https://example.com'],
- capture_output=True,
- text=True,
- env=env,
- timeout=900,
- )
- assert add_process.returncode == 0, add_process.stderr
- output_files = _plugin_output_paths(data_dir, "htmltotext")
- assert any(path.suffix == ".txt" for path in output_files)
-
-def test_use_node_false_disables_readability_and_singlefile(tmp_path, process, disable_extractors_dict):
- env = _archivebox_env(disable_extractors_dict, Path.cwd())
- env.update({"SAVE_READABILITY": "true", "SAVE_DOM": "true", "SAVE_SINGLEFILE": "true", "USE_NODE": "false"})
- add_process = subprocess.run(['archivebox', 'add', '--plugins=readability,dom,singlefile', 'https://example.com'],
- capture_output=True, env=env)
- output_str = add_process.stdout.decode("utf-8")
- assert "> singlefile" not in output_str
- assert "> readability" not in output_str
-
-def test_headers_retrieved(tmp_path, process, disable_extractors_dict):
- data_dir = Path.cwd()
- env = _archivebox_env(disable_extractors_dict, data_dir)
- env.update({"SAVE_HEADERS": "true"})
- add_process = subprocess.run(
- ['archivebox', 'add', '--plugins=headers', 'https://example.com'],
- capture_output=True,
- text=True,
- env=env,
- timeout=900,
- )
- assert add_process.returncode == 0, add_process.stderr
- output_files = _plugin_output_paths(data_dir, "headers")
- output_file = next((path for path in output_files if path.suffix == ".json"), None)
- assert output_file is not None, f"Expected headers output_files to include a JSON file, got: {output_files}"
- with open(output_file, 'r', encoding='utf-8') as f:
- headers = pyjson.load(f)
- response_headers = headers.get("response_headers") or headers.get("headers") or {}
- assert isinstance(response_headers, dict), f"Expected response_headers dict, got: {response_headers!r}"
- assert 'Content-Type' in response_headers or 'content-type' in response_headers
diff --git a/archivebox/machine/tests/test_machine_models.py b/archivebox/tests/test_machine_models.py
similarity index 100%
rename from archivebox/machine/tests/test_machine_models.py
rename to archivebox/tests/test_machine_models.py
diff --git a/archivebox/workers/tests/test_orchestrator.py b/archivebox/tests/test_orchestrator.py
similarity index 100%
rename from archivebox/workers/tests/test_orchestrator.py
rename to archivebox/tests/test_orchestrator.py
diff --git a/archivebox/tests/test_savepagenow.py b/archivebox/tests/test_savepagenow.py
index fd2f9630..09504b86 100644
--- a/archivebox/tests/test_savepagenow.py
+++ b/archivebox/tests/test_savepagenow.py
@@ -13,7 +13,6 @@ ADMIN_HOST = 'admin.archivebox.localhost:8000'
def _run_savepagenow_script(initialized_archive: Path, request_url: str, expected_url: str, *, login: bool, public_add_view: bool, host: str):
- project_root = Path(__file__).resolve().parents[2]
script = textwrap.dedent(
f"""
import os
@@ -81,7 +80,7 @@ def _run_savepagenow_script(initialized_archive: Path, request_url: str, expecte
return subprocess.run(
[sys.executable, '-c', script],
- cwd=project_root,
+ cwd=initialized_archive,
env=env,
text=True,
capture_output=True,
@@ -90,7 +89,6 @@ def _run_savepagenow_script(initialized_archive: Path, request_url: str, expecte
def _run_savepagenow_not_found_script(initialized_archive: Path, request_url: str):
- project_root = Path(__file__).resolve().parents[2]
script = textwrap.dedent(
f"""
import os
@@ -137,7 +135,7 @@ def _run_savepagenow_not_found_script(initialized_archive: Path, request_url: st
return subprocess.run(
[sys.executable, '-c', script],
- cwd=project_root,
+ cwd=initialized_archive,
env=env,
text=True,
capture_output=True,
@@ -146,7 +144,6 @@ def _run_savepagenow_not_found_script(initialized_archive: Path, request_url: st
def _run_savepagenow_existing_snapshot_script(initialized_archive: Path, request_url: str, stored_url: str):
- project_root = Path(__file__).resolve().parents[2]
script = textwrap.dedent(
f"""
import os
@@ -199,7 +196,7 @@ def _run_savepagenow_existing_snapshot_script(initialized_archive: Path, request
return subprocess.run(
[sys.executable, '-c', script],
- cwd=project_root,
+ cwd=initialized_archive,
env=env,
text=True,
capture_output=True,
diff --git a/archivebox/workers/tests/test_scheduled_crawls.py b/archivebox/tests/test_scheduled_crawls.py
similarity index 100%
rename from archivebox/workers/tests/test_scheduled_crawls.py
rename to archivebox/tests/test_scheduled_crawls.py
diff --git a/archivebox/workers/tests/test_snapshot_worker.py b/archivebox/tests/test_snapshot_worker.py
similarity index 100%
rename from archivebox/workers/tests/test_snapshot_worker.py
rename to archivebox/tests/test_snapshot_worker.py
diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py
index f0ea7b96..9813f9e4 100644
--- a/archivebox/workers/orchestrator.py
+++ b/archivebox/workers/orchestrator.py
@@ -29,6 +29,7 @@ Usage:
__package__ = 'archivebox.workers'
import os
+import sys
import time
from typing import Type
from datetime import datetime, timedelta
@@ -258,9 +259,7 @@ class Orchestrator:
def spawn_worker(self, WorkerClass: Type[Worker]) -> int | None:
"""Spawn a new worker process. Returns PID or None if spawn failed."""
try:
- print(f'[yellow]DEBUG: Spawning {WorkerClass.name} worker with crawl_id={self.crawl_id}...[/yellow]')
pid = WorkerClass.start(parent=self.db_process, crawl_id=self.crawl_id)
- print(f'[yellow]DEBUG: Spawned {WorkerClass.name} worker with PID={pid}[/yellow]')
# CRITICAL: Block until worker registers itself in Process table
# This prevents race condition where orchestrator spawns multiple workers
@@ -281,17 +280,6 @@ class Orchestrator:
# 4. Parent is this orchestrator
# 5. Started recently (within last 10 seconds)
- # Debug: Check all processes with this PID first
- if elapsed < 0.5:
- all_procs = list(Process.objects.filter(pid=pid))
- print(f'[yellow]DEBUG spawn_worker: elapsed={elapsed:.1f}s pid={pid} orchestrator_id={self.db_process.id}[/yellow]')
- print(f'[yellow] Found {len(all_procs)} Process records for pid={pid}[/yellow]')
- for p in all_procs:
- print(
- f'[yellow] -> type={p.process_type} status={p.status} '
- f'parent_id={p.parent_id} match={p.parent_id == self.db_process.id}[/yellow]'
- )
-
worker_process = Process.objects.filter(
pid=pid,
process_type=Process.TypeChoices.WORKER,
@@ -302,7 +290,6 @@ class Orchestrator:
if worker_process:
# Worker successfully registered!
- print(f'[green]DEBUG spawn_worker: Worker registered! Returning pid={pid}[/green]')
return pid
time.sleep(poll_interval)
@@ -653,14 +640,15 @@ class Orchestrator:
def runloop(self) -> None:
"""Main orchestrator loop."""
from rich.live import Live
- from archivebox.misc.logging import IS_TTY
from archivebox.misc.progress_layout import ArchiveBoxProgressLayout
- import sys
import os
+ is_tty = sys.stdout.isatty()
# Enable progress layout only in TTY + foreground mode
- show_progress = IS_TTY and self.exit_on_idle
- plain_output = not IS_TTY
+ show_progress = is_tty and self.exit_on_idle
+ # When stdout is not a TTY, it may be reserved for JSONL pipeline output.
+ # Keep the plain progress view, but emit it to stderr instead of stdout.
+ plain_output = not is_tty
self.on_startup()
if not show_progress:
@@ -1241,7 +1229,7 @@ class Orchestrator:
ts = timezone.now().strftime("%Y-%m-%d %H:%M:%S")
for panel, line in new_lines:
if line:
- print(f"[{ts}] [{panel}] {line}")
+ print(f"[{ts}] [{panel}] {line}", file=sys.stderr)
last_plain_lines = set(plain_lines)
# Track idle state
@@ -1271,7 +1259,7 @@ class Orchestrator:
except KeyboardInterrupt:
if progress_layout:
progress_layout.log_event("Interrupted by user", style="red")
- print() # Newline after ^C
+ print(file=sys.stderr) # Newline after ^C
self.on_shutdown(error=KeyboardInterrupt())
except BaseException as e:
if progress_layout:
@@ -1310,7 +1298,7 @@ class Orchestrator:
Used by commands like 'add' to ensure orchestrator is running.
"""
if cls.is_running():
- print('[grey53]👨✈️ Orchestrator already running[/grey53]')
+ print('[grey53]👨✈️ Orchestrator already running[/grey53]', file=sys.stderr)
# Return a placeholder - actual orchestrator is in another process
return cls(exit_on_idle=exit_on_idle)
diff --git a/pyproject.toml b/pyproject.toml
index 856e5656..d654672c 100755
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -176,7 +176,7 @@ package-dir = {"archivebox" = "archivebox"}
line-length = 140
target-version = "py313"
src = ["archivebox"]
-exclude = ["*.pyi", "typings/", "migrations/"]
+exclude = ["*.pyi", "typings/", "migrations/", "archivebox/tests/data/"]
# https://docs.astral.sh/ruff/rules/
[tool.ruff.lint]
@@ -184,6 +184,7 @@ ignore = ["E731", "E303", "E266", "E241", "E222"]
[tool.pytest.ini_options]
testpaths = [ "archivebox/tests" ]
+norecursedirs = ["archivebox/tests/data"]
DJANGO_SETTINGS_MODULE = "archivebox.core.settings"
# Note: Plugin tests under abx_plugins/plugins/ must NOT load Django
# They use a conftest.py to disable Django automatically
@@ -254,6 +255,8 @@ exclude = [
"**/node_modules",
"**/__pycache__",
"**/migrations",
+ "archivebox/tests/data",
+ "archivebox/tests/data/**",
]
stubPath = "./typings"
venvPath = "."
@@ -267,7 +270,7 @@ pythonPlatform = "Linux"
[tool.ty]
environment = { python-version = "3.13", python-platform = "linux" }
-src = { include = ["archivebox"], exclude = [".venv", "**/*.pyi", "**/__init__.pyi", "**/node_modules", "**/__pycache__", "**/migrations"] }
+src = { include = ["archivebox"], exclude = [".venv", "**/*.pyi", "**/__init__.pyi", "**/node_modules", "**/__pycache__", "**/migrations", "archivebox/tests/data", "archivebox/tests/data/**"] }
[project.scripts]