diff --git a/archivebox/core/migrations/0023_upgrade_to_0_9_0.py b/archivebox/core/migrations/0023_upgrade_to_0_9_0.py
index bc338eae..59dcd9e4 100644
--- a/archivebox/core/migrations/0023_upgrade_to_0_9_0.py
+++ b/archivebox/core/migrations/0023_upgrade_to_0_9_0.py
@@ -143,6 +143,11 @@ def upgrade_core_tables(apps, schema_editor):
if has_added and not has_bookmarked_at:
# Migrating from v0.7.2 (has added/updated, no bookmarked_at/created_at/modified_at)
print('Migrating Snapshot from v0.7.2 schema...')
+ # Debug: Check what data we're about to copy
+ cursor.execute("SELECT id, added, updated FROM core_snapshot LIMIT 3")
+ sample_data = cursor.fetchall()
+ print(f'DEBUG 0023: Sample Snapshot data before migration: {sample_data}')
+
cursor.execute("""
INSERT OR IGNORE INTO core_snapshot_new (
id, url, timestamp, title, bookmarked_at, created_at, modified_at
@@ -154,6 +159,11 @@ def upgrade_core_tables(apps, schema_editor):
COALESCE(updated, added, CURRENT_TIMESTAMP) as modified_at
FROM core_snapshot;
""")
+
+ # Debug: Check what was inserted
+ cursor.execute("SELECT id, bookmarked_at, modified_at FROM core_snapshot_new LIMIT 3")
+ inserted_data = cursor.fetchall()
+ print(f'DEBUG 0023: Sample Snapshot data after INSERT: {inserted_data}')
elif has_bookmarked_at and not has_added:
# Migrating from v0.8.6rc0 (already has bookmarked_at/created_at/modified_at)
print('Migrating Snapshot from v0.8.6rc0 schema...')
@@ -298,12 +308,15 @@ class Migration(migrations.Migration):
),
],
state_operations=[
- # Remove old ArchiveResult fields
- migrations.RemoveField(model_name='archiveresult', name='extractor'),
- migrations.RemoveField(model_name='archiveresult', name='output'),
- # Remove old Snapshot fields
+ # NOTE: We do NOT remove ArchiveResult's extractor/output here!
+ # They are still in the database and are only removed by migration 0025,
+ # after it has copied their data into the new fields (plugin, output_str).
+
+ # However, for Snapshot we DO remove added/updated here, because the database
+ # operations above already migrated their data into the new bookmarked_at/created_at/modified_at columns.
migrations.RemoveField(model_name='snapshot', name='added'),
migrations.RemoveField(model_name='snapshot', name='updated'),
+
# SnapshotTag table already exists from v0.7.2, just declare it in state
migrations.CreateModel(
name='SnapshotTag',
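
For reference, the state_operations/database_operations split used in 0023 follows Django's SeparateDatabaseAndState pattern. A minimal sketch, assuming the surrounding operations look roughly like this (only the two Snapshot RemoveFields are taken verbatim from the migration above):

```python
# Sketch only: raw SQL changes the real tables, state_operations keep Django's
# in-memory model state in sync so later migrations see the correct fields.
from django.db import migrations

operations = [
    migrations.SeparateDatabaseAndState(
        database_operations=[
            # the actual table rebuild / data copy happens in RunPython above
        ],
        state_operations=[
            # Snapshot.added/updated are already gone from the real table,
            # so drop them from Django's model state here...
            migrations.RemoveField(model_name='snapshot', name='added'),
            migrations.RemoveField(model_name='snapshot', name='updated'),
            # ...but keep ArchiveResult.extractor/output in state until 0025
            # has copied their data into plugin/output_str.
        ],
    ),
]
```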
diff --git a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
index 04097cc7..1a68ab06 100644
--- a/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
+++ b/archivebox/core/migrations/0025_alter_archiveresult_options_alter_snapshot_options_and_more.py
@@ -25,7 +25,7 @@ def copy_old_fields_to_new(apps, schema_editor):
count = cursor.fetchone()[0]
print(f'DEBUG 0025: Updated {count} rows with plugin data')
else:
- print(f'DEBUG 0025: NOT copying - extractor in cols: {extractor" in cols}, plugin in cols: {"plugin" in cols}')
+ print(f'DEBUG 0025: NOT copying - extractor in cols: {"extractor" in cols}, plugin in cols: {"plugin" in cols}')
if 'output' in cols and 'output_str' in cols:
# Copy output -> output_str
@@ -239,6 +239,16 @@ class Migration(migrations.Migration):
copy_old_fields_to_new,
reverse_code=migrations.RunPython.noop,
),
+ # Now remove the old ArchiveResult fields after data has been copied
+ migrations.RemoveField(
+ model_name='archiveresult',
+ name='extractor',
+ ),
+ migrations.RemoveField(
+ model_name='archiveresult',
+ name='output',
+ ),
+ # NOTE: Snapshot's added/updated fields were already removed by migration 0023
migrations.AlterField(
model_name='archiveresult',
name='end_ts',
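
The ordering that 0025 now depends on, sketched with simplified cursor logic (a sketch, not the real migration: the actual copy_old_fields_to_new above also logs debug output, and the SQLite PRAGMA introspection here is assumed to match the project's SQLite-specific SQL style):

```python
# Sketch of 0025's ordering: copy legacy column data first, drop the columns after.
from django.db import migrations


def copy_old_fields_to_new(apps, schema_editor):
    with schema_editor.connection.cursor() as cursor:
        cursor.execute("PRAGMA table_info(core_archiveresult)")
        cols = {row[1] for row in cursor.fetchall()}  # column names (SQLite)
        if 'extractor' in cols and 'plugin' in cols:
            cursor.execute("UPDATE core_archiveresult SET plugin = extractor")
        if 'output' in cols and 'output_str' in cols:
            cursor.execute("UPDATE core_archiveresult SET output_str = output")


operations = [
    migrations.RunPython(copy_old_fields_to_new, reverse_code=migrations.RunPython.noop),
    # only safe once the copy above has run:
    migrations.RemoveField(model_name='archiveresult', name='extractor'),
    migrations.RemoveField(model_name='archiveresult', name='output'),
]
```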
diff --git a/archivebox/core/models.py b/archivebox/core/models.py
index b8aa660c..85d5cee0 100755
--- a/archivebox/core/models.py
+++ b/archivebox/core/models.py
@@ -29,7 +29,7 @@ from archivebox.hooks import (
get_plugins, get_plugin_name, get_plugin_icon,
)
from archivebox.base_models.models import (
- ModelWithUUID, ModelWithSerializers, ModelWithOutputDir,
+ ModelWithUUID, ModelWithOutputDir,
ModelWithConfig, ModelWithNotes, ModelWithHealthStats,
get_or_create_system_user_pk,
)
@@ -40,7 +40,7 @@ from archivebox.machine.models import NetworkInterface, Binary
-class Tag(ModelWithSerializers):
+class Tag(ModelWithUUID):
# Keep AutoField for compatibility with main branch migrations
# Don't use UUIDField here - requires complex FK transformation
id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
@@ -2254,7 +2254,7 @@ class SnapshotMachine(BaseStateMachine, strict_states=True):
)
-class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
+class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithStateMachine):
class StatusChoices(models.TextChoices):
QUEUED = 'queued', 'Queued'
STARTED = 'started', 'Started'
@@ -2551,11 +2551,20 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
pass
def cascade_health_update(self, success: bool):
- """Update health stats for self, parent Snapshot, and grandparent Crawl."""
- self.increment_health_stats(success)
+ """Update health stats for parent Snapshot, Crawl, and execution infrastructure (Binary, Machine, NetworkInterface)."""
+ # Update archival hierarchy
self.snapshot.increment_health_stats(success)
self.snapshot.crawl.increment_health_stats(success)
+ # Update execution infrastructure
+ if self.binary:
+ self.binary.increment_health_stats(success)
+ if self.binary.machine:
+ self.binary.machine.increment_health_stats(success)
+
+ if self.iface:
+ self.iface.increment_health_stats(success)
+
def run(self):
"""
Execute this ArchiveResult's hook and update status.
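
The cascade relies on increment_health_stats from the ModelWithHealthStats mixin. A hedged sketch of what that contract could look like; the counter field names below are assumptions, not taken from this diff:

```python
# Hypothetical sketch of the ModelWithHealthStats contract the cascade depends on.
from django.db import models
from django.db.models import F


class ModelWithHealthStats(models.Model):
    num_uses_succeeded = models.PositiveIntegerField(default=0)  # assumed field name
    num_uses_failed = models.PositiveIntegerField(default=0)     # assumed field name

    class Meta:
        abstract = True

    def increment_health_stats(self, success: bool) -> None:
        field = 'num_uses_succeeded' if success else 'num_uses_failed'
        # F() keeps the increment atomic when multiple workers report at once
        type(self).objects.filter(pk=self.pk).update(**{field: F(field) + 1})
```

With a mixin like this in place, cascade_health_update only fans the call out to snapshot, crawl, binary, binary.machine, and iface, guarding the nullable FKs as shown in the diff; the ArchiveResult itself no longer tracks its own stats since ModelWithHealthStats was dropped from its bases.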
diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py
index 276d02f8..e39526b5 100755
--- a/archivebox/crawls/models.py
+++ b/archivebox/crawls/models.py
@@ -16,14 +16,14 @@ from statemachine import State, registry
from rich import print
from archivebox.config import CONSTANTS
-from archivebox.base_models.models import ModelWithSerializers, ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, get_or_create_system_user_pk
+from archivebox.base_models.models import ModelWithUUID, ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, get_or_create_system_user_pk
from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine
if TYPE_CHECKING:
from archivebox.core.models import Snapshot, ArchiveResult
-class CrawlSchedule(ModelWithSerializers, ModelWithNotes, ModelWithHealthStats):
+class CrawlSchedule(ModelWithUUID, ModelWithNotes):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
@@ -197,9 +197,9 @@ class Crawl(ModelWithOutputDir, ModelWithConfig, ModelWithHealthStats, ModelWith
@property
def output_dir_parent(self) -> str:
- """Construct parent directory: users/{user_id}/crawls/{YYYYMMDD}"""
+ """Construct parent directory: users/{username}/crawls/{YYYYMMDD}"""
date_str = self.created_at.strftime('%Y%m%d')
- return f'users/{self.created_by_id}/crawls/{date_str}'
+ return f'users/{self.created_by.username}/crawls/{date_str}'
@property
def output_dir_name(self) -> str:
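
What the output_dir_parent change means for the on-disk layout, with an illustrative username and date:

```python
# Before this diff the parent dir was keyed by the user's primary key
# (e.g. users/3/crawls/20240115); after it, by username.
from datetime import datetime


def output_dir_parent(username: str, created_at: datetime) -> str:
    # mirrors Crawl.output_dir_parent after this change (standalone sketch)
    date_str = created_at.strftime('%Y%m%d')
    return f'users/{username}/crawls/{date_str}'


assert output_dir_parent('alice', datetime(2024, 1, 15)) == 'users/alice/crawls/20240115'
```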
diff --git a/archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak b/archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak
deleted file mode 100644
index 562c6805..00000000
--- a/archivebox/plugins/parse_rss_urls/tests/test_parse_rss_urls_comprehensive.py.bak
+++ /dev/null
@@ -1,1002 +0,0 @@
-#!/usr/bin/env python3
-"""Comprehensive tests for parse_rss_urls extractor covering various RSS/Atom variants."""
-
-import json
-import subprocess
-import sys
-from pathlib import Path
-
-import pytest
-
-PLUGIN_DIR = Path(__file__).parent.parent
-SCRIPT_PATH = next(PLUGIN_DIR.glob('on_Snapshot__*_parse_rss_urls.*'), None)
-
-
-class TestRssVariants:
- """Test various RSS format variants."""
-
- def test_rss_091(self, tmp_path):
- """Test RSS 0.91 format (oldest RSS version)."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- RSS 0.91 Feed
- https://example.com
- Test RSS 0.91
- -
- RSS 0.91 Article
- https://example.com/article1
- An article in RSS 0.91 format
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0, f"Failed: {result.stderr}"
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- assert entry['url'] == 'https://example.com/article1'
- assert entry['title'] == 'RSS 0.91 Article'
- assert entry['plugin'] == 'parse_rss_urls'
-
- def test_rss_10_rdf(self, tmp_path):
- """Test RSS 1.0 (RDF) format."""
- input_file = tmp_path / 'feed.rdf'
- input_file.write_text('''
-
-
- RSS 1.0 Feed
- https://example.com
-
- -
- RDF Item 1
- https://example.com/rdf1
- 2024-01-15T10:30:00Z
- Technology
-
- -
- RDF Item 2
- https://example.com/rdf2
- 2024-01-16T14:20:00Z
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0, f"Failed: {result.stderr}"
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
- entries = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
-
- urls = {e['url'] for e in entries}
- assert 'https://example.com/rdf1' in urls
- assert 'https://example.com/rdf2' in urls
- assert any(e.get('bookmarked_at') for e in entries)
-
- def test_rss_20_with_full_metadata(self, tmp_path):
- """Test RSS 2.0 with all standard metadata fields."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- Full RSS 2.0
- https://example.com
- Complete RSS 2.0 feed
- -
- Complete Article
- https://example.com/complete
- Full description here
- author@example.com
- Technology
- Programming
- https://example.com/complete
- Mon, 15 Jan 2024 10:30:00 GMT
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- content = result.stdout.strip()
- lines = content.split('\n')
-
- # Check for Tag records
- tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
- tag_names = {t['name'] for t in tags}
- assert 'Technology' in tag_names
- assert 'Programming' in tag_names
-
- # Check Snapshot record
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
- assert entry['url'] == 'https://example.com/complete'
- assert entry['title'] == 'Complete Article'
- assert 'bookmarked_at' in entry
- assert entry['tags'] == 'Technology,Programming' or entry['tags'] == 'Programming,Technology'
-
-
-class TestAtomVariants:
- """Test various Atom format variants."""
-
- def test_atom_10_full(self, tmp_path):
- """Test Atom 1.0 with full metadata."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
- Atom 1.0 Feed
- 2024-01-15T00:00:00Z
-
- Atom Entry 1
-
- urn:uuid:1234-5678
- 2024-01-15T10:30:00Z
- 2024-01-14T08:00:00Z
-
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip()]
-
- tags = [json.loads(line) for line in lines if json.loads(line).get('type') == 'Tag']
- tag_names = {t['name'] for t in tags}
- assert 'science' in tag_names
- assert 'research' in tag_names
-
- snapshots = [json.loads(line) for line in lines if json.loads(line).get('type') == 'Snapshot']
- entry = snapshots[0]
- assert entry['url'] == 'https://atom.example.com/1'
- assert 'bookmarked_at' in entry
-
- def test_atom_with_alternate_link(self, tmp_path):
- """Test Atom feed with alternate link types."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
- Atom Alternate Links
-
- Entry with alternate
-
-
- 2024-01-15T10:30:00Z
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- # feedparser should pick the alternate link
- assert 'atom.example.com/article' in entry['url']
-
-
-class TestDateFormats:
- """Test various date format handling."""
-
- def test_rfc822_date(self, tmp_path):
- """Test RFC 822 date format (RSS 2.0 standard)."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- RFC 822 Date
- https://example.com/rfc822
- Wed, 15 Jan 2020 10:30:45 GMT
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- assert 'bookmarked_at' in entry
- assert '2020-01-15' in entry['bookmarked_at']
-
- def test_iso8601_date(self, tmp_path):
- """Test ISO 8601 date format (Atom standard)."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
-
- ISO 8601 Date
-
- 2024-01-15T10:30:45.123Z
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- assert 'bookmarked_at' in entry
- assert '2024-01-15' in entry['bookmarked_at']
-
- def test_updated_vs_published_date(self, tmp_path):
- """Test that published date is preferred over updated date."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
-
- Date Priority Test
-
- 2024-01-10T10:00:00Z
- 2024-01-15T10:00:00Z
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- # Should use published date (Jan 10) not updated date (Jan 15)
- assert '2024-01-10' in entry['bookmarked_at']
-
- def test_only_updated_date(self, tmp_path):
- """Test fallback to updated date when published is missing."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
-
- Only Updated
-
- 2024-01-20T10:00:00Z
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- assert '2024-01-20' in entry['bookmarked_at']
-
- def test_no_date(self, tmp_path):
- """Test entries without any date."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- No Date
- https://example.com/nodate
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- assert 'bookmarked_at' not in entry
-
-
-class TestTagsAndCategories:
- """Test various tag and category formats."""
-
- def test_rss_categories(self, tmp_path):
- """Test RSS 2.0 category elements."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- Multi Category
- https://example.com/cats
- Tech
- Web
- Programming
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-
- tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
- tag_names = {t['name'] for t in tags}
- assert 'Tech' in tag_names
- assert 'Web' in tag_names
- assert 'Programming' in tag_names
-
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
- tags_list = entry['tags'].split(',')
- assert len(tags_list) == 3
-
- def test_atom_categories(self, tmp_path):
- """Test Atom category elements with various attributes."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
-
- Atom Categories
-
-
-
- 2024-01-15T10:00:00Z
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-
- tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
- tag_names = {t['name'] for t in tags}
- # feedparser extracts the 'term' attribute
- assert 'python' in tag_names
- assert 'django' in tag_names
-
- def test_no_tags(self, tmp_path):
- """Test entries without tags."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- No Tags
- https://example.com/notags
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
- assert 'tags' not in entry or entry['tags'] == ''
-
- def test_duplicate_tags(self, tmp_path):
- """Test that duplicate tags are handled properly."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- Duplicate Tags
- https://example.com/dups
- Python
- Python
- Web
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
- tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
- # Tag records should be unique
- tag_names = [t['name'] for t in tags]
- assert tag_names.count('Python') == 1
-
-
-class TestCustomNamespaces:
- """Test custom namespace handling (Dublin Core, Media RSS, etc.)."""
-
- def test_dublin_core_metadata(self, tmp_path):
- """Test Dublin Core namespace fields."""
- input_file = tmp_path / 'feed.rdf'
- input_file.write_text('''
-
-
- Dublin Core Feed
-
- -
- Dublin Core Article
- https://example.com/dc1
- John Doe
- Technology
- 2024-01-15T10:30:00Z
- Copyright 2024
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
-
- assert entry['url'] == 'https://example.com/dc1'
- assert entry['title'] == 'Dublin Core Article'
- # feedparser should parse dc:date as bookmarked_at
- assert 'bookmarked_at' in entry
-
- def test_media_rss_namespace(self, tmp_path):
- """Test Media RSS namespace (common in podcast feeds)."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- Media RSS Feed
- -
- Podcast Episode 1
- https://example.com/podcast/1
-
-
- Mon, 15 Jan 2024 10:00:00 GMT
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- assert entry['url'] == 'https://example.com/podcast/1'
- assert entry['title'] == 'Podcast Episode 1'
-
- def test_itunes_namespace(self, tmp_path):
- """Test iTunes namespace (common in podcast feeds)."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- iTunes Podcast
- -
- Episode 1: Getting Started
- https://example.com/ep1
- Jane Smith
- 45:30
- programming, tutorial, beginner
- Tue, 16 Jan 2024 08:00:00 GMT
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
-
- assert entry['url'] == 'https://example.com/ep1'
- assert entry['title'] == 'Episode 1: Getting Started'
-
-
-class TestEdgeCases:
- """Test edge cases and malformed data."""
-
- def test_missing_title(self, tmp_path):
- """Test entries without title."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- https://example.com/notitle
- Mon, 15 Jan 2024 10:00:00 GMT
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- assert entry['url'] == 'https://example.com/notitle'
- assert 'title' not in entry
-
- def test_missing_link(self, tmp_path):
- """Test entries without link (should be skipped)."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- No Link
- This entry has no link
-
- -
- Has Link
- https://example.com/haslink
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- # Should only have the entry with a link
- assert entry['url'] == 'https://example.com/haslink'
- assert '1 URL' in result.stdout
-
- def test_html_entities_in_title(self, tmp_path):
- """Test HTML entities in titles are properly decoded."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- Using <div> & <span> tags
- https://example.com/html
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- assert entry['title'] == 'Using <div> & <span> tags'
-
- def test_special_characters_in_tags(self, tmp_path):
- """Test special characters in tags."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- Special Tags
- https://example.com/special
- C++
- Node.js
- Web/Mobile
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-
- tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
- tag_names = {t['name'] for t in tags}
- assert 'C++' in tag_names
- assert 'Node.js' in tag_names
- assert 'Web/Mobile' in tag_names
-
- def test_cdata_sections(self, tmp_path):
- """Test CDATA sections in titles and descriptions."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- HTML in titles]]>
- https://example.com/cdata
- markup]]>
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- # feedparser should strip HTML tags
- assert 'HTML' in entry['title']
- assert entry['url'] == 'https://example.com/cdata'
-
- def test_relative_urls(self, tmp_path):
- """Test that relative URLs are preserved (feedparser handles them)."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- https://example.com
- -
- Relative URL
- /article/relative
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- # feedparser may convert relative to absolute, or leave as-is
- assert 'article/relative' in entry['url']
-
- def test_unicode_characters(self, tmp_path):
- """Test Unicode characters in feed content."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- -
- Unicode: 日本語 Français 中文 العربية
- https://example.com/unicode
- 日本語
- Français
-
-
-
- ''', encoding='utf-8')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip()]
-
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
- assert '日本語' in entry['title']
- assert 'Français' in entry['title']
-
- def test_very_long_title(self, tmp_path):
- """Test handling of very long titles."""
- long_title = 'A' * 1000
- input_file = tmp_path / 'feed.rss'
- input_file.write_text(f'''
-
-
- -
- {long_title}
- https://example.com/long
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- assert len(entry['title']) == 1000
- assert entry['title'] == long_title
-
- def test_multiple_entries_batch(self, tmp_path):
- """Test processing a large batch of entries."""
- items = []
- for i in range(100):
- items.append(f'''
- -
- Article {i}
- https://example.com/article/{i}
- Tag{i % 10}
- Mon, {15 + (i % 15)} Jan 2024 10:00:00 GMT
-
- ''')
-
- input_file = tmp_path / 'feed.rss'
- input_file.write_text(f'''
-
-
- Large Feed
- {''.join(items)}
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- assert 'Found 100 URLs' in result.stdout
-
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-
- # Should have 10 unique tags (Tag0-Tag9) + 100 snapshots
- tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
-
- assert len(tags) == 10
- assert len(snapshots) == 100
-
-
-class TestRealWorldFeeds:
- """Test patterns from real-world RSS feeds."""
-
- def test_medium_style_feed(self, tmp_path):
- """Test Medium-style feed structure."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
-
- Medium Feed
- -
- Article Title
- https://medium.com/@user/article-slug-123abc
- https://medium.com/p/123abc
- Wed, 15 Jan 2024 10:30:00 GMT
- Programming
- JavaScript
- Author Name
-
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
- assert 'medium.com' in entry['url']
- assert entry['title'] == 'Article Title'
-
- def test_reddit_style_feed(self, tmp_path):
- """Test Reddit-style feed structure."""
- input_file = tmp_path / 'feed.rss'
- input_file.write_text('''
-
- Reddit Feed
-
- Post Title
-
- 2024-01-15T10:30:00+00:00
-
- t3_abc123
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if line.strip() and '\"type\": \"Snapshot\"' in line]
-
- snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
- entry = snapshots[0]
- assert 'reddit.com' in entry['url']
-
- def test_youtube_style_feed(self, tmp_path):
- """Test YouTube-style feed structure."""
- input_file = tmp_path / 'feed.atom'
- input_file.write_text('''
-
- YouTube Channel
-
- Video Title
-
- 2024-01-15T10:30:00+00:00
- dQw4w9WgXcQ
- UCxxxxxxxx
-
-
- ''')
-
- result = subprocess.run(
- [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
- cwd=tmp_path,
- capture_output=True,
- text=True,
- )
-
- assert result.returncode == 0
- # Output goes to stdout (JSONL)
- lines = [line for line in result.stdout.strip().split('\n') if '\"type\": \"Snapshot\"' in line]
- entry = json.loads(lines[0])
-
- assert 'youtube.com' in entry['url']
- assert 'dQw4w9WgXcQ' in entry['url']
-
-
-if __name__ == '__main__':
- pytest.main([__file__, '-v'])
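
The deleted .bak suite above mostly exercised the plugin's JSONL stdout contract. A small sketch of consuming that output the way the tests did (the record fields are the ones the removed assertions checked):

```python
# Sketch: split the parse_rss_urls hook's stdout into Snapshot and Tag records.
# The hook also prints progress lines like 'Found 100 URLs', so only lines that
# look like JSON objects are parsed.
import json


def parse_hook_stdout(stdout: str) -> tuple[list[dict], list[dict]]:
    snapshots, tags = [], []
    for line in stdout.strip().splitlines():
        line = line.strip()
        if not line.startswith('{'):
            continue
        record = json.loads(line)
        if record.get('type') == 'Snapshot':
            snapshots.append(record)  # url, plugin, optional title/tags/bookmarked_at
        elif record.get('type') == 'Tag':
            tags.append(record)       # name
    return snapshots, tags
```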