more tests and migrations fixes

Nick Sweeting
2025-12-26 18:22:48 -08:00
parent 0fbcbd2616
commit e2cbcd17f6
26 changed files with 3608 additions and 1792 deletions

View File

@@ -9,7 +9,12 @@
"Bash(pkill:*)",
"Bash(python3:*)",
"Bash(sqlite3:*)",
"WebFetch(domain:github.com)"
"WebFetch(domain:github.com)",
"Bash(uv add:*)",
"Bash(mkdir:*)",
"Bash(chmod:*)",
"Bash(python -m forum_dl:*)",
"Bash(archivebox manage migrate:*)"
]
}
}

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -0,0 +1,27 @@
# Generated by Django 6.0 on 2025-12-27 01:40
import base_models.models
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('api', '0002_alter_outboundwebhook_options_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AlterField(
model_name='apitoken',
name='created_by',
field=models.ForeignKey(default=base_models.models.get_or_create_system_user_pk, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL),
),
migrations.AlterField(
model_name='outboundwebhook',
name='created_by',
field=models.ForeignKey(default=base_models.models.get_or_create_system_user_pk, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL),
),
]

View File

@@ -10,6 +10,8 @@ from django.utils import timezone
from django_stubs_ext.db.models import TypedModelMeta
from signal_webhooks.models import WebhookBase
from base_models.models import get_or_create_system_user_pk
def generate_secret_token() -> str:
return secrets.token_hex(16)
@@ -17,7 +19,7 @@ def generate_secret_token() -> str:
class APIToken(models.Model):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
token = models.CharField(max_length=32, default=generate_secret_token, unique=True)
@@ -40,7 +42,7 @@ class APIToken(models.Model):
class OutboundWebhook(WebhookBase):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)

View File

@@ -50,7 +50,7 @@ class ModelWithUUID(models.Model):
id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
modified_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False, db_index=True)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=False, db_index=True)
class Meta(TypedModelMeta):
abstract = True
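
The hunks above (APIToken/OutboundWebhook and the abstract ModelWithUUID base) switch the created_by default from None to base_models.models.get_or_create_system_user_pk, so rows created without an explicit user can still satisfy the NOT NULL foreign key. The helper itself is not part of this diff; a minimal sketch of what such a callable typically looks like, with the username and field choices as assumptions:

# Hypothetical sketch -- the real get_or_create_system_user_pk lives in
# archivebox.base_models.models and may differ in naming and defaults.
from django.contrib.auth import get_user_model

def get_or_create_system_user_pk(username: str = 'system'):
    """Return the pk of a shared system user, creating it on first use."""
    User = get_user_model()
    user, _created = User.objects.get_or_create(
        username=username,
        defaults={'is_active': False},  # assumed: the system account cannot log in
    )
    return user.pk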

View File

@@ -0,0 +1,32 @@
# Generated by Django 6.0 on 2025-12-27 01:40
import archivebox.base_models.models
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0026_remove_archiveresult_output_dir_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AlterField(
model_name='archiveresult',
name='created_by',
field=models.ForeignKey(default=archivebox.base_models.models.get_or_create_system_user_pk, on_delete=django.db.models.deletion.CASCADE, related_name='archiveresult_set', to=settings.AUTH_USER_MODEL),
),
migrations.AlterField(
model_name='snapshot',
name='created_by',
field=models.ForeignKey(default=archivebox.base_models.models.get_or_create_system_user_pk, on_delete=django.db.models.deletion.CASCADE, related_name='snapshot_set', to=settings.AUTH_USER_MODEL),
),
migrations.AlterField(
model_name='snapshot',
name='tags',
field=models.ManyToManyField(blank=True, related_name='snapshot_set', through='core.SnapshotTag', through_fields=('snapshot', 'tag'), to='core.tag'),
),
]

View File

@@ -0,0 +1,19 @@
# Generated by Django 6.0 on 2025-12-27 01:40
import pathlib
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('crawls', '0002_drop_seed_model'),
]
operations = [
migrations.AlterField(
model_name='crawl',
name='output_dir',
field=models.FilePathField(blank=True, default='', path=pathlib.PurePosixPath('/private/tmp/test_archivebox_migrations/archive')),
),
]

View File

@@ -0,0 +1,46 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": false,
"properties": {
"SAVE_FORUMDL": {
"type": "boolean",
"default": true,
"description": "Enable forum downloading with forum-dl"
},
"FORUMDL_BINARY": {
"type": "string",
"default": "forum-dl",
"description": "Path to forum-dl binary"
},
"FORUMDL_TIMEOUT": {
"type": "integer",
"default": 3600,
"minimum": 30,
"x-fallback": "TIMEOUT",
"description": "Timeout for forum downloads in seconds"
},
"FORUMDL_OUTPUT_FORMAT": {
"type": "string",
"default": "jsonl",
"enum": ["jsonl", "warc", "mbox", "maildir", "mh", "mmdf", "babyl"],
"description": "Output format for forum downloads"
},
"FORUMDL_TEXTIFY": {
"type": "boolean",
"default": false,
"description": "Convert HTML content to plaintext (keep false to preserve HTML)"
},
"FORUMDL_CHECK_SSL_VALIDITY": {
"type": "boolean",
"default": true,
"x-fallback": "CHECK_SSL_VALIDITY",
"description": "Whether to verify SSL certificates"
},
"FORUMDL_EXTRA_ARGS": {
"type": "string",
"default": "",
"description": "Extra arguments for forum-dl (space-separated)"
}
}
}
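
Because the schema above sets additionalProperties to false, misspelled FORUMDL_* keys are rejected outright; the x-fallback annotations are custom keywords ignored by standard validators and resolved by the hooks at runtime. A hedged sketch of checking a config dict against this schema with the jsonschema package (the schema filename here is an assumption):

# Illustrative only: validate FORUMDL_* settings against the plugin schema.
import json
from pathlib import Path

from jsonschema import ValidationError, validate  # third-party: pip install jsonschema

schema = json.loads(Path('config.schema.json').read_text())  # assumed filename
config = {'SAVE_FORUMDL': True, 'FORUMDL_TIMEOUT': 600, 'FORUMDL_OUTPUT_FORMAT': 'jsonl'}

try:
    validate(instance=config, schema=schema)
    print('forumdl config OK')
except ValidationError as err:
    print(f'invalid forumdl config: {err.message}')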

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
Validation hook for forum-dl.
Runs at crawl start to verify that the forum-dl binary is available.
Emits JSONL records for InstalledBinary and Machine config updates.
"""
import os
import sys
import json
import shutil
import hashlib
import subprocess
from pathlib import Path
def get_binary_version(abspath: str, version_flag: str = '--version') -> str | None:
"""Get version string from binary."""
try:
result = subprocess.run(
[abspath, version_flag],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout:
first_line = result.stdout.strip().split('\n')[0]
return first_line[:64]
except Exception:
pass
return None
def get_binary_hash(abspath: str) -> str | None:
"""Get SHA256 hash of binary."""
try:
with open(abspath, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
def find_forumdl() -> dict | None:
"""Find forum-dl binary."""
try:
from abx_pkg import Binary, PipProvider, EnvProvider
class ForumdlBinary(Binary):
name: str = 'forum-dl'
binproviders_supported = [PipProvider(), EnvProvider()]
binary = ForumdlBinary()
loaded = binary.load()
if loaded and loaded.abspath:
return {
'name': 'forum-dl',
'abspath': str(loaded.abspath),
'version': str(loaded.version) if loaded.version else None,
'sha256': loaded.sha256 if hasattr(loaded, 'sha256') else None,
'binprovider': loaded.binprovider.name if loaded.binprovider else 'env',
}
except ImportError:
pass
except Exception:
pass
# Fallback to shutil.which
abspath = shutil.which('forum-dl') or os.environ.get('FORUMDL_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'forum-dl',
'abspath': abspath,
'version': get_binary_version(abspath),
'sha256': get_binary_hash(abspath),
'binprovider': 'env',
}
return None
def main():
# Check for forum-dl (required)
forumdl_result = find_forumdl()
missing_deps = []
# Emit results for forum-dl
if forumdl_result and forumdl_result.get('abspath'):
print(json.dumps({
'type': 'InstalledBinary',
'name': forumdl_result['name'],
'abspath': forumdl_result['abspath'],
'version': forumdl_result['version'],
'sha256': forumdl_result['sha256'],
'binprovider': forumdl_result['binprovider'],
}))
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/FORUMDL_BINARY',
'value': forumdl_result['abspath'],
}))
if forumdl_result['version']:
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/FORUMDL_VERSION',
'value': forumdl_result['version'],
}))
else:
print(json.dumps({
'type': 'Dependency',
'bin_name': 'forum-dl',
'bin_providers': 'pip,env',
}))
missing_deps.append('forum-dl')
if missing_deps:
print(f"Missing dependencies: {', '.join(missing_deps)}", file=sys.stderr)
sys.exit(1)
else:
sys.exit(0)
if __name__ == '__main__':
main()
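
When the binary is found, the hook prints one JSON object per line on stdout (an InstalledBinary record plus Machine config updates); when it is missing, it prints a Dependency record and exits 1. Illustrative output only -- the path and version string below are made up, and sha256 is shown as null since it may be unavailable:

{"type": "InstalledBinary", "name": "forum-dl", "abspath": "/usr/local/bin/forum-dl", "version": "forum-dl 0.3.1", "sha256": null, "binprovider": "pip"}
{"type": "Machine", "_method": "update", "key": "config/FORUMDL_BINARY", "value": "/usr/local/bin/forum-dl"}
{"type": "Machine", "_method": "update", "key": "config/FORUMDL_VERSION", "value": "forum-dl 0.3.1"}

Missing-binary case (exit code 1):

{"type": "Dependency", "bin_name": "forum-dl", "bin_providers": "pip,env"}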

View File

@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
Download forum content from a URL using forum-dl.
Usage: on_Snapshot__forumdl.py --url=<url> --snapshot-id=<uuid>
Output: Downloads forum content to $PWD/
Environment variables:
FORUMDL_BINARY: Path to forum-dl binary
FORUMDL_TIMEOUT: Timeout in seconds (default: 3600 for large forums)
FORUMDL_OUTPUT_FORMAT: Output format (default: jsonl)
FORUMDL_TEXTIFY: Convert HTML to plaintext (default: False - keeps HTML)
FORUMDL_CHECK_SSL_VALIDITY: Whether to check SSL certificates (default: True)
FORUMDL_EXTRA_ARGS: Extra arguments for forum-dl (space-separated)
# Forum-dl feature toggles
SAVE_FORUMDL: Enable forum-dl forum extraction (default: True)
# Fallback to ARCHIVING_CONFIG values if FORUMDL_* not set:
TIMEOUT: Fallback timeout
CHECK_SSL_VALIDITY: Fallback SSL check
"""
import json
import os
import shutil
import subprocess
import sys
from pathlib import Path
import rich_click as click
# Extractor metadata
EXTRACTOR_NAME = 'forumdl'
BIN_NAME = 'forum-dl'
BIN_PROVIDERS = 'pip,env'
OUTPUT_DIR = '.'
def get_env(name: str, default: str = '') -> str:
return os.environ.get(name, default).strip()
def get_env_bool(name: str, default: bool = False) -> bool:
val = get_env(name, '').lower()
if val in ('true', '1', 'yes', 'on'):
return True
if val in ('false', '0', 'no', 'off'):
return False
return default
def get_env_int(name: str, default: int = 0) -> int:
try:
return int(get_env(name, str(default)))
except ValueError:
return default
def find_forumdl() -> str | None:
"""Find forum-dl binary."""
forumdl = get_env('FORUMDL_BINARY')
if forumdl and os.path.isfile(forumdl):
return forumdl
binary = shutil.which('forum-dl')
if binary:
return binary
return None
def get_version(binary: str) -> str:
"""Get forum-dl version."""
try:
result = subprocess.run([binary, '--version'], capture_output=True, text=True, timeout=10)
return result.stdout.strip()[:64]
except Exception:
return ''
def save_forum(url: str, binary: str) -> tuple[bool, str | None, str]:
"""
Download forum using forum-dl.
Returns: (success, output_path, error_message)
"""
# Get config from env
timeout = get_env_int('FORUMDL_TIMEOUT') or get_env_int('TIMEOUT', 3600)
check_ssl = get_env_bool('FORUMDL_CHECK_SSL_VALIDITY', get_env_bool('CHECK_SSL_VALIDITY', True))
textify = get_env_bool('FORUMDL_TEXTIFY', False)
extra_args = get_env('FORUMDL_EXTRA_ARGS', '')
output_format = get_env('FORUMDL_OUTPUT_FORMAT', 'jsonl')
# Output directory is current directory (hook already runs in output dir)
output_dir = Path(OUTPUT_DIR)
# Build output filename based on format
if output_format == 'warc':
output_file = output_dir / 'forum.warc.gz'
elif output_format == 'jsonl':
output_file = output_dir / 'forum.jsonl'
elif output_format == 'maildir':
output_file = output_dir / 'forum' # maildir is a directory
elif output_format in ('mbox', 'mh', 'mmdf', 'babyl'):
output_file = output_dir / f'forum.{output_format}'
else:
output_file = output_dir / f'forum.{output_format}'
# Build command
cmd = [binary, '-f', output_format, '-o', str(output_file)]
if textify:
cmd.append('--textify')
if not check_ssl:
cmd.append('--no-check-certificate')
if extra_args:
cmd.extend(extra_args.split())
cmd.append(url)
try:
result = subprocess.run(cmd, capture_output=True, timeout=timeout, text=True)
# Check if output file was created
if output_file.exists() and output_file.stat().st_size > 0:
return True, str(output_file), ''
else:
stderr = result.stderr
# These are NOT errors - page simply has no downloadable forum content
stderr_lower = stderr.lower()
if 'unsupported url' in stderr_lower:
return True, None, '' # Not a forum site - success, no output
if 'no content' in stderr_lower:
return True, None, '' # No forum found - success, no output
if result.returncode == 0:
return True, None, '' # forum-dl exited cleanly, just no forum - success
# These ARE errors - something went wrong
if '404' in stderr:
return False, None, '404 Not Found'
if '403' in stderr:
return False, None, '403 Forbidden'
if 'unable to extract' in stderr_lower:
return False, None, 'Unable to extract forum info'
return False, None, f'forum-dl error: {stderr[:200]}'
except subprocess.TimeoutExpired:
return False, None, f'Timed out after {timeout} seconds'
except Exception as e:
return False, None, f'{type(e).__name__}: {e}'
@click.command()
@click.option('--url', required=True, help='URL to download forum from')
@click.option('--snapshot-id', required=True, help='Snapshot UUID')
def main(url: str, snapshot_id: str):
"""Download forum content from a URL using forum-dl."""
version = ''
output = None
status = 'failed'
error = ''
binary = None
cmd_str = ''
try:
# Check if forum-dl is enabled
if not get_env_bool('SAVE_FORUMDL', True):
print('Skipping forum-dl (SAVE_FORUMDL=False)')
status = 'skipped'
print(f'STATUS={status}')
print(f'RESULT_JSON={json.dumps({"extractor": EXTRACTOR_NAME, "status": status, "url": url, "snapshot_id": snapshot_id})}')
sys.exit(0)
# Find binary
binary = find_forumdl()
if not binary:
print(f'ERROR: {BIN_NAME} binary not found', file=sys.stderr)
print(f'DEPENDENCY_NEEDED={BIN_NAME}', file=sys.stderr)
print(f'BIN_PROVIDERS={BIN_PROVIDERS}', file=sys.stderr)
print(f'INSTALL_HINT=pip install forum-dl', file=sys.stderr)
sys.exit(1)
version = get_version(binary)
cmd_str = f'{binary} {url}'
# Run extraction
success, output, error = save_forum(url, binary)
status = 'succeeded' if success else 'failed'
if success:
if output:
output_path = Path(output)
file_size = output_path.stat().st_size
print(f'forum-dl completed: {output_path.name} ({file_size} bytes)')
else:
print(f'forum-dl completed: no forum content found on page (this is normal)')
except Exception as e:
error = f'{type(e).__name__}: {e}'
status = 'failed'
# Print results
if cmd_str:
print(f'CMD={cmd_str}')
if version:
print(f'VERSION={version}')
if output:
print(f'OUTPUT={output}')
print(f'STATUS={status}')
if error:
print(f'ERROR={error}', file=sys.stderr)
# Print JSON result
result_json = {
'extractor': EXTRACTOR_NAME,
'url': url,
'snapshot_id': snapshot_id,
'status': status,
'cmd_version': version,
'output': output,
'error': error or None,
}
print(f'RESULT_JSON={json.dumps(result_json)}')
sys.exit(0 if status == 'succeeded' else 1)
if __name__ == '__main__':
main()
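
Roughly how this hook is driven and what it reports back; the URL, binary path, and version string below are placeholders rather than captured output. The plugin runner is expected to invoke it inside the snapshot's output directory:

python on_Snapshot__53_forumdl.py --url='https://forum.example.com/t/1' --snapshot-id='example-snapshot-id'

CMD=/usr/local/bin/forum-dl https://forum.example.com/t/1
VERSION=forum-dl 0.3.1
OUTPUT=forum.jsonl
STATUS=succeeded
RESULT_JSON={"extractor": "forumdl", "url": "https://forum.example.com/t/1", "snapshot_id": "example-snapshot-id", "status": "succeeded", "cmd_version": "forum-dl 0.3.1", "output": "forum.jsonl", "error": null}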

View File

@@ -0,0 +1,40 @@
<!-- Embedded forum view - renders JSONL forum posts -->
<div class="extractor-embed forumdl-embed" style="width: 100%; max-width: 900px; margin: 0 auto; background: #1a1a1a; padding: 20px; border-radius: 8px;">
<div style="text-align: center; padding: 15px 0; border-bottom: 1px solid #333; margin-bottom: 20px;">
<span style="font-size: 32px;">💬</span>
<h3 style="margin: 10px 0; color: #fff; font-size: 18px;">Forum Thread</h3>
</div>
<div id="forum-posts" style="max-height: 500px; overflow-y: auto; color: #ddd;"></div>
<script>
(async function() {
try {
const response = await fetch('{{ output_path }}');
const text = await response.text();
const posts = text.trim().split('\n').map(line => JSON.parse(line));
const container = document.getElementById('forum-posts');
posts.forEach(post => {
const postDiv = document.createElement('div');
postDiv.style.cssText = 'background: #2a2a2a; padding: 15px; margin-bottom: 15px; border-radius: 5px; border-left: 3px solid #4a9eff;';
const author = post.author || 'Anonymous';
const date = post.date ? new Date(post.date).toLocaleString() : '';
const title = post.title || '';
const content = post.content || post.body || '';
postDiv.innerHTML = `
<div style="display: flex; justify-content: space-between; margin-bottom: 10px; padding-bottom: 8px; border-bottom: 1px solid #444;">
<strong style="color: #4a9eff;">${author}</strong>
<span style="color: #888; font-size: 12px;">${date}</span>
</div>
${title ? `<h4 style="margin: 0 0 10px 0; color: #fff;">${title}</h4>` : ''}
<div style="color: #ccc; line-height: 1.5;">${content}</div>
`;
container.appendChild(postDiv);
});
} catch(e) {
document.getElementById('forum-posts').innerHTML = '<p style="color: #888;">Error loading forum posts</p>';
}
})();
</script>
</div>

View File

@@ -0,0 +1,147 @@
<!-- Fullscreen forum view - renders JSONL forum posts -->
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Forum Thread</title>
<style>
body {
margin: 0;
padding: 20px;
background: #0d1117;
color: #c9d1d9;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Helvetica, Arial, sans-serif;
line-height: 1.6;
}
.header {
max-width: 1000px;
margin: 0 auto 30px;
text-align: center;
padding: 20px;
border-bottom: 1px solid #30363d;
}
.icon {
font-size: 48px;
margin-bottom: 10px;
}
h1 {
margin: 0;
font-size: 28px;
color: #f0f6fc;
}
.container {
max-width: 1000px;
margin: 0 auto;
}
.post {
background: #161b22;
border: 1px solid #30363d;
border-radius: 6px;
margin-bottom: 16px;
padding: 16px;
transition: border-color 0.2s;
}
.post:hover {
border-color: #58a6ff;
}
.post-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 12px;
padding-bottom: 12px;
border-bottom: 1px solid #21262d;
}
.post-author {
font-weight: 600;
color: #58a6ff;
font-size: 14px;
}
.post-date {
color: #8b949e;
font-size: 12px;
}
.post-title {
margin: 0 0 12px 0;
font-size: 18px;
font-weight: 600;
color: #f0f6fc;
}
.post-content {
color: #c9d1d9;
word-wrap: break-word;
}
.post-content img {
max-width: 100%;
height: auto;
border-radius: 4px;
}
.post-content a {
color: #58a6ff;
text-decoration: none;
}
.post-content a:hover {
text-decoration: underline;
}
.loading {
text-align: center;
padding: 40px;
color: #8b949e;
}
</style>
</head>
<body>
<div class="header">
<div class="icon">💬</div>
<h1>Forum Thread</h1>
</div>
<div class="container">
<div id="forum-posts" class="loading">Loading posts...</div>
</div>
<script>
(async function() {
try {
const response = await fetch('{{ output_path }}');
const text = await response.text();
const posts = text.trim().split('\n').filter(line => line).map(line => JSON.parse(line));
const container = document.getElementById('forum-posts');
container.innerHTML = '';
container.className = '';
posts.forEach(post => {
const postDiv = document.createElement('div');
postDiv.className = 'post';
const author = post.author || 'Anonymous';
const date = post.date ? new Date(post.date).toLocaleString() : '';
const title = post.title || '';
const content = post.content || post.body || '';
postDiv.innerHTML = `
<div class="post-header">
<span class="post-author">${escapeHtml(author)}</span>
<span class="post-date">${escapeHtml(date)}</span>
</div>
${title ? `<h2 class="post-title">${escapeHtml(title)}</h2>` : ''}
<div class="post-content">${content}</div>
`;
container.appendChild(postDiv);
});
if (posts.length === 0) {
container.innerHTML = '<div class="loading">No posts found</div>';
}
} catch(e) {
document.getElementById('forum-posts').innerHTML = '<div class="loading">Error loading posts: ' + e.message + '</div>';
}
})();
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
</script>
</body>
</html>

View File

@@ -0,0 +1 @@
💬

View File

@@ -0,0 +1,7 @@
<!-- Forum thumbnail - shows icon placeholder -->
<div class="extractor-thumbnail forumdl-thumbnail" style="width: 100%; height: 100px; overflow: hidden; background: #1a1a1a; display: flex; align-items: center; justify-content: center;">
<div style="display: flex; flex-direction: column; align-items: center; color: #888; font-size: 12px;">
<span style="font-size: 32px;">💬</span>
<span>Forum</span>
</div>
</div>

View File

@@ -0,0 +1,157 @@
"""
Integration tests for forumdl plugin
Tests verify:
1. The hook script exists
2. Dependencies are installed via validation hooks
3. Dependencies are verifiable with abx-pkg
4. Forum extraction works on forum URLs
5. JSONL output is correct
6. Config options work
7. Non-forum URLs are handled gracefully
"""
import json
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
PLUGIN_DIR = Path(__file__).parent.parent
PLUGINS_ROOT = PLUGIN_DIR.parent
FORUMDL_HOOK = PLUGIN_DIR / 'on_Snapshot__53_forumdl.py'
FORUMDL_VALIDATE_HOOK = PLUGIN_DIR / 'on_Crawl__00_validate_forumdl.py'
TEST_URL = 'https://example.com'
def test_hook_script_exists():
"""Verify on_Snapshot hook exists."""
assert FORUMDL_HOOK.exists(), f"Hook not found: {FORUMDL_HOOK}"
def test_forumdl_validate_hook():
"""Test forum-dl validate hook checks for forum-dl."""
# Run forum-dl validate hook
result = subprocess.run(
[sys.executable, str(FORUMDL_VALIDATE_HOOK)],
capture_output=True,
text=True,
timeout=30
)
# Hook exits 0 if all binaries found, 1 if any not found
# Parse output for InstalledBinary and Dependency records
found_binary = False
found_dependency = False
for line in result.stdout.strip().split('\n'):
if line.strip():
try:
record = json.loads(line)
if record.get('type') == 'InstalledBinary':
if record['name'] == 'forum-dl':
assert record['abspath'], "forum-dl should have abspath"
found_binary = True
elif record.get('type') == 'Dependency':
if record['bin_name'] == 'forum-dl':
found_dependency = True
except json.JSONDecodeError:
pass
# forum-dl should either be found (InstalledBinary) or missing (Dependency)
assert found_binary or found_dependency, \
"forum-dl should have either InstalledBinary or Dependency record"
def test_verify_deps_with_abx_pkg():
"""Verify forum-dl is available via abx-pkg."""
from abx_pkg import Binary, PipProvider, EnvProvider, BinProviderOverrides
missing_binaries = []
# Verify forum-dl is available
forumdl_binary = Binary(name='forum-dl', binproviders=[PipProvider(), EnvProvider()])
forumdl_loaded = forumdl_binary.load()
if not (forumdl_loaded and forumdl_loaded.abspath):
missing_binaries.append('forum-dl')
if missing_binaries:
pytest.skip(f"Binaries not available: {', '.join(missing_binaries)} - Dependency records should have been emitted")
def test_handles_non_forum_url():
"""Test that forum-dl extractor handles non-forum URLs gracefully via hook."""
# Prerequisites checked by earlier test
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# Run forum-dl extraction hook on non-forum URL
result = subprocess.run(
[sys.executable, str(FORUMDL_HOOK), '--url', 'https://example.com', '--snapshot-id', 'test789'],
cwd=tmpdir,
capture_output=True,
text=True,
timeout=60
)
# Should exit 0 even for non-forum URL
assert result.returncode == 0, f"Should handle non-forum URL gracefully: {result.stderr}"
# Verify JSONL output
assert 'STATUS=' in result.stdout, "Should report status"
assert 'RESULT_JSON=' in result.stdout, "Should output RESULT_JSON"
# Parse JSONL result
result_json = None
for line in result.stdout.split('\n'):
if line.startswith('RESULT_JSON='):
result_json = json.loads(line.split('=', 1)[1])
break
assert result_json, "Should have RESULT_JSON"
assert result_json['extractor'] == 'forumdl'
def test_config_save_forumdl_false_skips():
"""Test that SAVE_FORUMDL=False causes skip."""
import os
with tempfile.TemporaryDirectory() as tmpdir:
env = os.environ.copy()
env['SAVE_FORUMDL'] = 'False'
result = subprocess.run(
[sys.executable, str(FORUMDL_HOOK), '--url', TEST_URL, '--snapshot-id', 'test999'],
cwd=tmpdir,
capture_output=True,
text=True,
env=env,
timeout=30
)
assert result.returncode == 0, f"Should exit 0 when skipping: {result.stderr}"
assert 'STATUS=' in result.stdout
def test_config_timeout():
"""Test that FORUMDL_TIMEOUT config is respected."""
import os
with tempfile.TemporaryDirectory() as tmpdir:
env = os.environ.copy()
env['FORUMDL_TIMEOUT'] = '5'
result = subprocess.run(
[sys.executable, str(FORUMDL_HOOK), '--url', 'https://example.com', '--snapshot-id', 'testtimeout'],
cwd=tmpdir,
capture_output=True,
text=True,
env=env,
timeout=30
)
assert result.returncode == 0, "Should complete without hanging"
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -3,31 +3,30 @@
"type": "object",
"additionalProperties": false,
"properties": {
"SAVE_GALLERY_DL": {
"SAVE_GALLERYDL": {
"type": "boolean",
"default": true,
"x-aliases": ["USE_GALLERY_DL", "FETCH_GALLERY"],
"description": "Enable gallery downloading with gallery-dl"
},
"GALLERY_DL_BINARY": {
"GALLERYDL_BINARY": {
"type": "string",
"default": "gallery-dl",
"description": "Path to gallery-dl binary"
},
"GALLERY_DL_TIMEOUT": {
"GALLERYDL_TIMEOUT": {
"type": "integer",
"default": 3600,
"minimum": 30,
"x-fallback": "TIMEOUT",
"description": "Timeout for gallery downloads in seconds"
},
"GALLERY_DL_CHECK_SSL_VALIDITY": {
"GALLERYDL_CHECK_SSL_VALIDITY": {
"type": "boolean",
"default": true,
"x-fallback": "CHECK_SSL_VALIDITY",
"description": "Whether to verify SSL certificates"
},
"GALLERY_DL_ARGS": {
"GALLERYDL_ARGS": {
"type": "array",
"items": {"type": "string"},
"default": [
@@ -36,7 +35,7 @@
],
"description": "Default gallery-dl arguments"
},
"GALLERY_DL_EXTRA_ARGS": {
"GALLERYDL_EXTRA_ARGS": {
"type": "string",
"default": "",
"description": "Extra arguments for gallery-dl (space-separated)"

View File

@@ -66,7 +66,7 @@ def find_gallerydl() -> dict | None:
pass
# Fallback to shutil.which
abspath = shutil.which('gallery-dl') or os.environ.get('GALLERY_DL_BINARY', '')
abspath = shutil.which('gallery-dl') or os.environ.get('GALLERYDL_BINARY', '')
if abspath and Path(abspath).is_file():
return {
'name': 'gallery-dl',
@@ -99,7 +99,7 @@ def main():
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/GALLERY_DL_BINARY',
'key': 'config/GALLERYDL_BINARY',
'value': gallerydl_result['abspath'],
}))
@@ -107,7 +107,7 @@ def main():
print(json.dumps({
'type': 'Machine',
'_method': 'update',
'key': 'config/GALLERY_DL_VERSION',
'key': 'config/GALLERYDL_VERSION',
'value': gallerydl_result['version'],
}))
else:

View File

@@ -6,17 +6,18 @@ Usage: on_Snapshot__gallerydl.py --url=<url> --snapshot-id=<uuid>
Output: Downloads gallery images to $PWD/gallerydl/
Environment variables:
GALLERY_DL_BINARY: Path to gallery-dl binary
GALLERY_DL_TIMEOUT: Timeout in seconds (default: 3600 for large galleries)
GALLERY_DL_CHECK_SSL_VALIDITY: Whether to check SSL certificates (default: True)
GALLERY_DL_EXTRA_ARGS: Extra arguments for gallery-dl (space-separated)
GALLERYDL_BINARY: Path to gallery-dl binary
GALLERYDL_TIMEOUT: Timeout in seconds (default: 3600 for large galleries)
GALLERYDL_CHECK_SSL_VALIDITY: Whether to check SSL certificates (default: True)
GALLERYDL_EXTRA_ARGS: Extra arguments for gallery-dl (space-separated)
COOKIES_FILE: Path to cookies file for authentication
# Gallery-dl feature toggles
USE_GALLERY_DL: Enable gallery-dl gallery extraction (default: True)
SAVE_GALLERY_DL: Alias for USE_GALLERY_DL
USE_GALLERYDL: Enable gallery-dl gallery extraction (default: True)
SAVE_GALLERYDL: Alias for USE_GALLERYDL
# Fallback to ARCHIVING_CONFIG values if GALLERY_DL_* not set:
GALLERY_DL_TIMEOUT: Fallback timeout for gallery downloads
# Fallback to ARCHIVING_CONFIG values if GALLERYDL_* not set:
GALLERYDL_TIMEOUT: Fallback timeout for gallery downloads
TIMEOUT: Fallback timeout
CHECK_SSL_VALIDITY: Fallback SSL check
"""
@@ -26,7 +27,6 @@ import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
import rich_click as click
@@ -76,7 +76,7 @@ def has_media_output() -> bool:
def find_gallerydl() -> str | None:
"""Find gallery-dl binary."""
gallerydl = get_env('GALLERY_DL_BINARY')
gallerydl = get_env('GALLERYDL_BINARY')
if gallerydl and os.path.isfile(gallerydl):
return gallerydl
@@ -111,24 +111,29 @@ def save_gallery(url: str, binary: str) -> tuple[bool, str | None, str]:
Returns: (success, output_path, error_message)
"""
# Get config from env (with GALLERY_DL_ prefix or fallback to ARCHIVING_CONFIG style)
timeout = get_env_int('GALLERY_DL_TIMEOUT') or get_env_int('TIMEOUT', 3600)
check_ssl = get_env_bool('GALLERY_DL_CHECK_SSL_VALIDITY', get_env_bool('CHECK_SSL_VALIDITY', True))
extra_args = get_env('GALLERY_DL_EXTRA_ARGS', '')
# Get config from env (with GALLERYDL_ prefix or fallback to ARCHIVING_CONFIG style)
timeout = get_env_int('GALLERYDL_TIMEOUT') or get_env_int('TIMEOUT', 3600)
check_ssl = get_env_bool('GALLERYDL_CHECK_SSL_VALIDITY', get_env_bool('CHECK_SSL_VALIDITY', True))
extra_args = get_env('GALLERYDL_EXTRA_ARGS', '')
cookies_file = get_env('COOKIES_FILE', '')
# Output directory is current directory (hook already runs in output dir)
output_dir = Path(OUTPUT_DIR)
# Build command (later options take precedence)
# Use -D for exact directory (flat structure) instead of -d (nested structure)
cmd = [
binary,
*get_gallerydl_default_args(),
'-d', str(output_dir),
'-D', str(output_dir),
]
if not check_ssl:
cmd.append('--no-check-certificate')
if cookies_file and Path(cookies_file).exists():
cmd.extend(['-C', cookies_file])
if extra_args:
cmd.extend(extra_args.split())
@@ -137,7 +142,7 @@ def save_gallery(url: str, binary: str) -> tuple[bool, str | None, str]:
try:
result = subprocess.run(cmd, capture_output=True, timeout=timeout, text=True)
# Check if any gallery files were downloaded
# Check if any gallery files were downloaded (search recursively)
gallery_extensions = (
'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg',
'.mp4', '.webm', '.mkv', '.avi', '.mov', '.flv',
@@ -145,7 +150,7 @@ def save_gallery(url: str, binary: str) -> tuple[bool, str | None, str]:
)
downloaded_files = [
f for f in output_dir.glob('*')
f for f in output_dir.rglob('*')
if f.is_file() and f.suffix.lower() in gallery_extensions
]
@@ -162,9 +167,10 @@ def save_gallery(url: str, binary: str) -> tuple[bool, str | None, str]:
# These are NOT errors - page simply has no downloadable gallery
# Return success with no output (legitimate "nothing to download")
if 'unsupported URL' in stderr.lower():
stderr_lower = stderr.lower()
if 'unsupported url' in stderr_lower:
return True, None, '' # Not a gallery site - success, no output
if 'no results' in stderr.lower():
if 'no results' in stderr_lower:
return True, None, '' # No gallery found - success, no output
if result.returncode == 0:
return True, None, '' # gallery-dl exited cleanly, just no gallery - success
@@ -174,7 +180,7 @@ def save_gallery(url: str, binary: str) -> tuple[bool, str | None, str]:
return False, None, '404 Not Found'
if '403' in stderr:
return False, None, '403 Forbidden'
if 'Unable to extract' in stderr:
if 'unable to extract' in stderr_lower:
return False, None, 'Unable to extract gallery info'
return False, None, f'gallery-dl error: {stderr[:200]}'
@@ -191,7 +197,6 @@ def save_gallery(url: str, binary: str) -> tuple[bool, str | None, str]:
def main(url: str, snapshot_id: str):
"""Download image gallery from a URL using gallery-dl."""
start_ts = datetime.now(timezone.utc)
version = ''
output = None
status = 'failed'
@@ -201,12 +206,9 @@ def main(url: str, snapshot_id: str):
try:
# Check if gallery-dl is enabled
if not (get_env_bool('USE_GALLERY_DL', True) and get_env_bool('SAVE_GALLERY_DL', True)):
print('Skipping gallery-dl (USE_GALLERY_DL=False or SAVE_GALLERY_DL=False)')
if not (get_env_bool('USE_GALLERYDL', True) and get_env_bool('SAVE_GALLERYDL', True)):
print('Skipping gallery-dl (USE_GALLERYDL=False or SAVE_GALLERYDL=False)')
status = 'skipped'
end_ts = datetime.now(timezone.utc)
print(f'START_TS={start_ts.isoformat()}')
print(f'END_TS={end_ts.isoformat()}')
print(f'STATUS={status}')
print(f'RESULT_JSON={json.dumps({"extractor": EXTRACTOR_NAME, "status": status, "url": url, "snapshot_id": snapshot_id})}')
sys.exit(0)
@@ -215,8 +217,6 @@ def main(url: str, snapshot_id: str):
if has_staticfile_output():
print(f'Skipping gallery-dl - staticfile extractor already downloaded this')
status = 'skipped'
print(f'START_TS={start_ts.isoformat()}')
print(f'END_TS={datetime.now(timezone.utc).isoformat()}')
print(f'STATUS={status}')
print(f'RESULT_JSON={json.dumps({"extractor": EXTRACTOR_NAME, "status": status, "url": url, "snapshot_id": snapshot_id})}')
sys.exit(0)
@@ -224,8 +224,6 @@ def main(url: str, snapshot_id: str):
if has_media_output():
print(f'Skipping gallery-dl - media extractor already downloaded this')
status = 'skipped'
print(f'START_TS={start_ts.isoformat()}')
print(f'END_TS={datetime.now(timezone.utc).isoformat()}')
print(f'STATUS={status}')
print(f'RESULT_JSON={json.dumps({"extractor": EXTRACTOR_NAME, "status": status, "url": url, "snapshot_id": snapshot_id})}')
sys.exit(0)
@@ -260,12 +258,6 @@ def main(url: str, snapshot_id: str):
status = 'failed'
# Print results
end_ts = datetime.now(timezone.utc)
duration = (end_ts - start_ts).total_seconds()
print(f'START_TS={start_ts.isoformat()}')
print(f'END_TS={end_ts.isoformat()}')
print(f'DURATION={duration:.2f}')
if cmd_str:
print(f'CMD={cmd_str}')
if version:
@@ -283,9 +275,6 @@ def main(url: str, snapshot_id: str):
'url': url,
'snapshot_id': snapshot_id,
'status': status,
'start_ts': start_ts.isoformat(),
'end_ts': end_ts.isoformat(),
'duration': round(duration, 2),
'cmd_version': version,
'output': output,
'error': error or None,
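
After the rename, the gallery-dl hook reads only the GALLERYDL_* environment keys and falls back to the generic archiving keys; the old GALLERY_DL_* names are no longer consulted. A condensed sketch of the resolution order the hook implements (simplified from the get_env_int/get_env_bool helpers above):

import os

def resolve_gallerydl_timeout(default: int = 3600) -> int:
    # GALLERYDL_TIMEOUT wins, then the generic TIMEOUT, then the 3600s default.
    for key in ('GALLERYDL_TIMEOUT', 'TIMEOUT'):
        raw = os.environ.get(key, '').strip()
        if raw:
            try:
                return int(raw)
            except ValueError:
                continue
    return default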

View File

@@ -0,0 +1,157 @@
"""
Integration tests for gallerydl plugin
Tests verify:
1. The hook script exists
2. Dependencies are installed via validation hooks
3. Dependencies are verifiable with abx-pkg
4. Gallery extraction works on gallery URLs
5. JSONL output is correct
6. Config options work
7. Non-gallery URLs are handled gracefully
"""
import json
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
PLUGIN_DIR = Path(__file__).parent.parent
PLUGINS_ROOT = PLUGIN_DIR.parent
GALLERYDL_HOOK = PLUGIN_DIR / 'on_Snapshot__52_gallerydl.py'
GALLERYDL_VALIDATE_HOOK = PLUGIN_DIR / 'on_Crawl__00_validate_gallerydl.py'
TEST_URL = 'https://example.com'
def test_hook_script_exists():
"""Verify on_Snapshot hook exists."""
assert GALLERYDL_HOOK.exists(), f"Hook not found: {GALLERYDL_HOOK}"
def test_gallerydl_validate_hook():
"""Test gallery-dl validate hook checks for gallery-dl."""
# Run gallery-dl validate hook
result = subprocess.run(
[sys.executable, str(GALLERYDL_VALIDATE_HOOK)],
capture_output=True,
text=True,
timeout=30
)
# Hook exits 0 if all binaries found, 1 if any not found
# Parse output for InstalledBinary and Dependency records
found_binary = False
found_dependency = False
for line in result.stdout.strip().split('\n'):
if line.strip():
try:
record = json.loads(line)
if record.get('type') == 'InstalledBinary':
if record['name'] == 'gallery-dl':
assert record['abspath'], "gallery-dl should have abspath"
found_binary = True
elif record.get('type') == 'Dependency':
if record['bin_name'] == 'gallery-dl':
found_dependency = True
except json.JSONDecodeError:
pass
# gallery-dl should either be found (InstalledBinary) or missing (Dependency)
assert found_binary or found_dependency, \
"gallery-dl should have either InstalledBinary or Dependency record"
def test_verify_deps_with_abx_pkg():
"""Verify gallery-dl is available via abx-pkg."""
from abx_pkg import Binary, PipProvider, EnvProvider, BinProviderOverrides
missing_binaries = []
# Verify gallery-dl is available
gallerydl_binary = Binary(name='gallery-dl', binproviders=[PipProvider(), EnvProvider()])
gallerydl_loaded = gallerydl_binary.load()
if not (gallerydl_loaded and gallerydl_loaded.abspath):
missing_binaries.append('gallery-dl')
if missing_binaries:
pytest.skip(f"Binaries not available: {', '.join(missing_binaries)} - Dependency records should have been emitted")
def test_handles_non_gallery_url():
"""Test that gallery-dl extractor handles non-gallery URLs gracefully via hook."""
# Prerequisites checked by earlier test
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# Run gallery-dl extraction hook on non-gallery URL
result = subprocess.run(
[sys.executable, str(GALLERYDL_HOOK), '--url', 'https://example.com', '--snapshot-id', 'test789'],
cwd=tmpdir,
capture_output=True,
text=True,
timeout=60
)
# Should exit 0 even for non-gallery URL
assert result.returncode == 0, f"Should handle non-gallery URL gracefully: {result.stderr}"
# Verify JSONL output
assert 'STATUS=' in result.stdout, "Should report status"
assert 'RESULT_JSON=' in result.stdout, "Should output RESULT_JSON"
# Parse JSONL result
result_json = None
for line in result.stdout.split('\n'):
if line.startswith('RESULT_JSON='):
result_json = json.loads(line.split('=', 1)[1])
break
assert result_json, "Should have RESULT_JSON"
assert result_json['extractor'] == 'gallerydl'
def test_config_save_gallery_dl_false_skips():
"""Test that SAVE_GALLERYDL=False causes skip."""
import os
with tempfile.TemporaryDirectory() as tmpdir:
env = os.environ.copy()
env['SAVE_GALLERYDL'] = 'False'
result = subprocess.run(
[sys.executable, str(GALLERYDL_HOOK), '--url', TEST_URL, '--snapshot-id', 'test999'],
cwd=tmpdir,
capture_output=True,
text=True,
env=env,
timeout=30
)
assert result.returncode == 0, f"Should exit 0 when skipping: {result.stderr}"
assert 'STATUS=' in result.stdout
def test_config_timeout():
"""Test that GALLERY_DL_TIMEOUT config is respected."""
import os
with tempfile.TemporaryDirectory() as tmpdir:
env = os.environ.copy()
env['GALLERYDL_TIMEOUT'] = '5'
result = subprocess.run(
[sys.executable, str(GALLERYDL_HOOK), '--url', 'https://example.com', '--snapshot-id', 'testtimeout'],
cwd=tmpdir,
capture_output=True,
text=True,
env=env,
timeout=30
)
assert result.returncode == 0, "Should complete without hanging"
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,29 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": false,
"properties": {
"SAVE_PAPERSDL": {
"type": "boolean",
"default": true,
"description": "Enable paper downloading with papers-dl"
},
"PAPERSDL_BINARY": {
"type": "string",
"default": "papers-dl",
"description": "Path to papers-dl binary"
},
"PAPERSDL_TIMEOUT": {
"type": "integer",
"default": 300,
"minimum": 30,
"x-fallback": "TIMEOUT",
"description": "Timeout for paper downloads in seconds"
},
"PAPERSDL_EXTRA_ARGS": {
"type": "string",
"default": "",
"description": "Extra arguments for papers-dl (space-separated)"
}
}
}

View File

@@ -24,14 +24,125 @@ import rich_click as click
EXTRACTOR_NAME = 'parse_netscape_urls'
# Constants for timestamp epoch detection
UNIX_EPOCH = 0 # 1970-01-01 00:00:00 UTC
MAC_COCOA_EPOCH = 978307200 # 2001-01-01 00:00:00 UTC (Mac/Cocoa/NSDate epoch)
# Reasonable date range for bookmarks (to detect correct epoch/unit)
MIN_REASONABLE_YEAR = 1995 # Netscape Navigator era
MAX_REASONABLE_YEAR = 2035 # Far enough in future
# Regex pattern for Netscape bookmark format
# Example: <DT><A HREF="https://example.com/?q=1+2" ADD_DATE="1497562974" TAGS="tag1,tag2">example title</A>
# Make ADD_DATE optional and allow negative numbers
NETSCAPE_PATTERN = re.compile(
r'<a\s+href="([^"]+)"\s+add_date="(\d+)"(?:\s+[^>]*?tags="([^"]*)")?[^>]*>([^<]+)</a>',
r'<a\s+href="([^"]+)"(?:\s+add_date="([^"]*)")?(?:\s+[^>]*?tags="([^"]*)")?[^>]*>([^<]+)</a>',
re.UNICODE | re.IGNORECASE
)
def parse_timestamp(timestamp_str: str) -> datetime | None:
"""
Intelligently parse bookmark timestamp with auto-detection of format and epoch.
Browsers use different timestamp formats:
- Firefox: Unix epoch (1970) in seconds (10 digits): 1609459200
- Safari: Mac/Cocoa epoch (2001) in seconds (9-10 digits): 631152000
- Chrome: Unix epoch in microseconds (16 digits): 1609459200000000
- Others: Unix epoch in milliseconds (13 digits): 1609459200000
Strategy:
1. Try parsing with different epoch + unit combinations
2. Pick the one that yields a reasonable date (1995-2035)
3. Prioritize more common formats (Unix seconds, then Mac seconds, etc.)
"""
if not timestamp_str or timestamp_str == '':
return None
try:
timestamp_num = float(timestamp_str)
except (ValueError, TypeError):
return None
# Detect sign and work with absolute value
is_negative = timestamp_num < 0
abs_timestamp = abs(timestamp_num)
# Determine number of digits to guess the unit
if abs_timestamp == 0:
num_digits = 1
else:
num_digits = len(str(int(abs_timestamp)))
# Try different interpretations in order of likelihood
candidates = []
# Unix epoch seconds (10-11 digits) - Most common: Firefox, Chrome HTML export
if 9 <= num_digits <= 11:
try:
dt = datetime.fromtimestamp(timestamp_num, tz=timezone.utc)
if MIN_REASONABLE_YEAR <= dt.year <= MAX_REASONABLE_YEAR:
candidates.append((dt, 'unix_seconds', 100)) # Highest priority
except (ValueError, OSError, OverflowError):
pass
# Mac/Cocoa epoch seconds (9-10 digits) - Safari
# Only consider if Unix seconds didn't work or gave unreasonable date
if 8 <= num_digits <= 11:
try:
dt = datetime.fromtimestamp(timestamp_num + MAC_COCOA_EPOCH, tz=timezone.utc)
if MIN_REASONABLE_YEAR <= dt.year <= MAX_REASONABLE_YEAR:
candidates.append((dt, 'mac_seconds', 90))
except (ValueError, OSError, OverflowError):
pass
# Unix epoch milliseconds (13 digits) - JavaScript exports
if 12 <= num_digits <= 14:
try:
dt = datetime.fromtimestamp(timestamp_num / 1000, tz=timezone.utc)
if MIN_REASONABLE_YEAR <= dt.year <= MAX_REASONABLE_YEAR:
candidates.append((dt, 'unix_milliseconds', 95))
except (ValueError, OSError, OverflowError):
pass
# Mac/Cocoa epoch milliseconds (12-13 digits) - Rare
if 11 <= num_digits <= 14:
try:
dt = datetime.fromtimestamp((timestamp_num / 1000) + MAC_COCOA_EPOCH, tz=timezone.utc)
if MIN_REASONABLE_YEAR <= dt.year <= MAX_REASONABLE_YEAR:
candidates.append((dt, 'mac_milliseconds', 85))
except (ValueError, OSError, OverflowError):
pass
# Unix epoch microseconds (16-17 digits) - Chrome WebKit timestamps
if 15 <= num_digits <= 18:
try:
dt = datetime.fromtimestamp(timestamp_num / 1_000_000, tz=timezone.utc)
if MIN_REASONABLE_YEAR <= dt.year <= MAX_REASONABLE_YEAR:
candidates.append((dt, 'unix_microseconds', 98))
except (ValueError, OSError, OverflowError):
pass
# Mac/Cocoa epoch microseconds (15-16 digits) - Very rare
if 14 <= num_digits <= 18:
try:
dt = datetime.fromtimestamp((timestamp_num / 1_000_000) + MAC_COCOA_EPOCH, tz=timezone.utc)
if MIN_REASONABLE_YEAR <= dt.year <= MAX_REASONABLE_YEAR:
candidates.append((dt, 'mac_microseconds', 80))
except (ValueError, OSError, OverflowError):
pass
# If no candidates found, return None
if not candidates:
return None
# Sort by priority (highest first) and return best match
candidates.sort(key=lambda x: x[2], reverse=True)
best_dt, best_format, _ = candidates[0]
return best_dt
def fetch_content(url: str) -> str:
"""Fetch content from a URL (supports file:// and https://)."""
parsed = urlparse(url)
@@ -69,6 +180,7 @@ def main(url: str, snapshot_id: str = None):
match = NETSCAPE_PATTERN.search(line)
if match:
bookmark_url = match.group(1)
timestamp_str = match.group(2)
tags_str = match.group(3) or ''
title = match.group(4).strip()
@@ -86,11 +198,13 @@ def main(url: str, snapshot_id: str = None):
tag = tag.strip()
if tag:
all_tags.add(tag)
try:
# Convert unix timestamp to ISO 8601
entry['bookmarked_at'] = datetime.fromtimestamp(float(match.group(2)), tz=timezone.utc).isoformat()
except (ValueError, OSError):
pass
# Parse timestamp with intelligent format detection
if timestamp_str:
dt = parse_timestamp(timestamp_str)
if dt:
entry['bookmarked_at'] = dt.isoformat()
urls_found.append(entry)
if not urls_found:
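
A quick sanity check of the epoch/unit detection that parse_timestamp() performs, matching the cases the new tests below exercise (hypothetical REPL session):

>>> parse_timestamp('1609459200').date()          # 10 digits -> Unix seconds
datetime.date(2021, 1, 1)
>>> parse_timestamp('631152000').date()           # Unix reading would be 1990 (< 1995), so the Mac/Cocoa epoch wins
datetime.date(2021, 1, 1)
>>> parse_timestamp('1609459200000000').date()    # 16 digits -> Unix microseconds
datetime.date(2021, 1, 1)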

View File

@@ -0,0 +1,930 @@
#!/usr/bin/env python3
"""Comprehensive tests for parse_netscape_urls extractor covering various browser formats."""
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
import pytest
PLUGIN_DIR = Path(__file__).parent.parent
SCRIPT_PATH = next(PLUGIN_DIR.glob('on_Snapshot__*_parse_netscape_urls.py'), None)
class TestFirefoxFormat:
"""Test Firefox Netscape bookmark export format."""
def test_firefox_basic_format(self, tmp_path):
"""Test standard Firefox export format with Unix timestamps in seconds."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<!-- This is an automatically generated file.
It will be read and overwritten.
DO NOT EDIT! -->
<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=UTF-8">
<TITLE>Bookmarks</TITLE>
<H1>Bookmarks Menu</H1>
<DL><p>
<DT><A HREF="https://example.com" ADD_DATE="1609459200" LAST_MODIFIED="1609545600">Example Site</A>
<DT><A HREF="https://mozilla.org" ADD_DATE="1640995200">Mozilla</A>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
assert len(entries) == 2
assert entries[0]['url'] == 'https://example.com'
assert entries[0]['title'] == 'Example Site'
# Timestamp should be parsed as seconds (Jan 1, 2021)
assert '2021-01-01' in entries[0]['bookmarked_at']
# Second bookmark (Jan 1, 2022)
assert '2022-01-01' in entries[1]['bookmarked_at']
def test_firefox_with_tags(self, tmp_path):
"""Test Firefox bookmarks with tags."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<DL><p>
<DT><A HREF="https://example.com" ADD_DATE="1609459200" TAGS="coding,tutorial,python">Python Tutorial</A>
<DT><A HREF="https://rust-lang.org" ADD_DATE="1609459200" TAGS="coding,rust">Rust Lang</A>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
# Should have Tag records + Snapshot records
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
tag_names = {t['name'] for t in tags}
assert 'coding' in tag_names
assert 'tutorial' in tag_names
assert 'python' in tag_names
assert 'rust' in tag_names
assert snapshots[0]['tags'] == 'coding,tutorial,python'
assert snapshots[1]['tags'] == 'coding,rust'
def test_firefox_nested_folders(self, tmp_path):
"""Test Firefox bookmark folders and nested structure."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<DL><p>
<DT><H3 ADD_DATE="1609459200" LAST_MODIFIED="1609545600">Toolbar</H3>
<DL><p>
<DT><A HREF="https://github.com" ADD_DATE="1609459200">GitHub</A>
<DT><H3 ADD_DATE="1609459200" LAST_MODIFIED="1609545600">Development</H3>
<DL><p>
<DT><A HREF="https://stackoverflow.com" ADD_DATE="1609459200">Stack Overflow</A>
<DT><A HREF="https://developer.mozilla.org" ADD_DATE="1609459200">MDN</A>
</DL><p>
</DL><p>
<DT><A HREF="https://news.ycombinator.com" ADD_DATE="1609459200">Hacker News</A>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
urls = {e['url'] for e in entries}
assert 'https://github.com' in urls
assert 'https://stackoverflow.com' in urls
assert 'https://developer.mozilla.org' in urls
assert 'https://news.ycombinator.com' in urls
assert len(entries) == 4
def test_firefox_icon_and_icon_uri(self, tmp_path):
"""Test Firefox bookmarks with ICON and ICON_URI attributes."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<DL><p>
<DT><A HREF="https://example.com" ADD_DATE="1609459200" ICON="data:image/png;base64,iVBORw0K">Example</A>
<DT><A HREF="https://github.com" ADD_DATE="1609459200" ICON_URI="https://github.com/favicon.ico">GitHub</A>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
assert entries[0]['url'] == 'https://example.com'
assert entries[1]['url'] == 'https://github.com'
class TestChromeFormat:
"""Test Chrome/Chromium Netscape bookmark export format."""
def test_chrome_microsecond_timestamps(self, tmp_path):
"""Test Chrome format with microsecond timestamps (16-17 digits)."""
input_file = tmp_path / 'bookmarks.html'
# Chrome uses WebKit/Chrome timestamps which are microseconds
# 1609459200000000 = Jan 1, 2021 00:00:00 in microseconds
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=UTF-8">
<TITLE>Bookmarks</TITLE>
<H1>Bookmarks</H1>
<DL><p>
<DT><A HREF="https://google.com" ADD_DATE="1609459200000000">Google</A>
<DT><A HREF="https://chrome.google.com" ADD_DATE="1640995200000000">Chrome</A>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
# Should correctly parse microsecond timestamps
# Regression guard: before the parse_timestamp() fix these were misread as Unix seconds
assert entries[0]['url'] == 'https://google.com'
# Timestamp should be around Jan 1, 2021, not year 52970!
if 'bookmarked_at' in entries[0]:
year = datetime.fromisoformat(entries[0]['bookmarked_at']).year
# Should be 2021, not some far future date
assert 2020 <= year <= 2025, f"Year should be ~2021, got {year}"
def test_chrome_with_folders(self, tmp_path):
"""Test Chrome bookmark folder structure."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<DL><p>
<DT><H3 ADD_DATE="1609459200" LAST_MODIFIED="1609459200" PERSONAL_TOOLBAR_FOLDER="true">Bookmarks bar</H3>
<DL><p>
<DT><A HREF="https://google.com" ADD_DATE="1609459200">Google</A>
</DL><p>
<DT><H3 ADD_DATE="1609459200" LAST_MODIFIED="1609459200">Other bookmarks</H3>
<DL><p>
<DT><A HREF="https://example.com" ADD_DATE="1609459200">Example</A>
</DL><p>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
urls = {e['url'] for e in entries}
assert 'https://google.com' in urls
assert 'https://example.com' in urls
class TestSafariFormat:
"""Test Safari Netscape bookmark export format."""
def test_safari_basic_format(self, tmp_path):
"""Test Safari export format."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=UTF-8">
<Title>Bookmarks</Title>
<H1>Bookmarks</H1>
<DL><p>
<DT><H3 FOLDED ADD_DATE="1609459200">BookmarksBar</H3>
<DL><p>
<DT><A HREF="https://apple.com" ADD_DATE="1609459200">Apple</A>
<DT><A HREF="https://webkit.org" ADD_DATE="1609459200">WebKit</A>
</DL><p>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
urls = {e['url'] for e in entries}
assert 'https://apple.com' in urls
assert 'https://webkit.org' in urls
def test_safari_reading_list(self, tmp_path):
"""Test Safari Reading List entries."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<DL><p>
<DT><H3 FOLDED ADD_DATE="1609459200">com.apple.ReadingList</H3>
<DL><p>
<DT><A HREF="https://article1.com" ADD_DATE="1609459200">Article 1</A>
<DD>Long article to read later
<DT><A HREF="https://article2.com" ADD_DATE="1609545600">Article 2</A>
<DD>Another saved article
</DL><p>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
urls = {e['url'] for e in entries}
assert 'https://article1.com' in urls
assert 'https://article2.com' in urls
class TestEdgeFormat:
"""Test Edge/IE bookmark export formats."""
def test_edge_chromium_format(self, tmp_path):
"""Test Edge (Chromium-based) format."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''<!DOCTYPE NETSCAPE-Bookmark-file-1>
<META HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=UTF-8">
<TITLE>Bookmarks</TITLE>
<H1>Bookmarks</H1>
<DL><p>
<DT><A HREF="https://microsoft.com" ADD_DATE="1609459200">Microsoft</A>
<DT><A HREF="https://bing.com" ADD_DATE="1609459200">Bing</A>
</DL><p>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
urls = {e['url'] for e in entries}
assert 'https://microsoft.com' in urls
assert 'https://bing.com' in urls
class TestTimestampFormats:
"""Test various timestamp format handling and edge cases."""
def test_unix_seconds_timestamp(self, tmp_path):
"""Test Unix epoch timestamp in seconds (10-11 digits) - Firefox, Chrome HTML export."""
input_file = tmp_path / 'bookmarks.html'
# 1609459200 = Jan 1, 2021 00:00:00 UTC (Unix epoch)
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="1609459200">Test</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
assert dt.year == 2021
assert dt.month == 1
assert dt.day == 1
def test_mac_cocoa_seconds_timestamp(self, tmp_path):
"""Test Mac/Cocoa epoch timestamp in seconds - Safari uses epoch of 2001-01-01."""
input_file = tmp_path / 'bookmarks.html'
# Safari uses Mac absolute time: seconds since 2001-01-01 00:00:00 UTC
# 631152000 seconds after 2001-01-01 = Jan 1, 2021
# 631152000 as Unix would be Feb 1990 (too old for a recent bookmark)
input_file.write_text('''
<DT><A HREF="https://apple.com" ADD_DATE="631152000">Safari Bookmark</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
# Should detect Mac epoch and convert correctly to 2021
assert 2020 <= dt.year <= 2022, f"Expected ~2021, got {dt.year}"
def test_safari_recent_timestamp(self, tmp_path):
"""Test recent Safari timestamp (Mac epoch)."""
input_file = tmp_path / 'bookmarks.html'
# 725846400 seconds after 2001-01-01 = Jan 1, 2024
input_file.write_text('''
<DT><A HREF="https://webkit.org" ADD_DATE="725846400">Recent Safari</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
# Should detect Mac epoch and convert to 2024
assert 2023 <= dt.year <= 2025, f"Expected ~2024, got {dt.year}"
def test_unix_milliseconds_timestamp(self, tmp_path):
"""Test Unix epoch timestamp in milliseconds (13 digits) - Some JavaScript exports."""
input_file = tmp_path / 'bookmarks.html'
# 1609459200000 = Jan 1, 2021 00:00:00 UTC in milliseconds
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="1609459200000">Test</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
assert dt.year == 2021
assert dt.month == 1
assert dt.day == 1
def test_chrome_webkit_microseconds_timestamp(self, tmp_path):
"""Test Chrome WebKit timestamp in microseconds (16-17 digits) - Chrome internal format."""
input_file = tmp_path / 'bookmarks.html'
# 1609459200000000 = Jan 1, 2021 00:00:00 UTC in microseconds (Unix epoch)
# Chrome sometimes exports with microsecond precision
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="1609459200000000">Test</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
assert dt.year == 2021
assert dt.month == 1
assert dt.day == 1
def test_mac_cocoa_milliseconds_timestamp(self, tmp_path):
"""Test Mac/Cocoa epoch in milliseconds (rare but possible)."""
input_file = tmp_path / 'bookmarks.html'
# 631152000000 milliseconds after 2001-01-01 = Jan 1, 2021
input_file.write_text('''
<DT><A HREF="https://apple.com" ADD_DATE="631152000000">Safari Milliseconds</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
# Should detect Mac epoch with milliseconds and convert to 2021
assert 2020 <= dt.year <= 2022, f"Expected ~2021, got {dt.year}"
def test_ambiguous_timestamp_detection(self, tmp_path):
"""Test that ambiguous timestamps are resolved to reasonable dates."""
input_file = tmp_path / 'bookmarks.html'
# Test multiple bookmarks with different timestamp formats mixed together
# Parser should handle each correctly
input_file.write_text('''
<DT><A HREF="https://unix-seconds.com" ADD_DATE="1609459200">Unix Seconds 2021</A>
<DT><A HREF="https://mac-seconds.com" ADD_DATE="631152000">Mac Seconds 2021</A>
<DT><A HREF="https://unix-ms.com" ADD_DATE="1704067200000">Unix MS 2024</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
# All should be parsed to reasonable dates (2020-2025)
for entry in entries:
dt = datetime.fromisoformat(entry['bookmarked_at'])
assert 2020 <= dt.year <= 2025, f"Date {dt.year} out of reasonable range for {entry['url']}"
def test_very_old_timestamp(self, tmp_path):
"""Test very old timestamp (1990s)."""
input_file = tmp_path / 'bookmarks.html'
# 820454400 = Jan 1, 1996
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="820454400">Old Bookmark</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
assert dt.year == 1996
def test_recent_timestamp(self, tmp_path):
"""Test recent timestamp (2024)."""
input_file = tmp_path / 'bookmarks.html'
# 1704067200 = Jan 1, 2024
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="1704067200">Recent</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
dt = datetime.fromisoformat(entry['bookmarked_at'])
assert dt.year == 2024
def test_invalid_timestamp(self, tmp_path):
"""Test invalid/malformed timestamp - should extract URL but skip timestamp."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="invalid">Test</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# Should still extract URL but skip timestamp
assert entry['url'] == 'https://example.com'
assert 'bookmarked_at' not in entry
def test_zero_timestamp(self, tmp_path):
"""Test timestamp of 0 (Unix epoch) - too old, should be skipped."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="0">Test</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# Timestamp 0 = 1970, which is before MIN_REASONABLE_YEAR (1995)
# Parser should skip it as unreasonable
assert entry['url'] == 'https://example.com'
# Timestamp should be omitted (outside reasonable range)
assert 'bookmarked_at' not in entry
def test_negative_timestamp(self, tmp_path):
"""Test negative timestamp (before Unix epoch) - should handle gracefully."""
input_file = tmp_path / 'bookmarks.html'
# -86400 = 1 day before Unix epoch = Dec 31, 1969
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="-86400">Before Unix Epoch</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
# Should handle gracefully (extracts URL, may or may not include timestamp)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'] == 'https://example.com'
# If timestamp is included, should be reasonable (1969)
if 'bookmarked_at' in entry:
dt = datetime.fromisoformat(entry['bookmarked_at'])
# Should be near Unix epoch (late 1969)
assert 1969 <= dt.year <= 1970
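# ---------------------------------------------------------------------------
# Illustrative sketch of the epoch-detection heuristic the timestamp tests
# above assume (a hypothetical helper, not the plugin's actual code; the
# 1995-2035 "reasonable year" bounds are assumptions mirroring the comments).
from datetime import datetime, timedelta, timezone

def _guess_bookmark_datetime(raw, min_year=1995, max_year=2035):
    """Interpret an ADD_DATE value as Unix or Mac/Cocoa epoch, in s/ms/us."""
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None                                   # e.g. ADD_DATE="invalid"
    if value <= 0:
        return None                                   # e.g. ADD_DATE="0" (1970, too old)
    for divisor in (1, 1_000, 1_000_000):             # seconds, millis, micros
        seconds = value / divisor
        candidates = []
        try:                                          # Unix epoch (1970-01-01)
            candidates.append(datetime.fromtimestamp(seconds, tz=timezone.utc))
        except (OverflowError, OSError, ValueError):
            pass                                      # value too large for this unit
        try:                                          # Safari: seconds since 2001-01-01
            candidates.append(datetime(2001, 1, 1, tzinfo=timezone.utc)
                              + timedelta(seconds=seconds))
        except OverflowError:
            pass
        for candidate in candidates:
            if min_year <= candidate.year <= max_year:
                return candidate
    return None
# ---------------------------------------------------------------------------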
class TestBookmarkAttributes:
"""Test various bookmark attributes and metadata."""
def test_private_attribute(self, tmp_path):
"""Test bookmarks with PRIVATE attribute."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://private.example.com" ADD_DATE="1609459200" PRIVATE="1">Private</A>
<DT><A HREF="https://public.example.com" ADD_DATE="1609459200">Public</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
# Both should be extracted
assert len(entries) == 2
def test_shortcuturl_attribute(self, tmp_path):
"""Test bookmarks with SHORTCUTURL keyword attribute."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://google.com/search?q=%s" ADD_DATE="1609459200" SHORTCUTURL="g">Google Search</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert 'google.com' in entry['url']
def test_post_data_attribute(self, tmp_path):
"""Test bookmarks with POST_DATA attribute."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com/login" ADD_DATE="1609459200" POST_DATA="user=test">Login</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'] == 'https://example.com/login'
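# ---------------------------------------------------------------------------
# Rough sketch of the line-oriented regex parsing that the edge-case tests
# below document (hypothetical, not the plugin's actual source): an HREF and a
# non-empty title on the same line are required, ADD_DATE/TAGS are optional.
import re

HREF_RE  = re.compile(r'HREF="(?P<url>[^"]+)"', re.IGNORECASE)
DATE_RE  = re.compile(r'ADD_DATE="(?P<ts>[^"]*)"', re.IGNORECASE)
TAGS_RE  = re.compile(r'TAGS="(?P<tags>[^"]*)"', re.IGNORECASE)
TITLE_RE = re.compile(r'>(?P<title>[^<]+)</A>', re.IGNORECASE)

def iter_bookmarks(html_text):
    """Yield one dict per <A ...>title</A> found on a single line."""
    for line in html_text.splitlines():
        href, title = HREF_RE.search(line), TITLE_RE.search(line)
        if not (href and title):          # multi-line tags and empty titles are skipped
            continue
        date, tags = DATE_RE.search(line), TAGS_RE.search(line)
        yield {
            'url': href['url'],
            'title': title['title'].strip(),
            'add_date': date['ts'] if date else None,
            'tags': tags['tags'] if tags else None,
        }
# ---------------------------------------------------------------------------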
class TestEdgeCases:
"""Test edge cases and malformed data."""
def test_multiline_bookmark(self, tmp_path):
"""Test bookmark spanning multiple lines."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com"
ADD_DATE="1609459200"
TAGS="tag1,tag2">
Multi-line Bookmark
</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
# Current regex works line-by-line, so this might not match
# Document current behavior
if result.returncode == 0:
output_file = tmp_path / 'urls.jsonl'
if output_file.exists():
content = output_file.read_text().strip()
if content:
entry = json.loads(content)
assert 'example.com' in entry['url']
def test_missing_add_date(self, tmp_path):
"""Test bookmark without ADD_DATE attribute - should still extract URL."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com">No Date</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
# Should succeed and extract URL without timestamp
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'] == 'https://example.com'
assert entry['title'] == 'No Date'
assert 'bookmarked_at' not in entry
def test_empty_title(self, tmp_path):
"""Test bookmark with empty title."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="1609459200"></A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
# Current regex requires non-empty title [^<]+
# Document current behavior
assert result.returncode == 1
def test_special_chars_in_url(self, tmp_path):
"""Test URLs with special characters."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com/path?q=test&foo=bar&baz=qux#section" ADD_DATE="1609459200">Special URL</A>
<DT><A HREF="https://example.com/path%20with%20spaces" ADD_DATE="1609459200">Encoded Spaces</A>
<DT><A HREF="https://example.com/unicode/日本語" ADD_DATE="1609459200">Unicode Path</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
assert len(entries) == 3
assert 'q=test&foo=bar' in entries[0]['url']
assert '%20' in entries[1]['url']
def test_javascript_url(self, tmp_path):
"""Test javascript: URLs (should still be extracted)."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="javascript:alert('test')" ADD_DATE="1609459200">JS Bookmarklet</A>
<DT><A HREF="https://example.com" ADD_DATE="1609459200">Normal</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines]
# Both should be extracted
assert len(entries) == 2
assert entries[0]['url'].startswith('javascript:')
def test_data_url(self, tmp_path):
"""Test data: URLs."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="data:text/html,<h1>Test</h1>" ADD_DATE="1609459200">Data URL</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'].startswith('data:')
def test_file_url(self, tmp_path):
"""Test file:// URLs."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="file:///home/user/document.pdf" ADD_DATE="1609459200">Local File</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'].startswith('file://')
def test_very_long_url(self, tmp_path):
"""Test very long URLs (2000+ characters)."""
long_url = 'https://example.com/path?' + '&'.join([f'param{i}=value{i}' for i in range(100)])
input_file = tmp_path / 'bookmarks.html'
input_file.write_text(f'''
<DT><A HREF="{long_url}" ADD_DATE="1609459200">Long URL</A>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert len(entry['url']) > 1000
assert entry['url'].startswith('https://example.com')
def test_unicode_in_title(self, tmp_path):
"""Test Unicode characters in titles."""
input_file = tmp_path / 'bookmarks.html'
input_file.write_text('''
<DT><A HREF="https://example.com" ADD_DATE="1609459200">日本語のタイトル</A>
<DT><A HREF="https://example.org" ADD_DATE="1609459200">Título en Español</A>
<DT><A HREF="https://example.net" ADD_DATE="1609459200">Заголовок на русском</A>
<DT><A HREF="https://example.biz" ADD_DATE="1609459200">عنوان بالعربية</A>
<DT><A HREF="https://example.info" ADD_DATE="1609459200">Emoji 🚀 📚 🎉</A>
''', encoding='utf-8')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text(encoding='utf-8').strip().split('\n')
entries = [json.loads(line) for line in lines]
assert len(entries) == 5
assert any('日本語' in e.get('title', '') for e in entries)
assert any('Español' in e.get('title', '') for e in entries)
def test_large_file_many_bookmarks(self, tmp_path):
"""Test parsing large file with many bookmarks (1000+)."""
bookmarks = []
for i in range(1000):
bookmarks.append(
f'<DT><A HREF="https://example.com/page{i}" ADD_DATE="1609459200" TAGS="tag{i % 10}">Bookmark {i}</A>'
)
input_file = tmp_path / 'bookmarks.html'
input_file.write_text(
'<!DOCTYPE NETSCAPE-Bookmark-file-1>\n<DL><p>\n' +
'\n'.join(bookmarks) +
'\n</DL><p>'
)
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
timeout=30,
)
assert result.returncode == 0
assert 'Found 1000 URLs' in result.stdout
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
# Should have 10 unique tags + 1000 snapshots
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
assert len(tags) == 10
assert len(snapshots) == 1000
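# ---------------------------------------------------------------------------
# For reference: the assertions in this file treat urls.jsonl as one JSON
# object per line, with unique {"type": "Tag", "name": ...} records plus one
# {"type": "Snapshot", ...} record per bookmark. A minimal emitter matching
# that shape might look like this (field names are inferred from the tests;
# the via_extractor default is an assumption, not taken from the plugin):
def write_urls_jsonl(path, bookmarks, extractor='parse_html_urls'):
    bookmarks = list(bookmarks)
    seen_tags = []
    for bm in bookmarks:
        for tag in (bm.get('tags') or '').split(','):
            tag = tag.strip()
            if tag and tag not in seen_tags:
                seen_tags.append(tag)                        # unique, order-preserving
    with open(path, 'w', encoding='utf-8') as f:
        for tag in seen_tags:
            f.write(json.dumps({'type': 'Tag', 'name': tag}) + '\n')
        for bm in bookmarks:
            record = {'type': 'Snapshot', 'via_extractor': extractor}
            record.update({k: v for k, v in bm.items() if v})  # drop empty fields
            f.write(json.dumps(record, ensure_ascii=False) + '\n')
# ---------------------------------------------------------------------------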
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,987 @@
#!/usr/bin/env python3
"""Comprehensive tests for parse_rss_urls extractor covering various RSS/Atom variants."""
import json
import subprocess
import sys
from pathlib import Path
import pytest
PLUGIN_DIR = Path(__file__).parent.parent
SCRIPT_PATH = next(PLUGIN_DIR.glob('on_Snapshot__*_parse_rss_urls.py'), None)
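# ---------------------------------------------------------------------------
# Hypothetical helper (not used by the tests below) showing what the per-test
# boilerplate amounts to: invoke the hook script against a file:// URL and
# read back the urls.jsonl it writes into the working directory.
def run_extractor(tmp_path, input_file):
    result = subprocess.run(
        [sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
        cwd=tmp_path,
        capture_output=True,
        text=True,
    )
    output_file = tmp_path / 'urls.jsonl'
    records = []
    if output_file.exists():
        records = [json.loads(line)
                   for line in output_file.read_text().strip().splitlines()
                   if line]
    return result, records
# ---------------------------------------------------------------------------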
class TestRssVariants:
"""Test various RSS format variants."""
def test_rss_091(self, tmp_path):
"""Test RSS 0.91 format (oldest RSS version)."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<rss version="0.91">
<channel>
<title>RSS 0.91 Feed</title>
<link>https://example.com</link>
<description>Test RSS 0.91</description>
<item>
<title>RSS 0.91 Article</title>
<link>https://example.com/article1</link>
<description>An article in RSS 0.91 format</description>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0, f"Failed: {result.stderr}"
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entry = json.loads(lines[0])
assert entry['url'] == 'https://example.com/article1'
assert entry['title'] == 'RSS 0.91 Article'
assert entry['via_extractor'] == 'parse_rss_urls'
def test_rss_10_rdf(self, tmp_path):
"""Test RSS 1.0 (RDF) format."""
input_file = tmp_path / 'feed.rdf'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns="http://purl.org/rss/1.0/"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<channel rdf:about="https://example.com">
<title>RSS 1.0 Feed</title>
<link>https://example.com</link>
</channel>
<item rdf:about="https://example.com/rdf1">
<title>RDF Item 1</title>
<link>https://example.com/rdf1</link>
<dc:date>2024-01-15T10:30:00Z</dc:date>
<dc:subject>Technology</dc:subject>
</item>
<item rdf:about="https://example.com/rdf2">
<title>RDF Item 2</title>
<link>https://example.com/rdf2</link>
<dc:date>2024-01-16T14:20:00Z</dc:date>
</item>
</rdf:RDF>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0, f"Failed: {result.stderr}"
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
entries = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
urls = {e['url'] for e in entries}
assert 'https://example.com/rdf1' in urls
assert 'https://example.com/rdf2' in urls
assert any(e.get('bookmarked_at') for e in entries)
def test_rss_20_with_full_metadata(self, tmp_path):
"""Test RSS 2.0 with all standard metadata fields."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>Full RSS 2.0</title>
<link>https://example.com</link>
<description>Complete RSS 2.0 feed</description>
<item>
<title>Complete Article</title>
<link>https://example.com/complete</link>
<description>Full description here</description>
<author>author@example.com</author>
<category>Technology</category>
<category>Programming</category>
<guid>https://example.com/complete</guid>
<pubDate>Mon, 15 Jan 2024 10:30:00 GMT</pubDate>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
content = output_file.read_text().strip()
lines = content.split('\n')
# Check for Tag records
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
tag_names = {t['name'] for t in tags}
assert 'Technology' in tag_names
assert 'Programming' in tag_names
# Check Snapshot record
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert entry['url'] == 'https://example.com/complete'
assert entry['title'] == 'Complete Article'
assert 'bookmarked_at' in entry
assert entry['tags'] == 'Technology,Programming' or entry['tags'] == 'Programming,Technology'
class TestAtomVariants:
"""Test various Atom format variants."""
def test_atom_10_full(self, tmp_path):
"""Test Atom 1.0 with full metadata."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Atom 1.0 Feed</title>
<updated>2024-01-15T00:00:00Z</updated>
<entry>
<title>Atom Entry 1</title>
<link href="https://atom.example.com/1"/>
<id>urn:uuid:1234-5678</id>
<updated>2024-01-15T10:30:00Z</updated>
<published>2024-01-14T08:00:00Z</published>
<category term="science"/>
<category term="research"/>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
tag_names = {t['name'] for t in tags}
assert 'science' in tag_names
assert 'research' in tag_names
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert entry['url'] == 'https://atom.example.com/1'
assert 'bookmarked_at' in entry
def test_atom_with_alternate_link(self, tmp_path):
"""Test Atom feed with alternate link types."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Atom Alternate Links</title>
<entry>
<title>Entry with alternate</title>
<link rel="alternate" type="text/html" href="https://atom.example.com/article"/>
<link rel="self" href="https://atom.example.com/feed"/>
<updated>2024-01-15T10:30:00Z</updated>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# feedparser should pick the alternate link
assert 'atom.example.com/article' in entry['url']
class TestDateFormats:
"""Test various date format handling."""
def test_rfc822_date(self, tmp_path):
"""Test RFC 822 date format (RSS 2.0 standard)."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>RFC 822 Date</title>
<link>https://example.com/rfc822</link>
<pubDate>Wed, 15 Jan 2020 10:30:45 GMT</pubDate>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert 'bookmarked_at' in entry
assert '2020-01-15' in entry['bookmarked_at']
def test_iso8601_date(self, tmp_path):
"""Test ISO 8601 date format (Atom standard)."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<title>ISO 8601 Date</title>
<link href="https://example.com/iso"/>
<published>2024-01-15T10:30:45.123Z</published>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert 'bookmarked_at' in entry
assert '2024-01-15' in entry['bookmarked_at']
def test_updated_vs_published_date(self, tmp_path):
"""Test that published date is preferred over updated date."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<title>Date Priority Test</title>
<link href="https://example.com/dates"/>
<published>2024-01-10T10:00:00Z</published>
<updated>2024-01-15T10:00:00Z</updated>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# Should use published date (Jan 10) not updated date (Jan 15)
assert '2024-01-10' in entry['bookmarked_at']
def test_only_updated_date(self, tmp_path):
"""Test fallback to updated date when published is missing."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<title>Only Updated</title>
<link href="https://example.com/updated"/>
<updated>2024-01-20T10:00:00Z</updated>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert '2024-01-20' in entry['bookmarked_at']
def test_no_date(self, tmp_path):
"""Test entries without any date."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>No Date</title>
<link>https://example.com/nodate</link>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert 'bookmarked_at' not in entry
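# ---------------------------------------------------------------------------
# Illustrative sketch of the feedparser usage these tests imply (not the
# plugin's actual code): entry.link is the alternate link when several <link>
# elements exist, published_parsed is preferred over updated_parsed, and
# category terms become tags. The field handling here is inferred from tests.
import calendar
import feedparser
from datetime import datetime, timezone

def iter_feed_entries(feed_source):
    for entry in feedparser.parse(feed_source).entries:
        url = entry.get('link')
        if not url:
            continue                                   # entries without a link are skipped
        when = entry.get('published_parsed') or entry.get('updated_parsed')
        terms = [tag.get('term') for tag in entry.get('tags', []) if tag.get('term')]
        yield {
            'url': url,
            'title': entry.get('title') or None,
            'bookmarked_at': (datetime.fromtimestamp(calendar.timegm(when),
                                                     tz=timezone.utc).isoformat()
                              if when else None),
            'tags': ','.join(dict.fromkeys(terms)) or None,  # unique, order-preserving
        }
# ---------------------------------------------------------------------------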
class TestTagsAndCategories:
"""Test various tag and category formats."""
def test_rss_categories(self, tmp_path):
"""Test RSS 2.0 category elements."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>Multi Category</title>
<link>https://example.com/cats</link>
<category>Tech</category>
<category>Web</category>
<category>Programming</category>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
tag_names = {t['name'] for t in tags}
assert 'Tech' in tag_names
assert 'Web' in tag_names
assert 'Programming' in tag_names
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
tags_list = entry['tags'].split(',')
assert len(tags_list) == 3
def test_atom_categories(self, tmp_path):
"""Test Atom category elements with various attributes."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<title>Atom Categories</title>
<link href="https://example.com/atomcats"/>
<category term="python" scheme="http://example.com/categories" label="Python Programming"/>
<category term="django" label="Django Framework"/>
<updated>2024-01-15T10:00:00Z</updated>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
tag_names = {t['name'] for t in tags}
# feedparser extracts the 'term' attribute
assert 'python' in tag_names
assert 'django' in tag_names
def test_no_tags(self, tmp_path):
"""Test entries without tags."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>No Tags</title>
<link>https://example.com/notags</link>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert 'tags' not in entry or entry['tags'] == ''
def test_duplicate_tags(self, tmp_path):
"""Test that duplicate tags are handled properly."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>Duplicate Tags</title>
<link>https://example.com/dups</link>
<category>Python</category>
<category>Python</category>
<category>Web</category>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
# Tag records should be unique
tag_names = [t['name'] for t in tags]
assert tag_names.count('Python') == 1
class TestCustomNamespaces:
"""Test custom namespace handling (Dublin Core, Media RSS, etc.)."""
def test_dublin_core_metadata(self, tmp_path):
"""Test Dublin Core namespace fields."""
input_file = tmp_path / 'feed.rdf'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns="http://purl.org/rss/1.0/"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<channel rdf:about="https://example.com">
<title>Dublin Core Feed</title>
</channel>
<item rdf:about="https://example.com/dc1">
<title>Dublin Core Article</title>
<link>https://example.com/dc1</link>
<dc:creator>John Doe</dc:creator>
<dc:subject>Technology</dc:subject>
<dc:date>2024-01-15T10:30:00Z</dc:date>
<dc:rights>Copyright 2024</dc:rights>
</item>
</rdf:RDF>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert entry['url'] == 'https://example.com/dc1'
assert entry['title'] == 'Dublin Core Article'
# feedparser should parse dc:date as bookmarked_at
assert 'bookmarked_at' in entry
def test_media_rss_namespace(self, tmp_path):
"""Test Media RSS namespace (common in podcast feeds)."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0" xmlns:media="http://search.yahoo.com/mrss/">
<channel>
<title>Media RSS Feed</title>
<item>
<title>Podcast Episode 1</title>
<link>https://example.com/podcast/1</link>
<media:content url="https://example.com/audio.mp3" type="audio/mpeg"/>
<media:thumbnail url="https://example.com/thumb.jpg"/>
<pubDate>Mon, 15 Jan 2024 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'] == 'https://example.com/podcast/1'
assert entry['title'] == 'Podcast Episode 1'
def test_itunes_namespace(self, tmp_path):
"""Test iTunes namespace (common in podcast feeds)."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0" xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd">
<channel>
<title>iTunes Podcast</title>
<item>
<title>Episode 1: Getting Started</title>
<link>https://example.com/ep1</link>
<itunes:author>Jane Smith</itunes:author>
<itunes:duration>45:30</itunes:duration>
<itunes:keywords>programming, tutorial, beginner</itunes:keywords>
<pubDate>Tue, 16 Jan 2024 08:00:00 GMT</pubDate>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert entry['url'] == 'https://example.com/ep1'
assert entry['title'] == 'Episode 1: Getting Started'
class TestEdgeCases:
"""Test edge cases and malformed data."""
def test_missing_title(self, tmp_path):
"""Test entries without title."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<link>https://example.com/notitle</link>
<pubDate>Mon, 15 Jan 2024 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['url'] == 'https://example.com/notitle'
assert 'title' not in entry
def test_missing_link(self, tmp_path):
"""Test entries without link (should be skipped)."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>No Link</title>
<description>This entry has no link</description>
</item>
<item>
<title>Has Link</title>
<link>https://example.com/haslink</link>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# Should only have the entry with a link
assert entry['url'] == 'https://example.com/haslink'
assert '1 URL' in result.stdout
def test_html_entities_in_title(self, tmp_path):
"""Test HTML entities in titles are properly decoded."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>Using &lt;div&gt; &amp; &lt;span&gt; tags</title>
<link>https://example.com/html</link>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert entry['title'] == 'Using <div> & <span> tags'
def test_special_characters_in_tags(self, tmp_path):
"""Test special characters in tags."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>Special Tags</title>
<link>https://example.com/special</link>
<category>C++</category>
<category>Node.js</category>
<category>Web/Mobile</category>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
tag_names = {t['name'] for t in tags}
assert 'C++' in tag_names
assert 'Node.js' in tag_names
assert 'Web/Mobile' in tag_names
def test_cdata_sections(self, tmp_path):
"""Test CDATA sections in titles and descriptions."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title><![CDATA[Using <strong>HTML</strong> in titles]]></title>
<link>https://example.com/cdata</link>
<description><![CDATA[Content with <em>markup</em>]]></description>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# feedparser should strip HTML tags
assert 'HTML' in entry['title']
assert entry['url'] == 'https://example.com/cdata'
def test_relative_urls(self, tmp_path):
"""Test that relative URLs are preserved (feedparser handles them)."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<link>https://example.com</link>
<item>
<title>Relative URL</title>
<link>/article/relative</link>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
# feedparser may convert relative to absolute, or leave as-is
assert 'article/relative' in entry['url']
def test_unicode_characters(self, tmp_path):
"""Test Unicode characters in feed content."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<item>
<title>Unicode: 日本語 Français 中文 العربية</title>
<link>https://example.com/unicode</link>
<category>日本語</category>
<category>Français</category>
</item>
</channel>
</rss>
''', encoding='utf-8')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text(encoding='utf-8').strip().split('\n')
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert '日本語' in entry['title']
assert 'Français' in entry['title']
def test_very_long_title(self, tmp_path):
"""Test handling of very long titles."""
long_title = 'A' * 1000
input_file = tmp_path / 'feed.rss'
input_file.write_text(f'''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>{long_title}</title>
<link>https://example.com/long</link>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert len(entry['title']) == 1000
assert entry['title'] == long_title
def test_multiple_entries_batch(self, tmp_path):
"""Test processing a large batch of entries."""
items = []
for i in range(100):
items.append(f'''
<item>
<title>Article {i}</title>
<link>https://example.com/article/{i}</link>
<category>Tag{i % 10}</category>
<pubDate>Mon, {15 + (i % 15)} Jan 2024 10:00:00 GMT</pubDate>
</item>
''')
input_file = tmp_path / 'feed.rss'
input_file.write_text(f'''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<title>Large Feed</title>
{''.join(items)}
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
assert 'Found 100 URLs' in result.stdout
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
# Should have 10 unique tags (Tag0-Tag9) + 100 snapshots
tags = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Tag']
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
assert len(tags) == 10
assert len(snapshots) == 100
class TestRealWorldFeeds:
"""Test patterns from real-world RSS feeds."""
def test_medium_style_feed(self, tmp_path):
"""Test Medium-style feed structure."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<rss version="2.0">
<channel>
<title>Medium Feed</title>
<item>
<title>Article Title</title>
<link>https://medium.com/@user/article-slug-123abc</link>
<guid isPermaLink="false">https://medium.com/p/123abc</guid>
<pubDate>Wed, 15 Jan 2024 10:30:00 GMT</pubDate>
<category>Programming</category>
<category>JavaScript</category>
<dc:creator xmlns:dc="http://purl.org/dc/elements/1.1/">Author Name</dc:creator>
</item>
</channel>
</rss>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert 'medium.com' in entry['url']
assert entry['title'] == 'Article Title'
def test_reddit_style_feed(self, tmp_path):
"""Test Reddit-style feed structure."""
input_file = tmp_path / 'feed.rss'
input_file.write_text('''<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<title>Reddit Feed</title>
<entry>
<title>Post Title</title>
<link href="https://www.reddit.com/r/programming/comments/abc123/post_title/"/>
<updated>2024-01-15T10:30:00+00:00</updated>
<category term="programming" label="r/programming"/>
<id>t3_abc123</id>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
lines = output_file.read_text().strip().split('\n')
snapshots = [json.loads(line) for line in lines if json.loads(line)['type'] == 'Snapshot']
entry = snapshots[0]
assert 'reddit.com' in entry['url']
def test_youtube_style_feed(self, tmp_path):
"""Test YouTube-style feed structure."""
input_file = tmp_path / 'feed.atom'
input_file.write_text('''<?xml version="1.0"?>
<feed xmlns:yt="http://www.youtube.com/xml/schemas/2015"
xmlns="http://www.w3.org/2005/Atom">
<title>YouTube Channel</title>
<entry>
<title>Video Title</title>
<link rel="alternate" href="https://www.youtube.com/watch?v=dQw4w9WgXcQ"/>
<published>2024-01-15T10:30:00+00:00</published>
<yt:videoId>dQw4w9WgXcQ</yt:videoId>
<yt:channelId>UCxxxxxxxx</yt:channelId>
</entry>
</feed>
''')
result = subprocess.run(
[sys.executable, str(SCRIPT_PATH), '--url', f'file://{input_file}'],
cwd=tmp_path,
capture_output=True,
text=True,
)
assert result.returncode == 0
output_file = tmp_path / 'urls.jsonl'
entry = json.loads(output_file.read_text().strip())
assert 'youtube.com' in entry['url']
assert 'dQw4w9WgXcQ' in entry['url']
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -47,16 +47,13 @@ dependencies = [
"django-admin-data-views>=0.4.1",
"django-object-actions>=4.3.0",
"django-taggit==6.1.0", # TODO: remove this in favor of KVTags only
### State Management
"python-statemachine>=2.3.6",
### CLI / Logging
"click>=8.1.7", # for: nicer CLI command + argument definintions
"rich>=13.8.0", # for: pretty CLI output
"rich-click>=1.8.4", # for: pretty CLI command help text & output
"ipython>=8.27.0", # for: archivebox shell (TODO: replace with bpython?)
### Host OS / System
"supervisor>=4.2.5", # for: archivebox server starting daphne and workers
"psutil>=6.0.0", # for: monitoring orchestractor,actors,workers,etc. and machine.models.Process
@@ -65,33 +62,28 @@ dependencies = [
"atomicwrites==1.4.1", # for: config file writes, index.json file writes, etc. (TODO: remove this deprecated lib in favor of archivebox.filestore.util/os.rename/os.replace)
"python-crontab>=3.2.0", # for: archivebox schedule (TODO: remove this in favor of our own custom archivebox scheduler)
"croniter>=3.0.3", # for: archivebox schedule (TODO: remove this in favor of our own custom archivebox scheduler)
### Base Types
"pydantic>=2.8.0", # for: archivebox.api (django-ninja), archivebox.config (pydantic-settings), and archivebox.index.schema (pydantic)
"pydantic-settings>=2.5.2", # for: archivebox.config
"python-benedict[io,parse]>=0.33.2", # for: dict replacement all over the codebase to allow .attr-style access
"base32-crockford>=0.3.0", # for: encoding UUIDs in base32
### Static Typing
"mypy-extensions>=1.0.0", # for: django-stubs type hints (TODO: remove in favor of pylance/pyright?)
"django-stubs>=5.0.4", # for: vscode type hints on models and common django APIs
### API clients
"requests>=2.32.3", # for: fetching title, static files, headers (TODO: replace with httpx?)
"sonic-client>=1.0.0",
"pocket>=0.3.6", # for: importing URLs from Pocket API
### Parsers
"feedparser>=6.0.11", # for: parsing pocket/pinboard/etc. RSS/bookmarks imports
"dateparser>=1.2.0", # for: parsing pocket/pinboard/etc. RSS/bookmark import dates
"tzdata>=2024.2", # needed for dateparser {TZ: UTC} on some systems: https://github.com/ArchiveBox/ArchiveBox/issues/1553
"w3lib>=2.2.1", # used for parsing content-type encoding from http response headers & html tags
### Extractor dependencies (optional binaries detected at runtime via shutil.which)
"yt-dlp>=2024.1.0", # for: media extractor
### Binary/Package Management
"abx-pkg>=0.1.0", # for: detecting, versioning, and installing binaries via apt/brew/pip/npm
"gallery-dl>=1.31.1",
]
[project.optional-dependencies]

uv.lock generated
View File

@@ -77,6 +77,7 @@ dependencies = [
{ name = "django-stubs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "django-taggit", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "feedparser", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "gallery-dl", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "ipython", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "mypy-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "platformdirs", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -172,6 +173,7 @@ requires-dist = [
{ name = "django-taggit", specifier = "==6.1.0" },
{ name = "djdt-flamegraph", marker = "extra == 'debug'", specifier = ">=0.2.13" },
{ name = "feedparser", specifier = ">=6.0.11" },
{ name = "gallery-dl", specifier = ">=1.31.1" },
{ name = "ipdb", marker = "extra == 'debug'", specifier = ">=0.13.13" },
{ name = "ipython", specifier = ">=8.27.0" },
{ name = "mypy-extensions", specifier = ">=1.0.0" },
@@ -819,6 +821,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ab/6e/81d47999aebc1b155f81eca4477a616a70f238a2549848c38983f3c22a82/ftfy-6.3.1-py3-none-any.whl", hash = "sha256:7c70eb532015cd2f9adb53f101fb6c7945988d023a085d127d1573dc49dd0083", size = 44821, upload-time = "2024-10-26T00:50:33.425Z" },
]
[[package]]
name = "gallery-dl"
version = "1.31.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "requests", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/71/3a/3fd9e453ff2a24e6d51d5f7f9d1d9b4dc62ad22ec6a7e1cf1453e3551370/gallery_dl-1.31.1.tar.gz", hash = "sha256:5255279a06dcb7e6d0594f80cf693f7f8f07ae94deb8a797358c372a900959d4", size = 633786, upload-time = "2025-12-20T09:56:39.023Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7f/95/3ce479e13ab6be2e50de22e45e843c17354913bb6d7d393aed4d00915eaf/gallery_dl-1.31.1-py3-none-any.whl", hash = "sha256:b9bdd63f2d14affbac3df35ebe6462ae75a4032787913112035eb5c42a054467", size = 788352, upload-time = "2025-12-20T09:56:35.7Z" },
]
[[package]]
name = "googleapis-common-protos"
version = "1.72.0"