#!/usr/bin/env python3

"""Integration tests for recursive crawling functionality.

These tests drive the real ``archivebox`` CLI in a temp directory (the
``process`` fixture runs ``archivebox init`` there) and then inspect the
resulting ``index.sqlite3`` directly to verify crawl/snapshot behavior.
"""

import json
import os
import subprocess
import sqlite3
from pathlib import Path

import pytest

from .fixtures import process, disable_extractors_dict, recursive_test_site


def test_background_hooks_dont_block_parser_extractors(tmp_path, process, recursive_test_site):
    """Test that background hooks (.bg.) don't block other extractors from running."""
    os.chdir(tmp_path)

    # Verify init succeeded
    assert process.returncode == 0, f"archivebox init failed: {process.stderr}"

    # Enable only parser extractors and background hooks for this test
    env = os.environ.copy()
    env.update({
        # Disable most extractors
        "SAVE_SINGLEFILE": "false",
        "SAVE_READABILITY": "false",
        "SAVE_MERCURY": "false",
        "SAVE_HTMLTOTEXT": "false",
        "SAVE_PDF": "false",
        "SAVE_SCREENSHOT": "false",
        "SAVE_DOM": "false",
        "SAVE_HEADERS": "false",
        "SAVE_GIT": "false",
        "SAVE_YTDLP": "false",
        "SAVE_ARCHIVEDOTORG": "false",
        "SAVE_TITLE": "false",
        "SAVE_FAVICON": "true",
        # NOTE(fix): this dict previously listed "SAVE_WGET": "false" earlier in
        # the literal as well; the later duplicate key silently won, so the
        # effective value was always "true". The dead entry has been removed.
        "SAVE_WGET": "true",
    })

    result = subprocess.run(
        ['archivebox', 'add', '--depth=1', '--plugins=favicon,wget,parse_html_urls', recursive_test_site['root_url']],
        capture_output=True,
        text=True,
        env=env,
        timeout=60,
    )
    assert result.returncode == 0, result.stderr

    stdout, stderr = result.stdout, result.stderr
    if stderr:
        print(f"\n=== STDERR ===\n{stderr}\n=== END STDERR ===\n")
    if stdout:
        print(f"\n=== STDOUT (last 2000 chars) ===\n{stdout[-2000:]}\n=== END STDOUT ===\n")

    # Inspect the SQLite index directly rather than going through the ORM.
    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    snapshots = c.execute("SELECT url, depth, status FROM core_snapshot").fetchall()
    bg_hooks = c.execute(
        "SELECT plugin, status FROM core_archiveresult WHERE plugin IN ('favicon', 'consolelog', 'ssl', 'responses', 'redirects', 'staticfile') ORDER BY plugin"
    ).fetchall()
    parser_extractors = c.execute(
        "SELECT plugin, status FROM core_archiveresult WHERE plugin LIKE 'parse_%_urls' ORDER BY plugin"
    ).fetchall()
    all_extractors = c.execute(
        "SELECT plugin, status FROM core_archiveresult ORDER BY plugin"
    ).fetchall()
    conn.close()

    assert len(snapshots) > 0, (
        f"Should have created snapshot after Crawl hooks finished. "
        f"If this fails, Crawl hooks may be taking too long. "
        f"Snapshots: {snapshots}"
    )
    assert len(all_extractors) > 0, (
        f"Should have extractors created for snapshot. "
        f"If this fails, Snapshot.run() may not have started. "
        f"Got: {all_extractors}"
    )

    # Parser extractors must have made progress even while background hooks ran.
    parser_statuses = [status for _, status in parser_extractors]
    assert 'started' in parser_statuses or 'succeeded' in parser_statuses or 'failed' in parser_statuses, \
        f"Parser extractors should have run, got statuses: {parser_statuses}. Background hooks: {bg_hooks}"


def test_parser_extractors_emit_snapshot_jsonl(tmp_path, process, recursive_test_site):
    """Test that parser extractors emit Snapshot JSONL to stdout."""
    os.chdir(tmp_path)

    env = os.environ.copy()
    env.update({
        "SAVE_WGET": "false",
        "SAVE_SINGLEFILE": "false",
        "SAVE_READABILITY": "false",
        "SAVE_MERCURY": "false",
        "SAVE_HTMLTOTEXT": "false",
        "SAVE_PDF": "false",
        "SAVE_SCREENSHOT": "false",
        "SAVE_DOM": "false",
        "SAVE_HEADERS": "false",
        "SAVE_GIT": "false",
        "SAVE_YTDLP": "false",
        "SAVE_ARCHIVEDOTORG": "false",
        "SAVE_TITLE": "false",
        "SAVE_FAVICON": "false",
        "USE_CHROME": "false",
    })

    result = subprocess.run(
        ['archivebox', 'add', '--depth=0', '--plugins=wget,parse_html_urls', recursive_test_site['root_url']],
        capture_output=True,
        text=True,
        env=env,
        timeout=60,
    )
    assert result.returncode == 0, result.stderr

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    parse_html = c.execute(
        "SELECT id, status, output_str FROM core_archiveresult WHERE plugin LIKE '%parse_html_urls' ORDER BY id LIMIT 1"
    ).fetchone()
    conn.close()

    if parse_html:
        status = parse_html[1]
        output = parse_html[2] or ""
        assert status in ['started', 'succeeded', 'failed'], \
            f"60_parse_html_urls should have run, got status: {status}"
        if status == 'succeeded' and output:
            assert 'parsed' in output.lower(), "Parser summary should report parsed URLs"

    # The parser also writes its parsed URLs to an on-disk JSONL file.
    urls_jsonl_files = list(Path("users/system/snapshots").rglob("parse_html_urls/**/urls.jsonl"))
    assert urls_jsonl_files, "parse_html_urls should write urls.jsonl output"

    records = []
    for line in urls_jsonl_files[0].read_text().splitlines():
        if line.strip():
            records.append(json.loads(line))
    assert records, "urls.jsonl should contain parsed Snapshot records"
    assert all(record.get("type") == "Snapshot" for record in records), \
        f"Expected Snapshot JSONL records, got: {records}"


def test_recursive_crawl_creates_child_snapshots(tmp_path, process, recursive_test_site):
    """Test that recursive crawling creates child snapshots with proper depth and parent_snapshot_id."""
    os.chdir(tmp_path)

    env = os.environ.copy()
    env.update({
        # Restrict crawling to the local test server so no external URLs are queued.
        "URL_ALLOWLIST": r"127\.0\.0\.1[:/].*",
        "SAVE_READABILITY": "false",
        "SAVE_SINGLEFILE": "false",
        "SAVE_MERCURY": "false",
        "SAVE_SCREENSHOT": "false",
        "SAVE_PDF": "false",
        "SAVE_HEADERS": "false",
        "SAVE_ARCHIVEDOTORG": "false",
        "SAVE_GIT": "false",
        "SAVE_YTDLP": "false",
        "SAVE_TITLE": "false",
    })

    result = subprocess.run(
        ['archivebox', 'add', '--depth=1', '--plugins=wget,parse_html_urls', recursive_test_site['root_url']],
        capture_output=True,
        text=True,
        env=env,
        timeout=60,
    )
    assert result.returncode == 0, result.stderr

    stdout, stderr = result.stdout, result.stderr
    if stderr:
        print(f"\n=== STDERR ===\n{stderr}\n=== END STDERR ===\n")
    if stdout:
        print(f"\n=== STDOUT (last 2000 chars) ===\n{stdout[-2000:]}\n=== END STDOUT ===\n")

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    all_snapshots = c.execute("SELECT url, depth FROM core_snapshot").fetchall()
    root_snapshot = c.execute(
        "SELECT id, url, depth, parent_snapshot_id FROM core_snapshot WHERE depth = 0 ORDER BY created_at LIMIT 1"
    ).fetchone()
    child_snapshots = c.execute(
        "SELECT id, url, depth, parent_snapshot_id FROM core_snapshot WHERE depth = 1"
    ).fetchall()
    crawl = c.execute(
        "SELECT id, max_depth FROM crawls_crawl ORDER BY created_at DESC LIMIT 1"
    ).fetchone()
    # Diagnostic queries used only in assertion messages below.
    parser_status = c.execute(
        "SELECT plugin, status FROM core_archiveresult WHERE snapshot_id = ? AND plugin LIKE 'parse_%_urls'",
        (root_snapshot[0] if root_snapshot else '',)
    ).fetchall()
    started_extractors = c.execute(
        "SELECT plugin, status FROM core_archiveresult WHERE snapshot_id = ? AND status = 'started'",
        (root_snapshot[0] if root_snapshot else '',)
    ).fetchall()
    conn.close()

    assert root_snapshot is not None, f"Root snapshot should exist at depth=0. All snapshots: {all_snapshots}"
    root_id = root_snapshot[0]

    assert crawl is not None, "Crawl should be created"
    assert crawl[1] == 1, f"Crawl max_depth should be 1, got {crawl[1]}"

    assert len(child_snapshots) > 0, \
        f"Child snapshots should be created from monadical.com links. Parser status: {parser_status}. Started extractors blocking: {started_extractors}"

    # Every child must sit exactly one level below the root and point back to it.
    for child_id, child_url, child_depth, parent_id in child_snapshots:
        assert child_depth == 1, f"Child snapshot should have depth=1, got {child_depth}"
        assert parent_id == root_id, \
            f"Child snapshot {child_url} should have parent_snapshot_id={root_id}, got {parent_id}"


def test_recursive_crawl_respects_depth_limit(tmp_path, process, disable_extractors_dict, recursive_test_site):
    """Test that recursive crawling stops at max_depth."""
    os.chdir(tmp_path)

    result = subprocess.run(
        ['archivebox', 'add', '--depth=1', '--plugins=wget,parse_html_urls', recursive_test_site['root_url']],
        capture_output=True,
        text=True,
        env=disable_extractors_dict,
        timeout=60,
    )
    assert result.returncode == 0, result.stderr

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    max_depth_found = c.execute(
        "SELECT MAX(depth) FROM core_snapshot"
    ).fetchone()[0]
    depth_counts = c.execute(
        "SELECT depth, COUNT(*) FROM core_snapshot GROUP BY depth ORDER BY depth"
    ).fetchall()
    conn.close()

    assert max_depth_found is not None, "Should have at least one snapshot"
    assert max_depth_found <= 1, \
        f"Max depth should not exceed 1, got {max_depth_found}. Depth distribution: {depth_counts}"


def test_crawl_snapshot_has_parent_snapshot_field(tmp_path, process, disable_extractors_dict):
    """Test that Snapshot model has parent_snapshot field."""
    os.chdir(tmp_path)

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    # Check schema for parent_snapshot_id column
    schema = c.execute("PRAGMA table_info(core_snapshot)").fetchall()
    conn.close()

    column_names = [col[1] for col in schema]
    assert 'parent_snapshot_id' in column_names, \
        f"Snapshot table should have parent_snapshot_id column. Columns: {column_names}"


def test_snapshot_depth_field_exists(tmp_path, process, disable_extractors_dict):
    """Test that Snapshot model has depth field."""
    os.chdir(tmp_path)

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    # Check schema for depth column
    schema = c.execute("PRAGMA table_info(core_snapshot)").fetchall()
    conn.close()

    column_names = [col[1] for col in schema]
    assert 'depth' in column_names, \
        f"Snapshot table should have depth column. Columns: {column_names}"


def test_root_snapshot_has_depth_zero(tmp_path, process, disable_extractors_dict, recursive_test_site):
    """Test that root snapshots are created with depth=0."""
    os.chdir(tmp_path)

    result = subprocess.run(
        ['archivebox', 'add', '--depth=1', '--plugins=wget,parse_html_urls', recursive_test_site['root_url']],
        capture_output=True,
        text=True,
        env=disable_extractors_dict,
        timeout=60,
    )
    assert result.returncode == 0, result.stderr

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    snapshot = c.execute(
        "SELECT id, depth FROM core_snapshot WHERE url = ? ORDER BY created_at LIMIT 1",
        (recursive_test_site['root_url'],)
    ).fetchone()
    conn.close()

    assert snapshot is not None, "Root snapshot should be created"
    assert snapshot[1] == 0, f"Root snapshot should have depth=0, got {snapshot[1]}"


def test_archiveresult_worker_queue_filters_by_foreground_extractors(tmp_path, process, recursive_test_site):
    """Test that background hooks don't block foreground extractors from running."""
    os.chdir(tmp_path)

    env = os.environ.copy()
    env.update({
        "SAVE_WGET": "true",
        "SAVE_SINGLEFILE": "false",
        "SAVE_PDF": "false",
        "SAVE_SCREENSHOT": "false",
        "SAVE_FAVICON": "true",
    })

    result = subprocess.run(
        ['archivebox', 'add', '--plugins=favicon,wget,parse_html_urls', recursive_test_site['root_url']],
        capture_output=True,
        text=True,
        env=env,
        timeout=60,
    )
    assert result.returncode == 0, result.stderr

    conn = sqlite3.connect('index.sqlite3')
    c = conn.cursor()
    bg_results = c.execute(
        "SELECT plugin, status FROM core_archiveresult WHERE plugin IN ('favicon', 'consolelog', 'ssl', 'responses', 'redirects', 'staticfile') AND status IN ('started', 'succeeded', 'failed')"
    ).fetchall()
    parser_status = c.execute(
        "SELECT plugin, status FROM core_archiveresult WHERE plugin LIKE 'parse_%_urls'"
    ).fetchall()
    conn.close()

    # Only meaningful if at least one background hook actually ran.
    if len(bg_results) > 0:
        parser_statuses = [status for _, status in parser_status]
        non_queued = [s for s in parser_statuses if s != 'queued']
        assert len(non_queued) > 0 or len(parser_status) == 0, \
            f"With {len(bg_results)} background hooks started, parser extractors should still run. " \
            f"Got statuses: {parser_statuses}"


if __name__ == '__main__':
    pytest.main([__file__, '-v'])