From 6fadcf5168d9a1b68a5e4a2463109e090117205f Mon Sep 17 00:00:00 2001
From: Nick Sweeting
Date: Wed, 31 Dec 2025 18:01:53 -0800
Subject: [PATCH] remove model health stats from models that don't need it

---
 .../core/migrations/0023_upgrade_to_0_9_0.py  |   21 +-
 ...options_alter_snapshot_options_and_more.py |   12 +-
 archivebox/core/models.py                     |   19 +-
 archivebox/crawls/models.py                   |    8 +-
 .../test_parse_rss_urls_comprehensive.py.bak  | 1002 -----------------
 5 files changed, 46 insertions(+), 1016 deletions(-)
 delete mode 100644 archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak

diff --git a/archivebox/core/migrations/0023_upgrade_to_0_9_0.py b/archivebox/core/migrations/0023_upgrade_to_0_9_0.py
index bc338eae..59dcd9e4 100644
--- a/archivebox/core/migrations/0023_upgrade_to_0_9_0.py
+++ b/archivebox/core/migrations/0023_upgrade_to_0_9_0.py
@@ -143,6 +143,11 @@ def upgrade_core_tables(apps, schema_editor):
     if has_added and not has_bookmarked_at:
         # Migrating from v0.7.2 (has added/updated, no bookmarked_at/created_at/modified_at)
         print('Migrating Snapshot from v0.7.2 schema...')
+        # Debug: Check what data we're about to copy
+        cursor.execute("SELECT id, added, updated FROM core_snapshot LIMIT 3")
+        sample_data = cursor.fetchall()
+        print(f'DEBUG 0023: Sample Snapshot data before migration: {sample_data}')
+
         cursor.execute("""
             INSERT OR IGNORE INTO core_snapshot_new (
                 id, url, timestamp, title, bookmarked_at, created_at, modified_at
@@ -154,6 +159,11 @@ def upgrade_core_tables(apps, schema_editor):
                 COALESCE(updated, added, CURRENT_TIMESTAMP) as modified_at
             FROM core_snapshot;
         """)
+
+        # Debug: Check what was inserted
+        cursor.execute("SELECT id, bookmarked_at, modified_at FROM core_snapshot_new LIMIT 3")
+        inserted_data = cursor.fetchall()
+        print(f'DEBUG 0023: Sample Snapshot data after INSERT: {inserted_data}')
     elif has_bookmarked_at and not has_added:
         # Migrating from v0.8.6rc0 (already has bookmarked_at/created_at/modified_at)
         print('Migrating Snapshot from v0.8.6rc0 schema...')
@@ -298,12 +308,15 @@ class Migration(migrations.Migration):
                 ),
             ],
             state_operations=[
-                # Remove old ArchiveResult fields
-                migrations.RemoveField(model_name='archiveresult', name='extractor'),
-                migrations.RemoveField(model_name='archiveresult', name='output'),
-                # Remove old Snapshot fields
+                # NOTE: We do NOT remove extractor/output here for ArchiveResult!
+                # They are still in the database and will be removed by migration 0025
+                # after copying their data to the new field names (plugin, output_str).
+
+                # However, for Snapshot, we DO remove added/updated here because
+                # the database operations above already renamed them to bookmarked_at/created_at/modified_at.
                 migrations.RemoveField(model_name='snapshot', name='added'),
                 migrations.RemoveField(model_name='snapshot', name='updated'),
+                # SnapshotTag table already exists from v0.7.2, just declare it in state
                 migrations.CreateModel(
                     name='SnapshotTag',
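The NOTE comments above describe a two-phase rename: migration 0023 already rebuilt the Snapshot table at the database level (added/updated became bookmarked_at/created_at/modified_at), so only Django's model state needs the RemoveField entries here, while ArchiveResult.extractor/output stay in the database until 0025 has copied them into plugin/output_str. A minimal sketch of that split, assuming the state_operations= block above is part of a migrations.SeparateDatabaseAndState (the surrounding operation is not visible in the hunk):

    # Illustrative sketch only, not part of this patch. Assumes the state_operations=
    # list above is one half of a migrations.SeparateDatabaseAndState().
    from django.db import migrations

    class Migration(migrations.Migration):
        operations = [
            migrations.SeparateDatabaseAndState(
                # The raw-SQL table rebuild earlier in 0023 already renamed the real
                # columns, so nothing further happens at the database level here.
                database_operations=[],
                # Only Django's in-memory model state is brought in line with the schema.
                state_operations=[
                    migrations.RemoveField(model_name='snapshot', name='added'),
                    migrations.RemoveField(model_name='snapshot', name='updated'),
                ],
            ),
        ]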
diff --git a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
index 04097cc7..1a68ab06 100644
--- a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
+++ b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
@@ -25,7 +25,7 @@ def copy_old_fields_to_new(apps, schema_editor):
         count = cursor.fetchone()[0]
         print(f'DEBUG 0025: Updated {count} rows with plugin data')
     else:
-        print(f'DEBUG 0025: NOT copying - extractor in cols: {extractor" in cols}, plugin in cols: {"plugin" in cols}')
+        print(f'DEBUG 0025: NOT copying - extractor in cols: {"extractor" in cols}, plugin in cols: {"plugin" in cols}')
 
     if 'output' in cols and 'output_str' in cols:
         # Copy output -> output_str
@@ -239,6 +239,16 @@ class Migration(migrations.Migration):
             copy_old_fields_to_new,
             reverse_code=migrations.RunPython.noop,
         ),
+        # Now remove the old ArchiveResult fields after data has been copied
+        migrations.RemoveField(
+            model_name='archiveresult',
+            name='extractor',
+        ),
+        migrations.RemoveField(
+            model_name='archiveresult',
+            name='output',
+        ),
+        # NOTE: Snapshot's added/updated fields were already removed by migration 0023
         migrations.AlterField(
             model_name='archiveresult',
             name='end_ts',
diff --git a/archivebox/core/models.py b/archivebox/core/models.py
index b8aa660c..85d5cee0 100755
--- a/archivebox/core/models.py
+++ b/archivebox/core/models.py
@@ -29,7 +29,7 @@ from archivebox.hooks import (
     get_plugins, get_plugin_name, get_plugin_icon,
 )
 from archivebox.base_models.models import (
-    ModelWithUUID, ModelWithSerializers, ModelWithOutputDir,
+    ModelWithUUID, ModelWithOutputDir,
     ModelWithConfig, ModelWithNotes, ModelWithHealthStats,
     get_or_create_system_user_pk,
 )
@@ -40,7 +40,7 @@ from archivebox.machine.models import NetworkInterface, Binary
 
 
-class Tag(ModelWithSerializers):
+class Tag(ModelWithUUID):
     # Keep AutoField for compatibility with main branch migrations
     # Don't use UUIDField here - requires complex FK transformation
     id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
@@ -2254,7 +2254,7 @@ class SnapshotMachine(BaseStateMachine, strict_states=True):
         )
 
 
-class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
+class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithStateMachine):
     class StatusChoices(models.TextChoices):
         QUEUED = 'queued', 'Queued'
         STARTED = 'started', 'Started'
@@ -2551,11 +2551,20 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
         pass
 
     def cascade_health_update(self, success: bool):
-        """Update health stats for self, parent Snapshot, and grandparent Crawl."""
-        self.increment_health_stats(success)
+        """Update health stats for parent Snapshot, Crawl, and execution infrastructure (Binary, Machine, NetworkInterface)."""
+        # Update archival hierarchy
         self.snapshot.increment_health_stats(success)
         self.snapshot.crawl.increment_health_stats(success)
 
+        # Update execution infrastructure
+        if self.binary:
+            self.binary.increment_health_stats(success)
+            if self.binary.machine:
+                self.binary.machine.increment_health_stats(success)
+
+        if self.iface:
+            self.iface.increment_health_stats(success)
+
     def run(self):
         """
         Execute this ArchiveResult's hook and update status.
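With ModelWithHealthStats dropped from ArchiveResult, cascade_health_update() now only fans the outcome out to the parent Snapshot and Crawl plus the Binary, Machine, and NetworkInterface that ran the hook. A hedged sketch of a call site, assuming a completed subprocess handle; the helper name and exit-code check are illustrative and not from the patch:

    # Hypothetical call site, not part of this patch: report a finished hook's outcome.
    def record_hook_outcome(archiveresult, proc) -> None:
        succeeded = (proc.returncode == 0)
        # Updates Snapshot, Crawl, Binary, Machine, and NetworkInterface counters;
        # the ArchiveResult itself no longer carries health stats after this change.
        archiveresult.cascade_health_update(success=succeeded)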
diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py
index 276d02f8..e39526b5 100755
--- a/archivebox/crawls/models.py
+++ b/archivebox/crawls/models.py
@@ -16,14 +16,14 @@ from statemachine import State, registry
 from rich import print
 
 from archivebox.config import CONSTANTS
-from archivebox.base_models.models import ModelWithSerializers, ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, get_or_create_system_user_pk
+from archivebox.base_models.models import ModelWithUUID, ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, get_or_create_system_user_pk
 from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine
 
 if TYPE_CHECKING:
     from archivebox.core.models import Snapshot, ArchiveResult
 
 
-class CrawlSchedule(ModelWithSerializers, ModelWithNotes, ModelWithHealthStats):
+class CrawlSchedule(ModelWithUUID, ModelWithNotes):
     id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
     created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
@@ -197,9 +197,9 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
 
     @property
     def output_dir_parent(self) -> str:
-        """Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""
+        """Construct parent directory: users/{username}/crawls/{YYYYMMDD}"""
         date_str = self.created_at.strftime('%Y%m%d')
-        return f'users/{self.created_by_id}/crawls/{date_str}'
+        return f'users/{self.created_by.username}/crawls/{date_str}'
 
     @property
     def output_dir_name(self) -> str:
diff --git a/archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak b/archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak
deleted file mode 100644
index 562c6805..00000000
--- a/archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak
+++ /dev/null
@@ -1,1002 +0,0 @@
-#!/usr/bin/env python3
-"""Comprehensive tests for parse_rss_urls extractor covering various RSS/Atom variants."""
-
-import json
-import subprocess
-import sys
-from pathlib import Path
-
-import pytest
-
-PLUGIN_DIR = Path(__file__).parent.parent
-SCRIPT_PATH = next(PLUGIN_DIR.glob('on_Snapshot__*_parse_rss_urls.*'), None)
-
-
-class TestRssVariants:
-    """Test various RSS format variants."""
-
-    def test_rss_091(self, tmp_path):
-        """Test RSS 0.91 format (oldest RSS version)."""
-        input_file = tmp_path / 'feed.rss'
-        input_file.write_text('''
-
-
-            RSS 0.91 Feed
-            https://example.com
-            Test RSS 0.91
-
-                RSS 0.91 Article
-                https://example.com/article1
-                An article in RSS 0.91 format
-
-
-
-        ''')
-
-        result = subprocess.run(
-            [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
-            cwd=tmp_path,
-            capture_output=True,
-            text=True,
-        )
-
-        assert result.returncode == 0, f"Failed: {result.stderr}"
-        # Output goes to stdout (JSONL)
-        lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-        entry = json.loads(lines[0])
-
-        assert entry['url'] == 'https://example.com/article1'
-        assert entry['title'] == 'RSS 0.91 Article'
-        assert entry['plugin'] == 'parse_rss_urls'
-
-    def test_rss_10_rdf(self, tmp_path):
-        """Test RSS 1.0
(RDF) format.""" - input_file = tmp_path / 'feed.rdf' - input_file.write_text(''' - - - RSS 1.0 Feed - https://example.com - - - RDF Item 1 - https://example.com/rdf1 - 2024-01-15T10:30:00Z - Technology - - - RDF Item 2 - https://example.com/rdf2 - 2024-01-16T14:20:00Z - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0, f"Failed: {result.stderr}" - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - entries = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - - urls = {e['url'] for e in entries} - assert 'https://example.com/rdf1' in urls - assert 'https://example.com/rdf2' in urls - assert any(e.get('bookmarked_at') for e in entries) - - def test_rss_20_with_full_metadata(self, tmp_path): - """Test RSS 2.0 with all standard metadata fields.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - Full RSS 2.0 - https://example.com - Complete RSS 2.0 feed - - Complete Article - https://example.com/complete - Full description here - author@example.com - Technology - Programming - https://example.com/complete - Mon, 15 Jan 2024 10:30:00 GMT - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - content = result.stdout.strip() - lines = content.split('\n') - - # Check for Tag records - tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag'] - tag_names = {t['name'] for t in tags} - assert 'Technology' in tag_names - assert 'Programming' in tag_names - - # Check Snapshot record - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - assert entry['url'] == 'https://example.com/complete' - assert entry['title'] == 'Complete Article' - assert 'bookmarked_at' in entry - assert entry['tags'] == 'Technology,Programming' or entry['tags'] == 'Programming,Technology' - - -class TestAtomVariants: - """Test various Atom format variants.""" - - def test_atom_10_full(self, tmp_path): - """Test Atom 1.0 with full metadata.""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - Atom 1.0 Feed - 2024-01-15T00:00:00Z - - Atom Entry 1 - - urn:uuid:1234-5678 - 2024-01-15T10:30:00Z - 2024-01-14T08:00:00Z - - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip()] - - tags = [json.loads(line) for line in lines if json.loads(line).get('type') == 'Tag'] - tag_names = {t['name'] for t in tags} - assert 'science' in tag_names - assert 'research' in tag_names - - snapshots = [json.loads(line) for line in lines if json.loads(line).get('type') == 'Snapshot'] - entry = snapshots[0] - assert entry['url'] == 'https://atom.example.com/1' - assert 'bookmarked_at' in entry - - def test_atom_with_alternate_link(self, tmp_path): - """Test Atom feed with alternate link types.""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - Atom Alternate Links - - Entry with alternate - - - 
2024-01-15T10:30:00Z - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - # feedparser should pick the alternate link - assert 'atom.example.com/article' in entry['url'] - - -class TestDateFormats: - """Test various date format handling.""" - - def test_rfc822_date(self, tmp_path): - """Test RFC 822 date format (RSS 2.0 standard).""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - RFC 822 Date - https://example.com/rfc822 - Wed, 15 Jan 2020 10:30:45 GMT - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - assert 'bookmarked_at' in entry - assert '2020-01-15' in entry['bookmarked_at'] - - def test_iso8601_date(self, tmp_path): - """Test ISO 8601 date format (Atom standard).""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - - ISO 8601 Date - - 2024-01-15T10:30:45.123Z - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - assert 'bookmarked_at' in entry - assert '2024-01-15' in entry['bookmarked_at'] - - def test_updated_vs_published_date(self, tmp_path): - """Test that published date is preferred over updated date.""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - - Date Priority Test - - 2024-01-10T10:00:00Z - 2024-01-15T10:00:00Z - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - # Should use published date (Jan 10) not updated date (Jan 15) - assert '2024-01-10' in entry['bookmarked_at'] - - def test_only_updated_date(self, tmp_path): - """Test fallback to updated date when published is missing.""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - - Only Updated - - 2024-01-20T10:00:00Z - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - assert '2024-01-20' in entry['bookmarked_at'] - - def test_no_date(self, tmp_path): - """Test entries without any date.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - No Date - https://example.com/nodate - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', 
f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - assert 'bookmarked_at' not in entry - - -class TestTagsAndCategories: - """Test various tag and category formats.""" - - def test_rss_categories(self, tmp_path): - """Test RSS 2.0 category elements.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - Multi Category - https://example.com/cats - Tech - Web - Programming - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - - tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag'] - tag_names = {t['name'] for t in tags} - assert 'Tech' in tag_names - assert 'Web' in tag_names - assert 'Programming' in tag_names - - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - tags_list = entry['tags'].split(',') - assert len(tags_list) == 3 - - def test_atom_categories(self, tmp_path): - """Test Atom category elements with various attributes.""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - - Atom Categories - - - - 2024-01-15T10:00:00Z - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - - tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag'] - tag_names = {t['name'] for t in tags} - # feedparser extracts the 'term' attribute - assert 'python' in tag_names - assert 'django' in tag_names - - def test_no_tags(self, tmp_path): - """Test entries without tags.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - No Tags - https://example.com/notags - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - assert 'tags' not in entry or entry['tags'] == '' - - def test_duplicate_tags(self, tmp_path): - """Test that duplicate tags are handled properly.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - Duplicate Tags - https://example.com/dups - Python - Python - Web - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag'] - # Tag records should be unique - tag_names = [t['name'] for t in tags] - assert 
tag_names.count('Python') == 1 - - -class TestCustomNamespaces: - """Test custom namespace handling (Dublin Core, Media RSS, etc.).""" - - def test_dublin_core_metadata(self, tmp_path): - """Test Dublin Core namespace fields.""" - input_file = tmp_path / 'feed.rdf' - input_file.write_text(''' - - - Dublin Core Feed - - - Dublin Core Article - https://example.com/dc1 - John Doe - Technology - 2024-01-15T10:30:00Z - Copyright 2024 - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - - assert entry['url'] == 'https://example.com/dc1' - assert entry['title'] == 'Dublin Core Article' - # feedparser should parse dc:date as bookmarked_at - assert 'bookmarked_at' in entry - - def test_media_rss_namespace(self, tmp_path): - """Test Media RSS namespace (common in podcast feeds).""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - Media RSS Feed - - Podcast Episode 1 - https://example.com/podcast/1 - - - Mon, 15 Jan 2024 10:00:00 GMT - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - assert entry['url'] == 'https://example.com/podcast/1' - assert entry['title'] == 'Podcast Episode 1' - - def test_itunes_namespace(self, tmp_path): - """Test iTunes namespace (common in podcast feeds).""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - iTunes Podcast - - Episode 1: Getting Started - https://example.com/ep1 - Jane Smith - 45:30 - programming, tutorial, beginner - Tue, 16 Jan 2024 08:00:00 GMT - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - - assert entry['url'] == 'https://example.com/ep1' - assert entry['title'] == 'Episode 1: Getting Started' - - -class TestEdgeCases: - """Test edge cases and malformed data.""" - - def test_missing_title(self, tmp_path): - """Test entries without title.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - https://example.com/notitle - Mon, 15 Jan 2024 10:00:00 GMT - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - assert entry['url'] == 'https://example.com/notitle' - assert 'title' not in entry - - def test_missing_link(self, tmp_path): - """Test 
entries without link (should be skipped).""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - No Link - This entry has no link - - - Has Link - https://example.com/haslink - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - # Should only have the entry with a link - assert entry['url'] == 'https://example.com/haslink' - assert '1 URL' in result.stdout - - def test_html_entities_in_title(self, tmp_path): - """Test HTML entities in titles are properly decoded.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - Using <div> & <span> tags - https://example.com/html - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - assert entry['title'] == 'Using
& tags' - - def test_special_characters_in_tags(self, tmp_path): - """Test special characters in tags.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - Special Tags - https://example.com/special - C++ - Node.js - Web/Mobile - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - - tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag'] - tag_names = {t['name'] for t in tags} - assert 'C++' in tag_names - assert 'Node.js' in tag_names - assert 'Web/Mobile' in tag_names - - def test_cdata_sections(self, tmp_path): - """Test CDATA sections in titles and descriptions.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - <![CDATA[Using <strong>HTML</strong> in titles]]> - https://example.com/cdata - markup]]> - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - # feedparser should strip HTML tags - assert 'HTML' in entry['title'] - assert entry['url'] == 'https://example.com/cdata' - - def test_relative_urls(self, tmp_path): - """Test that relative URLs are preserved (feedparser handles them).""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - https://example.com - - Relative URL - /article/relative - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - # feedparser may convert relative to absolute, or leave as-is - assert 'article/relative' in entry['url'] - - def test_unicode_characters(self, tmp_path): - """Test Unicode characters in feed content.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - - Unicode: 日本語 Français 中文 العربية - https://example.com/unicode - 日本語 - Français - - - - ''', encoding='utf-8') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip()] - - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - assert '日本語' in entry['title'] - assert 'Français' in entry['title'] - - def test_very_long_title(self, tmp_path): - """Test handling of very long titles.""" - long_title = 'A' * 1000 - input_file = tmp_path / 'feed.rss' - input_file.write_text(f''' - - - - {long_title} - https://example.com/long - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = 
[line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line] - entry = json.loads(lines[0]) - - assert len(entry['title']) == 1000 - assert entry['title'] == long_title - - def test_multiple_entries_batch(self, tmp_path): - """Test processing a large batch of entries.""" - items = [] - for i in range(100): - items.append(f''' - - Article {i} - https://example.com/article/{i} - Tag{i % 10} - Mon, {15 + (i % 15)} Jan 2024 10:00:00 GMT - - ''') - - input_file = tmp_path / 'feed.rss' - input_file.write_text(f''' - - - Large Feed - {''.join(items)} - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - assert 'Found 100 URLs' in result.stdout - - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - - # Should have 10 unique tags (Tag0-Tag9) + 100 snapshots - tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag'] - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - - assert len(tags) == 10 - assert len(snapshots) == 100 - - -class TestRealWorldFeeds: - """Test patterns from real-world RSS feeds.""" - - def test_medium_style_feed(self, tmp_path): - """Test Medium-style feed structure.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - - Medium Feed - - Article Title - https://medium.com/@user/article-slug-123abc - https://medium.com/p/123abc - Wed, 15 Jan 2024 10:30:00 GMT - Programming - JavaScript - Author Name - - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - assert 'medium.com' in entry['url'] - assert entry['title'] == 'Article Title' - - def test_reddit_style_feed(self, tmp_path): - """Test Reddit-style feed structure.""" - input_file = tmp_path / 'feed.rss' - input_file.write_text(''' - - Reddit Feed - - Post Title - - 2024-01-15T10:30:00+00:00 - - t3_abc123 - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line] - - snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot'] - entry = snapshots[0] - assert 'reddit.com' in entry['url'] - - def test_youtube_style_feed(self, tmp_path): - """Test YouTube-style feed structure.""" - input_file = tmp_path / 'feed.atom' - input_file.write_text(''' - - YouTube Channel - - Video Title - - 2024-01-15T10:30:00+00:00 - dQw4w9WgXcQ - UCxxxxxxxx - - - ''') - - result = subprocess.run( - [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'], - cwd=tmp_path, - capture_output=True, - text=True, - ) - - assert result.returncode == 0 - # Output goes to stdout (JSONL) - lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' 
in line] - entry = json.loads(lines[0]) - - assert 'youtube.com' in entry['url'] - assert 'dQw4w9WgXcQ' in entry['url'] - - -if __name__ == '__main__': - pytest.main([__file__, '-v'])
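The deleted tests all rely on the same convention noted in their comments: the hook script writes JSONL records to stdout, and each test filters those records by "type" ("Snapshot" or "Tag"). A small hedged helper capturing that repeated idiom; the function is hypothetical and does not exist in the codebase or in this patch:

    # Hypothetical helper, not part of this patch: collect JSONL records of one type
    # from a hook script's stdout, mirroring the filtering the deleted tests repeat.
    import json

    def jsonl_records(stdout: str, record_type: str) -> list[dict]:
        records = []
        for line in stdout.strip().split('\n'):
            if not line.strip():
                continue
            record = json.loads(line)
            if record.get('type') == record_type:
                records.append(record)
        return records

    # e.g. snapshots = jsonl_records(result.stdout, 'Snapshot')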