refactor: batch all URLs into single Crawl, update tests

- archivebox crawl now creates one Crawl with all URLs joined into a single newline-separated string
- Updated tests to reflect the new pipeline: crawl -> snapshot -> extract
- Added tests for Crawl JSONL parsing and output
- Tests verify Crawl.from_jsonl() handles multiple URLs correctly
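
For reference, here is a minimal sketch of the Crawl JSONL record the command now emits when stdout is piped. Field names mirror the updated tests; the id and values below are placeholders, not real output:

import json

# Illustrative record only: field names taken from Crawl.to_jsonl() as
# exercised in the updated tests; the id and values are placeholders.
crawl_record = {
    'type': 'Crawl',
    'schema_version': '0.9.0',
    'id': 'test-crawl-uuid',
    'urls': 'https://example.com\nhttps://example.org',  # all URLs, newline-separated
    'status': 'queued',
    'max_depth': 0,
    'tags_str': 'tag1,tag2',
    'label': '',
    'created_at': None,
}
print(json.dumps(crawl_record))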
Author: Claude
Date: 2025-12-30 20:06:35 +00:00
Parent: bb59287411
Commit: cf387ed59f
2 changed files with 270 additions and 219 deletions


@@ -43,9 +43,9 @@ def create_crawls(
created_by_id: Optional[int] = None,
) -> int:
"""
Create Crawl jobs from URLs or JSONL records.
Create a single Crawl job from all input URLs.
Reads from args or stdin, creates Crawl objects, outputs JSONL.
Reads from args or stdin, creates one Crawl with all URLs, outputs JSONL.
Does NOT start the crawl - just creates the job in QUEUED state.
Exit codes:
@@ -68,48 +68,50 @@ def create_crawls(
rprint('[yellow]No URLs provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
return 1
# Group URLs by crawl - each URL becomes its own Crawl for now
# (Could be enhanced to batch multiple URLs into one Crawl)
created_crawls = []
# Collect all URLs into a single newline-separated string
urls = []
for record in records:
url = record.get('url')
if not url:
continue
if url:
urls.append(url)
try:
# Build crawl record
crawl_record = {
'url': url,
'max_depth': record.get('depth', depth),
'tags_str': record.get('tags', tag),
'label': record.get('label', ''),
}
crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id})
if crawl:
created_crawls.append(crawl)
# Output JSONL record (only when piped)
if not is_tty:
write_record(crawl.to_jsonl())
except Exception as e:
rprint(f'[red]Error creating crawl: {e}[/red]', file=sys.stderr)
continue
if not created_crawls:
rprint('[red]No crawls created[/red]', file=sys.stderr)
if not urls:
rprint('[red]No valid URLs found[/red]', file=sys.stderr)
return 1
rprint(f'[green]Created {len(created_crawls)} crawls[/green]', file=sys.stderr)
try:
# Build crawl record with all URLs as newline-separated string
crawl_record = {
'urls': '\n'.join(urls),
'max_depth': depth,
'tags_str': tag,
'label': '',
}
# If TTY, show human-readable output
if is_tty:
for crawl in created_crawls:
first_url = crawl.get_urls_list()[0] if crawl.get_urls_list() else ''
rprint(f' [dim]{crawl.id}[/dim] {first_url[:60]}', file=sys.stderr)
crawl = Crawl.from_jsonl(crawl_record, overrides={'created_by_id': created_by_id})
if not crawl:
rprint('[red]Failed to create crawl[/red]', file=sys.stderr)
return 1
return 0
# Output JSONL record (only when piped)
if not is_tty:
write_record(crawl.to_jsonl())
rprint(f'[green]Created crawl with {len(urls)} URLs[/green]', file=sys.stderr)
# If TTY, show human-readable output
if is_tty:
rprint(f' [dim]{crawl.id}[/dim]', file=sys.stderr)
for url in urls[:5]: # Show first 5 URLs
rprint(f' {url[:70]}', file=sys.stderr)
if len(urls) > 5:
rprint(f' ... and {len(urls) - 5} more', file=sys.stderr)
return 0
except Exception as e:
rprint(f'[red]Error creating crawl: {e}[/red]', file=sys.stderr)
return 1
def process_crawl_by_id(crawl_id: str) -> int:
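
Downstream commands are expected to expand this single batched record back into per-URL work (the updated tests below create one Snapshot per URL from the crawl). A minimal self-contained sketch of that consumer side, standing in for the real read_args_or_stdin / Snapshot.from_jsonl plumbing:

import json
import sys

# Hypothetical stand-in for the snapshot side of the pipeline: read Crawl
# JSONL records from stdin and split the newline-separated urls field into
# one work item per URL. The real command uses read_args_or_stdin and
# Snapshot.from_jsonl rather than this helper.
def iter_crawl_urls(stream):
    for line in stream:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        if record.get('type') == 'Crawl':
            for url in record.get('urls', '').split('\n'):
                url = url.strip()
                if url:
                    yield url

if __name__ == '__main__':
    for url in iter_crawl_urls(sys.stdin):
        print(url)

Under these assumptions, piping a Crawl record into this sketch would print each of its URLs on its own line, which is the hand-off the snapshot tests below verify.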


@@ -6,12 +6,15 @@ This module tests the JSONL-based piping between CLI commands as described in:
https://github.com/ArchiveBox/ArchiveBox/issues/1363
Workflows tested:
archivebox snapshot URL | archivebox extract
archivebox crawl URL -> Crawl JSONL
archivebox snapshot -> Snapshot JSONL (accepts Crawl or URL input)
archivebox extract -> ArchiveResult JSONL (accepts Snapshot input)
Pipeline:
archivebox crawl URL | archivebox snapshot | archivebox extract
archivebox crawl --plugin=PARSER URL | archivebox snapshot | archivebox extract
Each command should:
- Accept URLs, snapshot_ids, or JSONL as input (args or stdin)
- Accept URLs, IDs, or JSONL as input (args or stdin)
- Output JSONL to stdout when piped (not TTY)
- Output human-readable to stderr when TTY
"""
@@ -84,6 +87,18 @@ class TestJSONLParsing(unittest.TestCase):
self.assertEqual(result['url'], 'https://example.com')
self.assertEqual(result['tags'], 'test,demo')
def test_parse_jsonl_crawl(self):
"""JSONL Crawl records should be parsed correctly."""
from archivebox.misc.jsonl import parse_line, TYPE_CRAWL
line = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com", "max_depth": 1}'
result = parse_line(line)
self.assertIsNotNone(result)
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'abc123')
self.assertEqual(result['urls'], 'https://example.com')
self.assertEqual(result['max_depth'], 1)
def test_parse_jsonl_with_id(self):
"""JSONL with id field should be recognized."""
from archivebox.misc.jsonl import parse_line, TYPE_SNAPSHOT
@@ -139,6 +154,30 @@ class TestJSONLParsing(unittest.TestCase):
class TestJSONLOutput(unittest.TestCase):
"""Test JSONL output formatting."""
def test_crawl_to_jsonl(self):
"""Crawl model should serialize to JSONL correctly."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Create a mock crawl with to_jsonl method configured
mock_crawl = MagicMock()
mock_crawl.to_jsonl.return_value = {
'type': TYPE_CRAWL,
'schema_version': '0.9.0',
'id': 'test-crawl-uuid',
'urls': 'https://example.com',
'status': 'queued',
'max_depth': 0,
'tags_str': 'tag1,tag2',
'label': '',
'created_at': None,
}
result = mock_crawl.to_jsonl()
self.assertEqual(result['type'], TYPE_CRAWL)
self.assertEqual(result['id'], 'test-crawl-uuid')
self.assertEqual(result['urls'], 'https://example.com')
self.assertEqual(result['status'], 'queued')
def test_snapshot_to_jsonl(self):
"""Snapshot model should serialize to JSONL correctly."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
@@ -236,6 +275,20 @@ class TestReadArgsOrStdin(unittest.TestCase):
self.assertEqual(records[0]['url'], 'https://example.com')
self.assertEqual(records[0]['tags'], 'test')
def test_read_crawl_jsonl_from_stdin(self):
"""Should read Crawl JSONL from stdin."""
from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
stdin_content = '{"type": "Crawl", "id": "abc123", "urls": "https://example.com\\nhttps://foo.com"}\n'
stream = StringIO(stdin_content)
stream.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stream))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
self.assertEqual(records[0]['id'], 'abc123')
def test_skip_tty_stdin(self):
"""Should not read from TTY stdin (would block)."""
from archivebox.misc.jsonl import read_args_or_stdin
@@ -273,55 +326,23 @@ class TestCrawlCommand(unittest.TestCase):
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['url'], 'https://example.com')
def test_crawl_accepts_snapshot_id(self):
"""crawl should accept snapshot IDs as input."""
from archivebox.misc.jsonl import read_args_or_stdin
def test_crawl_output_format(self):
"""crawl should output Crawl JSONL records."""
from archivebox.misc.jsonl import TYPE_CRAWL
uuid = '01234567-89ab-cdef-0123-456789abcdef'
args = (uuid,)
records = list(read_args_or_stdin(args))
# Mock crawl output
crawl_output = {
'type': TYPE_CRAWL,
'schema_version': '0.9.0',
'id': 'test-crawl-id',
'urls': 'https://example.com',
'status': 'queued',
'max_depth': 0,
}
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], uuid)
def test_crawl_accepts_jsonl(self):
"""crawl should accept JSONL with snapshot info."""
from archivebox.misc.jsonl import read_args_or_stdin
stdin = StringIO('{"type": "Snapshot", "id": "abc123", "url": "https://example.com"}\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], 'abc123')
self.assertEqual(records[0]['url'], 'https://example.com')
def test_crawl_separates_existing_vs_new(self):
"""crawl should identify existing snapshots vs new URLs."""
# This tests the logic in discover_outlinks() that separates
# records with 'id' (existing) from records with just 'url' (new)
records = [
{'type': 'Snapshot', 'id': 'existing-id-1'}, # Existing (id only)
{'type': 'Snapshot', 'url': 'https://new-url.com'}, # New (url only)
{'type': 'Snapshot', 'id': 'existing-id-2', 'url': 'https://existing.com'}, # Existing (has id)
]
existing = []
new = []
for record in records:
if record.get('id') and not record.get('url'):
existing.append(record['id'])
elif record.get('id'):
existing.append(record['id']) # Has both id and url - treat as existing
elif record.get('url'):
new.append(record)
self.assertEqual(len(existing), 2)
self.assertEqual(len(new), 1)
self.assertEqual(new[0]['url'], 'https://new-url.com')
self.assertEqual(crawl_output['type'], TYPE_CRAWL)
self.assertIn('id', crawl_output)
self.assertIn('urls', crawl_output)
class TestSnapshotCommand(unittest.TestCase):
@@ -346,6 +367,20 @@ class TestSnapshotCommand(unittest.TestCase):
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['url'], 'https://example.com')
def test_snapshot_accepts_crawl_jsonl(self):
"""snapshot should accept Crawl JSONL as input."""
from archivebox.misc.jsonl import read_args_or_stdin, TYPE_CRAWL
stdin = StringIO('{"type": "Crawl", "id": "abc123", "urls": "https://example.com"}\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
self.assertEqual(records[0]['id'], 'abc123')
self.assertEqual(records[0]['urls'], 'https://example.com')
def test_snapshot_accepts_jsonl_with_metadata(self):
"""snapshot should accept JSONL with tags and other metadata."""
from archivebox.misc.jsonl import read_args_or_stdin
@@ -549,6 +584,86 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
"""Clean up test database."""
shutil.rmtree(cls.test_dir, ignore_errors=True)
def test_crawl_creates_and_outputs_jsonl(self):
"""
Test: archivebox crawl URL1 URL2 URL3
Should create a single Crawl with all URLs and output JSONL when piped.
"""
from archivebox.crawls.models import Crawl
from archivebox.misc.jsonl import TYPE_CRAWL
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Create crawl with multiple URLs (as newline-separated string)
urls = 'https://test-crawl-1.example.com\nhttps://test-crawl-2.example.com'
crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
self.assertIsNotNone(crawl)
self.assertIsNotNone(crawl.id)
self.assertEqual(crawl.urls, urls)
self.assertEqual(crawl.status, 'queued')
# Verify URLs list
urls_list = crawl.get_urls_list()
self.assertEqual(len(urls_list), 2)
self.assertIn('https://test-crawl-1.example.com', urls_list)
self.assertIn('https://test-crawl-2.example.com', urls_list)
# Verify output format
output = crawl.to_jsonl()
self.assertEqual(output['type'], TYPE_CRAWL)
self.assertIn('id', output)
self.assertEqual(output['urls'], urls)
self.assertIn('schema_version', output)
def test_snapshot_accepts_crawl_jsonl(self):
"""
Test: archivebox crawl URL | archivebox snapshot
Snapshot should accept Crawl JSONL and create Snapshots for each URL.
"""
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
read_args_or_stdin,
TYPE_CRAWL, TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Step 1: Create crawl (simulating 'archivebox crawl')
urls = 'https://crawl-to-snap-1.example.com\nhttps://crawl-to-snap-2.example.com'
crawl = Crawl.from_jsonl({'urls': urls}, overrides={'created_by_id': created_by_id})
crawl_output = crawl.to_jsonl()
# Step 2: Parse crawl output as snapshot input
stdin = StringIO(json.dumps(crawl_output) + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
# Step 3: Create snapshots from crawl URLs
created_snapshots = []
for url in crawl.get_urls_list():
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
self.assertEqual(len(created_snapshots), 2)
# Verify snapshot output
for snapshot in created_snapshots:
output = snapshot.to_jsonl()
self.assertEqual(output['type'], TYPE_SNAPSHOT)
self.assertIn(output['url'], [
'https://crawl-to-snap-1.example.com',
'https://crawl-to-snap-2.example.com'
])
def test_snapshot_creates_and_outputs_jsonl(self):
"""
Test: archivebox snapshot URL
@@ -621,127 +736,49 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
self.assertIn(str(snapshot.id), snapshot_ids)
def test_crawl_outputs_discovered_urls(self):
"""
Test: archivebox crawl URL
Should create snapshot, run plugins, output discovered URLs.
"""
from archivebox.hooks import collect_urls_from_plugins
from archivebox.misc.jsonl import TYPE_SNAPSHOT
# Create a mock snapshot directory with urls.jsonl
test_snapshot_dir = Path(self.test_dir) / 'archive' / 'test-crawl-snapshot'
test_snapshot_dir.mkdir(parents=True, exist_ok=True)
# Create mock extractor output
(test_snapshot_dir / 'parse_html_urls').mkdir()
(test_snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
'{"url": "https://discovered-1.com"}\n'
'{"url": "https://discovered-2.com", "title": "Discovered 2"}\n'
)
# Collect URLs (as crawl does)
discovered = collect_urls_from_plugins(test_snapshot_dir)
self.assertEqual(len(discovered), 2)
# Add crawl metadata (as crawl does)
for entry in discovered:
entry['type'] = TYPE_SNAPSHOT
entry['depth'] = 1
entry['via_snapshot'] = 'test-crawl-snapshot'
# Verify output format
self.assertEqual(discovered[0]['type'], TYPE_SNAPSHOT)
self.assertEqual(discovered[0]['depth'], 1)
self.assertEqual(discovered[0]['url'], 'https://discovered-1.com')
def test_full_pipeline_snapshot_extract(self):
"""
Test: archivebox snapshot URL | archivebox extract
This is equivalent to: archivebox add URL
"""
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
read_args_or_stdin,
TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# === archivebox snapshot https://example.com ===
url = 'https://test-pipeline-1.example.com'
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
snapshot_jsonl = json.dumps(snapshot.to_jsonl())
# === | archivebox extract ===
stdin = StringIO(snapshot_jsonl + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
# Extract should receive the snapshot ID
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['id'], str(snapshot.id))
# Verify snapshot exists in DB
db_snapshot = Snapshot.objects.get(id=snapshot.id)
self.assertEqual(db_snapshot.url, url)
def test_full_pipeline_crawl_snapshot_extract(self):
"""
Test: archivebox crawl URL | archivebox snapshot | archivebox extract
This is equivalent to: archivebox add --depth=1 URL
This is equivalent to: archivebox add --depth=0 URL
"""
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.misc.jsonl import (
read_args_or_stdin,
TYPE_SNAPSHOT
TYPE_CRAWL, TYPE_SNAPSHOT
)
from archivebox.base_models.models import get_or_create_system_user_pk
from archivebox.hooks import collect_urls_from_plugins
created_by_id = get_or_create_system_user_pk()
# === archivebox crawl https://example.com ===
# Step 1: Create snapshot for starting URL
start_url = 'https://test-crawl-pipeline.example.com'
start_snapshot = Snapshot.from_jsonl({'url': start_url}, overrides={'created_by_id': created_by_id})
# Step 2: Simulate extractor output with discovered URLs
snapshot_dir = Path(self.test_dir) / 'archive' / str(start_snapshot.timestamp)
snapshot_dir.mkdir(parents=True, exist_ok=True)
(snapshot_dir / 'parse_html_urls').mkdir(exist_ok=True)
(snapshot_dir / 'parse_html_urls' / 'urls.jsonl').write_text(
'{"url": "https://outlink-1.example.com"}\n'
'{"url": "https://outlink-2.example.com"}\n'
)
# Step 3: Collect discovered URLs (crawl output)
discovered = collect_urls_from_plugins(snapshot_dir)
crawl_output = []
for entry in discovered:
entry['type'] = TYPE_SNAPSHOT
entry['depth'] = 1
crawl_output.append(json.dumps(entry))
url = 'https://test-pipeline-full.example.com'
crawl = Crawl.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
crawl_jsonl = json.dumps(crawl.to_jsonl())
# === | archivebox snapshot ===
stdin = StringIO('\n'.join(crawl_output) + '\n')
stdin = StringIO(crawl_jsonl + '\n')
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 2)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_CRAWL)
# Create snapshots for discovered URLs
# Create snapshots from crawl
created_snapshots = []
for record in records:
snap = Snapshot.from_jsonl(record, overrides={'created_by_id': created_by_id})
created_snapshots.append(snap)
if record.get('type') == TYPE_CRAWL:
crawl_id = record.get('id')
if crawl_id:
db_crawl = Crawl.objects.get(id=crawl_id)
for crawl_url in db_crawl.get_urls_list():
snapshot = Snapshot.from_jsonl({'url': crawl_url}, overrides={'created_by_id': created_by_id})
if snapshot:
created_snapshots.append(snapshot)
self.assertEqual(len(created_snapshots), 2)
self.assertEqual(len(created_snapshots), 1)
self.assertEqual(created_snapshots[0].url, url)
# === | archivebox extract ===
snapshot_jsonl_lines = [json.dumps(s.to_jsonl()) for s in created_snapshots]
@@ -749,15 +786,9 @@ class TestPipingWorkflowIntegration(unittest.TestCase):
stdin.isatty = lambda: False
records = list(read_args_or_stdin((), stream=stdin))
self.assertEqual(len(records), 2)
# Verify all snapshots exist in DB
for record in records:
db_snapshot = Snapshot.objects.get(id=record['id'])
self.assertIn(db_snapshot.url, [
'https://outlink-1.example.com',
'https://outlink-2.example.com'
])
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['type'], TYPE_SNAPSHOT)
self.assertEqual(records[0]['id'], str(created_snapshots[0].id))
class TestDepthWorkflows(unittest.TestCase):
@@ -782,46 +813,44 @@ class TestDepthWorkflows(unittest.TestCase):
def test_depth_0_workflow(self):
"""
Test: archivebox snapshot URL | archivebox extract
Test: archivebox crawl URL | archivebox snapshot | archivebox extract
Depth 0: Only archive the specified URL, no crawling.
Depth 0: Only archive the specified URL, no recursive crawling.
"""
from archivebox.crawls.models import Crawl
from archivebox.core.models import Snapshot
from archivebox.base_models.models import get_or_create_system_user_pk
created_by_id = get_or_create_system_user_pk()
# Create snapshot
# Create crawl with depth 0
url = 'https://depth0-test.example.com'
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
crawl = Crawl.from_jsonl({'url': url, 'max_depth': 0}, overrides={'created_by_id': created_by_id})
# Verify only one snapshot created
self.assertEqual(Snapshot.objects.filter(url=url).count(), 1)
self.assertEqual(crawl.max_depth, 0)
# Create snapshot
snapshot = Snapshot.from_jsonl({'url': url}, overrides={'created_by_id': created_by_id})
self.assertEqual(snapshot.url, url)
def test_depth_1_workflow(self):
"""
Test: archivebox crawl URL | archivebox snapshot | archivebox extract
def test_depth_metadata_in_crawl(self):
"""Test that depth metadata is stored in Crawl."""
from archivebox.crawls.models import Crawl
from archivebox.base_models.models import get_or_create_system_user_pk
Depth 1: Archive URL + all outlinks from that URL.
"""
# This is tested in test_full_pipeline_crawl_snapshot_extract
pass
created_by_id = get_or_create_system_user_pk()
def test_depth_metadata_propagation(self):
"""Test that depth metadata propagates through the pipeline."""
from archivebox.misc.jsonl import TYPE_SNAPSHOT
# Create crawl with depth
crawl = Crawl.from_jsonl(
{'url': 'https://depth-meta-test.example.com', 'max_depth': 2},
overrides={'created_by_id': created_by_id}
)
# Simulate crawl output with depth metadata
crawl_output = [
{'type': TYPE_SNAPSHOT, 'url': 'https://hop1.com', 'depth': 1, 'via_snapshot': 'root'},
{'type': TYPE_SNAPSHOT, 'url': 'https://hop2.com', 'depth': 2, 'via_snapshot': 'hop1'},
]
self.assertEqual(crawl.max_depth, 2)
# Verify depth is preserved
for entry in crawl_output:
self.assertIn('depth', entry)
self.assertIn('via_snapshot', entry)
# Verify in JSONL output
output = crawl.to_jsonl()
self.assertEqual(output['max_depth'], 2)
class TestParserPluginWorkflows(unittest.TestCase):
@@ -974,6 +1003,26 @@ class TestEdgeCases(unittest.TestCase):
# UUID
self.assertEqual(records[2]['id'], '01234567-89ab-cdef-0123-456789abcdef')
def test_crawl_with_multiple_urls(self):
"""Crawl should handle multiple URLs in a single crawl."""
from archivebox.misc.jsonl import TYPE_CRAWL
# Test crawl JSONL with multiple URLs
crawl_output = {
'type': TYPE_CRAWL,
'id': 'test-multi-url-crawl',
'urls': 'https://url1.com\nhttps://url2.com\nhttps://url3.com',
'max_depth': 0,
}
# Parse the URLs
urls = [u.strip() for u in crawl_output['urls'].split('\n') if u.strip()]
self.assertEqual(len(urls), 3)
self.assertEqual(urls[0], 'https://url1.com')
self.assertEqual(urls[1], 'https://url2.com')
self.assertEqual(urls[2], 'https://url3.com')
if __name__ == '__main__':
unittest.main()