cleanup tui, startup, card templates, and more

This commit is contained in:
Nick Sweeting
2026-01-19 14:33:20 -08:00
parent bef67760db
commit 86e7973334
68 changed files with 1370 additions and 546 deletions

View File

@@ -57,7 +57,7 @@ def render_archiveresults_list(archiveresults_qs, limit=50):
# Build output link - use embed_path() which checks output_files first
embed_path = result.embed_path() if hasattr(result, 'embed_path') else None
output_link = f'/archive/{result.snapshot.timestamp}/{embed_path}' if embed_path and result.status == 'succeeded' else f'/archive/{result.snapshot.timestamp}/'
output_link = f'/{result.snapshot.archive_path}/{embed_path}' if embed_path and result.status == 'succeeded' else f'/{result.snapshot.archive_path}/'
# Get version - try cmd_version field
version = result.cmd_version if result.cmd_version else '-'
@@ -83,8 +83,8 @@ def render_archiveresults_list(archiveresults_qs, limit=50):
{icon}
</td>
<td style="padding: 10px 12px; font-weight: 500; color: #334155;">
<a href="{output_link}" target="_blank"
style="color: #334155; text-decoration: none;"
<a href="{output_link}" target="_blank"
style="color: #334155; text-decoration: none;"
title="View output fullscreen"
onmouseover="this.style.color='#2563eb'; this.style.textDecoration='underline';"
onmouseout="this.style.color='#334155'; this.style.textDecoration='none';">
@@ -301,8 +301,8 @@ class ArchiveResultAdmin(BaseModelAdmin):
)
def snapshot_info(self, result):
return format_html(
'<a href="/archive/{}/index.html"><b><code>[{}]</code></b> &nbsp; {} &nbsp; {}</a><br/>',
result.snapshot.timestamp,
'<a href="/{}/index.html"><b><code>[{}]</code></b> &nbsp; {} &nbsp; {}</a><br/>',
result.snapshot.archive_path,
str(result.snapshot.id)[:8],
result.snapshot.bookmarked_at.strftime('%Y-%m-%d %H:%M'),
result.snapshot.url[:128],
@@ -336,8 +336,8 @@ class ArchiveResultAdmin(BaseModelAdmin):
embed_path = result.embed_path() if hasattr(result, 'embed_path') else None
output_path = embed_path if (result.status == 'succeeded' and embed_path) else 'index.html'
return format_html(
'<a href="/archive/{}/{}" class="output-link">↗️</a><pre>{}</pre>',
result.snapshot.timestamp,
'<a href="/{}/{}" class="output-link">↗️</a><pre>{}</pre>',
result.snapshot.archive_path,
output_path,
result.output_str,
)
@@ -348,7 +348,7 @@ class ArchiveResultAdmin(BaseModelAdmin):
'<pre style="display: inline-block">{}</pre><br/>',
result.output_str,
)
output_html += format_html('<a href="/archive/{}/index.html#all">See result files ...</a><br/><pre><code>', str(result.snapshot.timestamp))
output_html += format_html('<a href="/{}/index.html#all">See result files ...</a><br/><pre><code>', str(result.snapshot.archive_path))
embed_path = result.embed_path() if hasattr(result, 'embed_path') else ''
path_from_embed = (snapshot_dir / (embed_path or ''))
output_html += format_html('<i style="padding: 1px">{}</i><b style="padding-right: 20px">/</b><i>{}</i><br/><hr/>', str(snapshot_dir), str(embed_path))

View File

@@ -237,13 +237,13 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
'''
<div style="display: flex; flex-wrap: wrap; gap: 12px; align-items: center;">
<a class="btn" style="display: inline-flex; align-items: center; gap: 6px; padding: 10px 16px; background: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px; color: #334155; text-decoration: none; font-size: 14px; font-weight: 500; transition: all 0.15s;"
href="/archive/{}"
href="/{}"
onmouseover="this.style.background='#f1f5f9'; this.style.borderColor='#cbd5e1';"
onmouseout="this.style.background='#f8fafc'; this.style.borderColor='#e2e8f0';">
📄 Summary Page
</a>
<a class="btn" style="display: inline-flex; align-items: center; gap: 6px; padding: 10px 16px; background: #f8fafc; border: 1px solid #e2e8f0; border-radius: 8px; color: #334155; text-decoration: none; font-size: 14px; font-weight: 500; transition: all 0.15s;"
href="/archive/{}/index.html#all"
href="/{}/index.html#all"
onmouseover="this.style.background='#f1f5f9'; this.style.borderColor='#cbd5e1';"
onmouseout="this.style.background='#f8fafc'; this.style.borderColor='#e2e8f0';">
📁 Result Files
@@ -291,8 +291,8 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
<b>Tip:</b> Action buttons link to the list view with this snapshot pre-selected. Select it and use the action dropdown to execute.
</p>
''',
obj.timestamp,
obj.timestamp,
obj.archive_path,
obj.archive_path,
obj.url,
obj.pk,
obj.pk,
@@ -310,7 +310,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
'' if obj.is_archived else '',
obj.num_outputs,
self.size(obj) or '0kb',
f'/archive/{obj.timestamp}/favicon.ico',
f'/{obj.archive_path}/favicon.ico',
obj.extension or '-',
)

View File

@@ -1,6 +1,7 @@
__package__ = 'archivebox.core'
import ipaddress
import re
from django.utils import timezone
from django.contrib.auth.middleware import RemoteUserMiddleware
from django.core.exceptions import ImproperlyConfigured
@@ -28,10 +29,11 @@ def TimezoneMiddleware(get_response):
def CacheControlMiddleware(get_response):
# Matches new-layout snapshot URLs: /<username>/<YYYYMMDD>/<domain>/<snapshot-id>/...
# NOTE: inside a raw string a single backslash is required; r"\\d" would match a
# literal backslash followed by "d" and never match a real date segment.
snapshot_path_re = re.compile(r"^/[^/]+/\d{8}/[^/]+/[0-9a-fA-F-]{8,36}/")
def middleware(request):
response = get_response(request)
if '/archive/' in request.path or '/static/' in request.path:
if '/archive/' in request.path or '/static/' in request.path or snapshot_path_re.match(request.path):
policy = 'public' if SERVER_CONFIG.PUBLIC_SNAPSHOTS else 'private'
response['Cache-Control'] = f'{policy}, max-age=60, stale-while-revalidate=300'
# print('Set Cache-Control header to', response['Cache-Control'])

View File

@@ -1296,7 +1296,6 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
)}
path = self.archive_path
canon = self.canonical_outputs()
output = ""
output_template = '<a href="/{}/{}" class="exists-{}" title="{}">{}</a> &nbsp;'
@@ -1313,10 +1312,11 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
if not icon.strip() and not existing:
continue
embed_path = result.embed_path() if result else f'{plugin}/'
output += format_html(
output_template,
path,
canon.get(plugin, plugin + '/'),
embed_path,
str(bool(existing)),
plugin,
icon
@@ -1402,9 +1402,38 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
return
@cached_property
def archive_path(self):
def legacy_archive_path(self) -> str:
return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'
@cached_property
def url_path(self) -> str:
    """
    URL path (no leading slash) that mirrors where this snapshot's output_dir lives.

    The output_dir is resolved and made relative to CONSTANTS.DATA_DIR, then mapped:
      - users/<username>/snapshots/<YYYYMMDD>/<domain>/<uuid>/... -> <username>/<YYYYMMDD>/<domain>/<uuid>
        (the username "system" is exposed as "web")
      - <ARCHIVE_DIR_NAME>/<timestamp>/...                         -> <ARCHIVE_DIR_NAME>/<timestamp>
      - anything else                                              -> the relative path joined with "/"

    Falls back to legacy_archive_path when the relative path cannot be computed.
    """
    try:
        segments = Path(self.output_dir).resolve().relative_to(CONSTANTS.DATA_DIR).parts
    except Exception:
        # output_dir is unusable or lives outside DATA_DIR
        return self.legacy_archive_path

    # New layout: users/<username>/snapshots/<YYYYMMDD>/<domain>/<uuid>/
    in_user_layout = len(segments) >= 6 and segments[0] == 'users' and segments[2] == 'snapshots'
    if in_user_layout:
        owner, day, host, snap_id = segments[1], segments[3], segments[4], segments[5]
        owner = 'web' if owner == 'system' else owner
        return '/'.join((owner, day, host, snap_id))

    # Legacy layout: archive/<timestamp>/
    if len(segments) >= 2 and segments[0] == CONSTANTS.ARCHIVE_DIR_NAME:
        return '/'.join(segments[:2])

    return '/'.join(segments)
@cached_property
def archive_path(self) -> str:
    """Alias for url_path; this is the attribute link-building code reads to form snapshot URLs."""
    return self.url_path
@cached_property
def archive_size(self):
try:
@@ -1467,8 +1496,8 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
for pid_file in Path(self.output_dir).glob('**/*.pid'):
pid_file.unlink(missing_ok=True)
# Update all STARTED ArchiveResults from filesystem
results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
# Update all background ArchiveResults from filesystem (in case output arrived late)
results = self.archiveresult_set.filter(hook_name__contains='.bg.')
for ar in results:
ar.update_from_output()
@@ -1914,153 +1943,6 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
# Output Path Methods (migrated from Link schema)
# =========================================================================
def canonical_outputs(self) -> Dict[str, Optional[str]]:
    """
    Build a mapping of ``<name>_path`` keys to the best output for each plugin.

    Sources, in order:
      1. fixed entries (index.html, Google favicon URL, web.archive.org URL)
      2. every succeeded ArchiveResult: first output_files entry that exists,
         else output_str if it exists under the snapshot dir, else the
         highest-scoring previewable file found in the plugin's directory
      3. previewable top-level files in the snapshot dir (legacy outputs),
         only for keys not already set
      4. for static snapshots, ``title`` and ``wget_path`` overrides
    """
    favicon_url_tpl = 'https://www.google.com/s2/favicons?domain={}'

    # Extensions that can be embedded/previewed in an iframe
    previewable_exts = {
        'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl',
        'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
        'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
    }
    thumbnail_image_exts = ('png', 'jpg', 'jpeg', 'webp', 'gif')
    size_floor = 15_000   # 15KB - filter out tiny files
    scan_limit = 50       # don't scan massive directories

    def score(name_lower: str, ext: str, plugin_name: str, in_media_dir: bool) -> int:
        """Rank a candidate file; higher wins."""
        if in_media_dir:
            # media dirs prefer thumbnails, then any image, then the media itself
            if any(word in name_lower for word in ('thumb', 'thumbnail', 'cover', 'poster')):
                return 200
            if ext in thumbnail_image_exts:
                return 150
            if ext in ('mp4', 'webm', 'mp3', 'opus', 'ogg'):
                return 100
            return 50
        if 'index' in name_lower:
            return 100
        if name_lower.startswith(('output', 'content', plugin_name)):
            return 50
        if ext in ('html', 'htm', 'pdf'):
            return 30
        if ext in ('png', 'jpg', 'jpeg', 'webp'):
            return 20
        return 10

    def pick_representative_file(dir_path: Path, plugin_name: str) -> Optional[str]:
        """Return the snapshot-relative path of the best file in a plugin dir, or None."""
        if not dir_path.exists() or not dir_path.is_dir():
            return None

        in_media_dir = plugin_name == 'media'
        ranked = []
        for visited, candidate in enumerate(dir_path.rglob('*'), start=1):
            if visited > scan_limit:
                break
            if candidate.is_dir() or candidate.name.startswith('.'):
                continue

            ext = candidate.suffix.lstrip('.').lower()
            if ext not in previewable_exts:
                continue

            try:
                size = candidate.stat().st_size
            except OSError:
                continue

            # thumbnails in media dirs are often < 15KB, so allow smaller images there
            if in_media_dir and ext in thumbnail_image_exts:
                required_size = 5_000
            else:
                required_size = size_floor
            if size < required_size:
                continue

            ranked.append((score(candidate.name.lower(), ext, plugin_name, in_media_dir), size, candidate))

        if not ranked:
            return None

        # highest priority first, larger file breaks ties (first seen wins on exact ties)
        winner = max(ranked, key=lambda item: (item[0], item[1]))[2]
        return str(winner.relative_to(Path(self.output_dir)))

    canonical = {
        'index_path': 'index.html',
        'google_favicon_path': favicon_url_tpl.format(self.domain),
        'archivedotorg_path': f'https://web.archive.org/web/{self.base_url}',
    }

    # One entry per succeeded ArchiveResult that recorded some output
    snap_dir = Path(self.output_dir)
    for result in self.archiveresult_set.filter(status='succeeded'):
        if not (result.output_files or result.output_str):
            continue

        plugin_dir = snap_dir / result.plugin
        chosen = None

        # output_files (new field) takes precedence
        if result.output_files:
            first_file = next(iter(result.output_files.keys()), None)
            if first_file and (plugin_dir / first_file).exists():
                chosen = f'{result.plugin}/{first_file}'

        # then output_str, if it points at something that exists
        if not chosen and result.output_str and (snap_dir / result.output_str).exists():
            chosen = result.output_str

        # finally scan the plugin's directory
        if not chosen and plugin_dir.exists():
            chosen = pick_representative_file(plugin_dir, result.plugin)

        if chosen:
            canonical[f'{result.plugin}_path'] = chosen

    # Top-level files left by legacy layouts (backwards compatibility)
    for entry in snap_dir.glob('*'):
        if entry.is_dir() or entry.name in ('index.html', 'index.json'):
            continue
        if entry.suffix.lstrip('.').lower() not in previewable_exts:
            continue
        try:
            size = entry.stat().st_size
        except OSError:
            continue
        if size >= size_floor:
            # generic output keyed by file stem; never overrides an existing key
            canonical.setdefault(f'{entry.stem}_path', entry.name)

    if self.is_static:
        static_path = f'warc/{self.timestamp}'
        canonical.update({
            'title': self.basename,
            'wget_path': static_path,
        })

    return canonical
def latest_outputs(self, status: Optional[str] = None) -> Dict[str, Any]:
"""Get the latest output that each plugin produced"""
from archivebox.hooks import get_plugins
@@ -2078,6 +1960,96 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
latest[plugin] = result.embed_path() if result else None
return latest
def discover_outputs(self) -> list[dict]:
    """
    Discover output files from ArchiveResults and the snapshot directory.

    Returns a list of dicts with the keys ``name``, ``path``, ``ts``, ``size``,
    ``is_metadata``, ``is_compact`` and ``result``:
      - first, one entry per ArchiveResult (ordered by start_ts) whose
        embed_path() exists on disk and, if it is a directory, contains files
      - then, one entry per remaining top-level directory (best file chosen by
        ArchiveResult._find_best_output_file) or embeddable top-level file whose
        name/stem was not already covered; ``result`` is None for these

    Entries that cannot be stat'ed (e.g. removed while scanning) are skipped,
    and a missing snapshot directory simply contributes no filesystem entries.
    """
    from archivebox.misc.util import ts_to_date_str

    ArchiveResult = self.archiveresult_set.model
    snap_dir = Path(self.output_dir)
    outputs: list[dict] = []
    seen: set[str] = set()

    text_exts = ('.json', '.jsonl', '.txt', '.csv', '.tsv', '.xml', '.yml', '.yaml', '.md', '.log')
    embeddable_exts = {
        'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl', 'csv', 'tsv',
        'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
        'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
    }

    def is_text_path(path: str | None) -> bool:
        return (path or '').lower().endswith(text_exts)

    def make_entry(name: str, path: str, ts, size: int, result) -> dict:
        # is_metadata and is_compact are currently driven by the same text-extension check
        is_text = is_text_path(path)
        return {
            'name': name,
            'path': path,
            'ts': ts,
            'size': size or 0,
            'is_metadata': is_text,
            'is_compact': is_text,
            'result': result,
        }

    # 1. Outputs recorded by ArchiveResults
    for result in self.archiveresult_set.all().order_by('start_ts'):
        embed_path = result.embed_path()
        if not embed_path or embed_path.strip() in ('.', '/', './'):
            continue

        abs_path = snap_dir / embed_path
        try:
            if abs_path.is_dir():
                # walk the tree once: an empty list means "no files", otherwise sum the sizes
                file_sizes = [p.stat().st_size for p in abs_path.rglob('*') if p.is_file()]
                if not file_sizes:
                    continue
                size = sum(file_sizes)
            elif abs_path.exists():
                size = abs_path.stat().st_size
            else:
                continue
        except OSError:
            # file vanished or is unreadable; skip it instead of failing the whole listing
            continue

        outputs.append(make_entry(result.plugin, embed_path, ts_to_date_str(result.end_ts), size, result))
        seen.add(result.plugin)

    # 2. Anything else sitting in the snapshot directory
    # iterdir() raises if the directory does not exist (e.g. nothing archived yet)
    entries = snap_dir.iterdir() if snap_dir.is_dir() else ()
    for entry in entries:
        if entry.name in ('index.html', 'index.json', 'favicon.ico', 'warc'):
            continue

        try:
            if entry.is_dir():
                plugin = entry.name
                if plugin in seen:
                    continue
                best_file = ArchiveResult._find_best_output_file(entry, plugin)
                if not best_file:
                    continue
                rel_path = str(best_file.relative_to(snap_dir))
                file_stat = best_file.stat()
            elif entry.is_file():
                if entry.suffix.lstrip('.').lower() not in embeddable_exts:
                    continue
                plugin = entry.stem
                if plugin in seen:
                    continue
                rel_path = entry.name
                file_stat = entry.stat()
            else:
                continue
        except OSError:
            continue

        outputs.append(make_entry(
            plugin,
            rel_path,
            ts_to_date_str(file_stat.st_mtime or 0),
            file_stat.st_size,
            None,
        ))
        seen.add(plugin)

    return outputs
# =========================================================================
# Serialization Methods
# =========================================================================
@@ -2114,8 +2086,6 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
'num_outputs': self.num_outputs,
'num_failures': self.num_failures,
}
if extended:
result['canonical'] = self.canonical_outputs()
return result
def to_json_str(self, indent: int = 4) -> str:
@@ -2146,23 +2116,29 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
SAVE_ARCHIVE_DOT_ORG = config.get('SAVE_ARCHIVE_DOT_ORG', True)
TITLE_LOADING_MSG = 'Not yet archived...'
canonical = self.canonical_outputs()
preview_priority = [
'singlefile_path',
'screenshot_path',
'wget_path',
'dom_path',
'pdf_path',
'readability_path',
'singlefile',
'screenshot',
'wget',
'dom',
'pdf',
'readability',
]
best_preview_path = next(
(canonical.get(key) for key in preview_priority if canonical.get(key)),
canonical.get('index_path', 'index.html'),
)
outputs = self.discover_outputs()
outputs_by_plugin = {out['name']: out for out in outputs}
best_preview_path = 'about:blank'
for plugin in preview_priority:
out = outputs_by_plugin.get(plugin)
if out and out.get('path'):
best_preview_path = out['path']
break
if best_preview_path == 'about:blank' and outputs:
best_preview_path = outputs[0].get('path') or 'about:blank'
context = {
**self.to_dict(extended=True),
**{f'{k}_path': v for k, v in canonical.items()},
'canonical': {f'{k}_path': v for k, v in canonical.items()},
'title': htmlencode(self.title or (self.base_url if self.is_archived else TITLE_LOADING_MSG)),
'url_str': htmlencode(urldecode(self.base_url)),
'archive_url': urlencode(f'warc/{self.timestamp}' or (self.domain if self.is_archived else '')) or 'about:blank',
@@ -2175,6 +2151,7 @@ class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHea
'SAVE_ARCHIVE_DOT_ORG': SAVE_ARCHIVE_DOT_ORG,
'PREVIEW_ORIGINALS': SERVER_CONFIG.PREVIEW_ORIGINALS,
'best_preview_path': best_preview_path,
'archiveresults': outputs,
}
rendered_html = render_to_string('snapshot.html', context)
atomic_write(str(Path(out_dir) / CONSTANTS.HTML_INDEX_FILENAME), rendered_html)
@@ -2496,6 +2473,61 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
def output_exists(self) -> bool:
return os.path.exists(Path(self.snapshot_dir) / self.plugin)
@staticmethod
def _find_best_output_file(dir_path: Path, plugin_name: str | None = None) -> Optional[Path]:
if not dir_path.exists() or not dir_path.is_dir():
return None
embeddable_exts = {
'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl', 'csv', 'tsv',
'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
}
for name in ('index.html', 'index.htm'):
candidate = dir_path / name
if candidate.exists() and candidate.is_file():
return candidate
candidates = []
file_count = 0
max_scan = 200
plugin_lower = (plugin_name or '').lower()
for file_path in dir_path.rglob('*'):
file_count += 1
if file_count > max_scan:
break
if file_path.is_dir() or file_path.name.startswith('.'):
continue
ext = file_path.suffix.lstrip('.').lower()
if ext not in embeddable_exts:
continue
try:
size = file_path.stat().st_size
except OSError:
continue
name_lower = file_path.name.lower()
priority = 0
if name_lower.startswith('index'):
priority = 100
elif plugin_lower and name_lower.startswith(('output', 'content', plugin_lower)):
priority = 60
elif ext in ('html', 'htm', 'pdf'):
priority = 40
elif ext in ('png', 'jpg', 'jpeg', 'webp', 'svg', 'gif', 'ico'):
priority = 30
elif ext in ('json', 'jsonl', 'txt', 'md', 'csv', 'tsv'):
priority = 20
else:
priority = 10
candidates.append((priority, size, file_path))
if not candidates:
return None
candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
return candidates[0][2]
def embed_path(self) -> Optional[str]:
"""
Get the relative path to the embeddable output file for this result.
@@ -2503,25 +2535,45 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
Returns the first file from output_files if set, otherwise tries to
find a reasonable default based on the plugin type.
"""
# Check output_files dict for primary output
snapshot_dir = Path(self.snapshot_dir)
plugin_dir = snapshot_dir / self.plugin
# Fallback: treat output_str as a file path only if it exists on disk
if self.output_str:
try:
output_path = Path(self.output_str)
if output_path.is_absolute():
# If absolute and within snapshot dir, normalize to relative
if snapshot_dir in output_path.parents and output_path.exists():
return str(output_path.relative_to(snapshot_dir))
else:
# If relative, prefer plugin-prefixed path, then direct path
if (plugin_dir / output_path).exists():
return f'{self.plugin}/{output_path}'
if output_path.name in ('index.html', 'index.json') and output_path.parent == Path('.'):
return None
if (snapshot_dir / output_path).exists():
return str(output_path)
except Exception:
pass
# Check output_files dict for primary output (ignore non-output files)
if self.output_files:
# Return first file from output_files (dict preserves insertion order)
first_file = next(iter(self.output_files.keys()), None)
if first_file:
ignored = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid', 'cmd.sh'}
output_candidates = [
f for f in self.output_files.keys()
if Path(f).name not in ignored
]
first_file = output_candidates[0] if output_candidates else None
if first_file and (plugin_dir / first_file).exists():
return f'{self.plugin}/{first_file}'
# Fallback: check output_str if it looks like a file path
if self.output_str and ('/' in self.output_str or '.' in self.output_str):
return self.output_str
best_file = self._find_best_output_file(plugin_dir, self.plugin)
if best_file:
return str(best_file.relative_to(snapshot_dir))
# Try to find output file based on plugin's canonical output path
canonical = self.snapshot.canonical_outputs()
plugin_key = f'{self.plugin}_path'
if plugin_key in canonical:
return canonical[plugin_key]
# Fallback to plugin directory
return f'{self.plugin}/'
return None
def create_output_dir(self):
output_dir = Path(self.snapshot_dir) / self.plugin
@@ -2779,7 +2831,7 @@ class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWi
self.output_str = 'Hook did not output ArchiveResult record'
# Walk filesystem and populate output_files, output_size, output_mimetypes
exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid'}
exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid', 'cmd.sh'}
mime_sizes = defaultdict(int)
total_size = 0
output_files = {}

View File

@@ -48,6 +48,19 @@ class CustomOutboundWebhookLogFormatter(logging.Formatter):
result = super().format(record)
return result.replace('HTTP Request: ', 'OutboundWebhook: ')
class StripANSIColorCodesFilter(logging.Filter):
    """
    Logging filter that strips ANSI color (SGR) sequences from log messages.

    Removes both proper escape sequences (``ESC[...m``) and "bare" remnants
    where the ESC byte has already been lost (``[...m``). When anything was
    removed, the record's ``msg`` is replaced with the cleaned, already
    formatted text and ``args`` is cleared so it is not formatted twice.
    Never drops a record: ``filter`` always returns True.
    """

    _ansi_re = re.compile(r'\x1b\[[0-9;]*m')
    _bare_re = re.compile(r'\[[0-9;]*m')

    def filter(self, record) -> bool:
        msg = record.getMessage()
        # Both patterns require a "[", so this is a cheap pre-check that cannot
        # miss bare codes like "[31m" (a "[m" substring test would).
        if isinstance(msg, str) and '[' in msg:
            cleaned = self._bare_re.sub('', self._ansi_re.sub('', msg))
            if cleaned != msg:
                record.msg = cleaned
                record.args = ()
        return True
ERROR_LOG = tempfile.NamedTemporaryFile().name
@@ -87,6 +100,9 @@ SETTINGS_LOGGING = {
"noisyrequestsfilter": {
"()": NoisyRequestsFilter,
},
"stripansi": {
"()": StripANSIColorCodesFilter,
},
"require_debug_false": {
"()": "django.utils.log.RequireDebugFalse",
},
@@ -101,7 +117,7 @@ SETTINGS_LOGGING = {
"level": "DEBUG",
"markup": False,
"rich_tracebacks": False, # Use standard Python tracebacks (no frame/box)
"filters": ["noisyrequestsfilter"],
"filters": ["noisyrequestsfilter", "stripansi"],
},
"logfile": {
"level": "INFO",
@@ -110,7 +126,7 @@ SETTINGS_LOGGING = {
"maxBytes": 1024 * 1024 * 25, # 25 MB
"backupCount": 10,
"formatter": "rich",
"filters": ["noisyrequestsfilter"],
"filters": ["noisyrequestsfilter", "stripansi"],
},
"outbound_webhooks": {
"class": "rich.logging.RichHandler",

View File

@@ -1,8 +1,10 @@
from django import template
from django.contrib.admin.templatetags.base import InclusionAdminNode
from django.utils.safestring import mark_safe
from django.utils.html import escape
from typing import Union
from pathlib import Path
from archivebox.hooks import (
get_plugin_icon, get_plugin_template, get_plugin_name,
@@ -57,15 +59,18 @@ def plugin_icon(plugin: str) -> str:
Usage: {% plugin_icon "screenshot" %}
"""
return mark_safe(get_plugin_icon(plugin))
icon_html = get_plugin_icon(plugin)
return mark_safe(
f'<span class="abx-plugin-icon" style="display:inline-flex; width:20px; height:20px; align-items:center; justify-content:center;">{icon_html}</span>'
)
@register.simple_tag(takes_context=True)
def plugin_thumbnail(context, result) -> str:
def plugin_card(context, result) -> str:
"""
Render the thumbnail template for an archive result.
Render the card template for an archive result.
Usage: {% plugin_thumbnail result %}
Usage: {% plugin_card result %}
Context variables passed to template:
- result: ArchiveResult object
@@ -74,46 +79,97 @@ def plugin_thumbnail(context, result) -> str:
- plugin: Plugin base name
"""
plugin = get_plugin_name(result.plugin)
template_str = get_plugin_template(plugin, 'thumbnail')
template_str = get_plugin_template(plugin, 'card')
if not template_str:
return ''
# Use embed_path() for the display path
output_path = result.embed_path() if hasattr(result, 'embed_path') else ''
# Use embed_path() for the display path (includes canonical paths)
output_path = result.embed_path() if hasattr(result, 'embed_path') else (result.output_str or '')
icon_html = get_plugin_icon(plugin)
output_lower = (output_path or '').lower()
text_preview_exts = ('.json', '.jsonl', '.txt', '.csv', '.tsv', '.xml', '.yml', '.yaml', '.md', '.log')
force_text_preview = output_lower.endswith(text_preview_exts)
# Create a mini template and render it with context
try:
tpl = template.Template(template_str)
ctx = template.Context({
'result': result,
'snapshot': result.snapshot,
'output_path': output_path,
'plugin': plugin,
})
rendered = tpl.render(ctx)
# Only return non-empty content (strip whitespace to check)
if rendered.strip():
return mark_safe(rendered)
return ''
if template_str and output_path and str(output_path).strip() not in ('.', '/', './') and not force_text_preview:
tpl = template.Template(template_str)
ctx = template.Context({
'result': result,
'snapshot': result.snapshot,
'output_path': output_path,
'plugin': plugin,
'plugin_icon': icon_html,
})
rendered = tpl.render(ctx)
# Only return non-empty content (strip whitespace to check)
if rendered.strip():
return mark_safe(rendered)
except Exception:
return ''
pass
if force_text_preview and output_path and str(output_path).strip() not in ('.', '/', './'):
output_file = Path(output_path)
if not output_file.is_absolute():
output_file = Path(result.snapshot_dir) / output_path
try:
output_file = output_file.resolve()
snap_dir = Path(result.snapshot_dir).resolve()
if snap_dir not in output_file.parents and output_file != snap_dir:
output_file = None
except Exception:
output_file = None
if output_file and output_file.exists() and output_file.is_file():
try:
with output_file.open('rb') as f:
raw = f.read(4096)
text = raw.decode('utf-8', errors='replace').strip()
if text:
lines = text.splitlines()[:6]
snippet = '\n'.join(lines)
escaped = escape(snippet)
preview = (
f'<div class="thumbnail-text" data-plugin="{plugin}" data-compact="1">'
f'<div class="thumbnail-text-header">'
f'<span class="thumbnail-compact-icon">{icon_html}</span>'
f'<span class="thumbnail-text-title">{plugin}</span>'
f'</div>'
f'<pre class="thumbnail-text-pre">{escaped}</pre>'
f'</div>'
)
return mark_safe(preview)
except Exception:
pass
if output_lower.endswith(text_preview_exts):
fallback_label = 'text'
else:
fallback_label = 'output'
fallback = (
f'<div class="thumbnail-compact" data-plugin="{plugin}" data-compact="1">'
f'<span class="thumbnail-compact-icon">{icon_html}</span>'
f'<span class="thumbnail-compact-label">{plugin}</span>'
f'<span class="thumbnail-compact-meta">{fallback_label}</span>'
f'</div>'
)
return mark_safe(fallback)
@register.simple_tag(takes_context=True)
def plugin_embed(context, result) -> str:
def plugin_full(context, result) -> str:
"""
Render the embed iframe template for an archive result.
Render the full template for an archive result.
Usage: {% plugin_embed result %}
Usage: {% plugin_full result %}
"""
plugin = get_plugin_name(result.plugin)
template_str = get_plugin_template(plugin, 'embed')
template_str = get_plugin_template(plugin, 'full')
if not template_str:
return ''
output_path = result.embed_path() if hasattr(result, 'embed_path') else (result.output_str or '')
output_path = result.embed_path() if hasattr(result, 'embed_path') else ''
try:
tpl = template.Template(template_str)
@@ -132,36 +188,6 @@ def plugin_embed(context, result) -> str:
return ''
@register.simple_tag(takes_context=True)
def plugin_fullscreen(context, result) -> str:
    """
    Render the plugin's 'fullscreen' template for an archive result.

    Usage: {% plugin_fullscreen result %}

    The template is rendered with ``result``, ``snapshot``, ``output_path`` and
    ``plugin`` in its context. Returns '' when the plugin has no fullscreen
    template, when rendering raises, or when the rendered output is only whitespace.
    """
    plugin = get_plugin_name(result.plugin)
    template_str = get_plugin_template(plugin, 'fullscreen')
    if not template_str:
        return ''

    if hasattr(result, 'embed_path'):
        output_path = result.embed_path()
    else:
        output_path = result.output_str or ''

    try:
        rendered = template.Template(template_str).render(template.Context({
            'result': result,
            'snapshot': result.snapshot,
            'output_path': output_path,
            'plugin': plugin,
        }))
        # Only return non-empty content (strip whitespace to check)
        return mark_safe(rendered) if rendered.strip() else ''
    except Exception:
        return ''
@register.filter

View File

@@ -8,7 +8,7 @@ from django.views.generic.base import RedirectView
from archivebox.misc.serve_static import serve_static
from archivebox.core.admin_site import archivebox_admin
from archivebox.core.views import HomepageView, SnapshotView, PublicIndexView, AddView, HealthCheckView, live_progress_view
from archivebox.core.views import HomepageView, SnapshotView, SnapshotPathView, PublicIndexView, AddView, HealthCheckView, live_progress_view
from archivebox.workers.views import JobsDashboardView
@@ -32,6 +32,8 @@ urlpatterns = [
path('archive/', RedirectView.as_view(url='/')),
path('archive/<path:path>', SnapshotView.as_view(), name='Snapshot'),
re_path(r'^(?P<username>[^/]+)/(?P<date>\d{4}(?:\d{2})?(?:\d{2})?)/(?P<url>https?://.*)$', SnapshotPathView.as_view(), name='snapshot-path-url'),
re_path(r'^(?P<username>[^/]+)/(?P<date>\d{4}(?:\d{2})?(?:\d{2})?)/(?P<domain>[^/]+)(?:/(?P<snapshot_id>[0-9a-fA-F-]{8,36})(?:/(?P<path>.*))?)?$', SnapshotPathView.as_view(), name='snapshot-path'),
path('admin/core/snapshot/add/', RedirectView.as_view(url='/add/')),
path('add/', AddView.as_view(), name='add'),

View File

@@ -1,7 +1,6 @@
__package__ = 'archivebox.core'
import os
import sys
from django.utils import timezone
import inspect
from typing import Callable, get_type_hints
@@ -26,7 +25,7 @@ import archivebox
from archivebox.config import CONSTANTS, CONSTANTS_CONFIG, DATA_DIR, VERSION
from archivebox.config.common import SHELL_CONFIG, SERVER_CONFIG, ARCHIVING_CONFIG
from archivebox.config.configset import get_flat_config, get_config, get_all_configs
from archivebox.misc.util import base_url, htmlencode, ts_to_date_str
from archivebox.misc.util import base_url, htmlencode, ts_to_date_str, urldecode
from archivebox.misc.serve_static import serve_static_with_byterange_support
from archivebox.misc.logging_util import printable_filesize
from archivebox.search import query_search_index
@@ -52,70 +51,44 @@ class HomepageView(View):
class SnapshotView(View):
# render static html index from filesystem archive/<timestamp>/index.html
@staticmethod
def find_snapshots_for_url(path: str):
    """
    Return a queryset of snapshots matching a URL-ish path.

    Lookups are tried from most to least specific and the first non-empty
    queryset is returned:
      1. (only if path has an http/https scheme) exact url, or id containing path
      2. exact url under either scheme, or id containing the scheme-less path
      3. exact url equal to base_url(path) under either scheme
      4. url starting with base_url(path) under either scheme (returned even if empty)
    """
    def either_scheme(lookup: str, rest: str):
        # Q(<lookup>='http://...') | Q(<lookup>='https://...')
        return Q(**{lookup: 'http://' + rest}) | Q(**{lookup: 'https://' + rest})

    schemeless = path
    if path.startswith(('http://', 'https://')):
        # exact match on full url / ID first
        exact = Snapshot.objects.filter(Q(url=path) | Q(id__icontains=path))
        if exact.exists():
            return exact
        schemeless = path.partition('://')[2]

    # exact match on full url / ID (without scheme)
    by_url_or_id = Snapshot.objects.filter(
        either_scheme('url', schemeless) | Q(id__icontains=schemeless)
    )
    if by_url_or_id.exists():
        return by_url_or_id

    # exact match on base_url
    base = base_url(schemeless)
    by_base = Snapshot.objects.filter(either_scheme('url', base))
    if by_base.exists():
        return by_base

    # last resort: base_url as a prefix
    return Snapshot.objects.filter(either_scheme('url__startswith', base))
@staticmethod
def render_live_index(request, snapshot):
TITLE_LOADING_MSG = 'Not yet archived...'
# Dict of plugin -> ArchiveResult object
archiveresult_objects = {}
# Dict of plugin -> result info dict (for template compatibility)
archiveresults = {}
results = snapshot.archiveresult_set.all()
for result in results:
embed_path = result.embed_path()
abs_path = result.snapshot_dir / (embed_path or 'None')
if (result.status == 'succeeded'
and embed_path
and os.access(abs_path, os.R_OK)
and abs_path.exists()):
if os.path.isdir(abs_path) and not any(abs_path.glob('*.*')):
continue
# Store the full ArchiveResult object for template tags
archiveresult_objects[result.plugin] = result
result_info = {
'name': result.plugin,
'path': embed_path,
'ts': ts_to_date_str(result.end_ts),
'size': abs_path.stat().st_size or '?',
'result': result, # Include the full object for template tags
}
archiveresults[result.plugin] = result_info
# Use canonical_outputs for intelligent discovery
# This method now scans ArchiveResults and uses smart heuristics
canonical = snapshot.canonical_outputs()
# Add any newly discovered outputs from canonical_outputs to archiveresults
outputs = snapshot.discover_outputs()
archiveresults = {out['name']: out for out in outputs}
snap_dir = Path(snapshot.output_dir)
for key, path in canonical.items():
if not key.endswith('_path') or not path or path.startswith('http'):
continue
plugin_name = key.replace('_path', '')
if plugin_name in archiveresults:
continue # Already have this from ArchiveResult
file_path = snap_dir / path
if not file_path.exists() or not file_path.is_file():
continue
try:
file_size = file_path.stat().st_size
if file_size >= 15_000: # Only show files > 15KB
archiveresults[plugin_name] = {
'name': plugin_name,
'path': path,
'ts': ts_to_date_str(file_path.stat().st_mtime or 0),
'size': file_size,
'result': None,
}
except OSError:
continue
# Get available extractor plugins from hooks (sorted by numeric prefix for ordering)
# Convert to base names for display ordering
@@ -131,7 +104,7 @@ class SnapshotView(View):
preferred_types = tuple(preview_priority + [p for p in all_plugins if p not in preview_priority])
all_types = preferred_types + tuple(result_type for result_type in archiveresults.keys() if result_type not in preferred_types)
best_result = {'path': 'None', 'result': None}
best_result = {'path': 'about:blank', 'result': None}
for result_type in preferred_types:
if result_type in archiveresults:
best_result = archiveresults[result_type]
@@ -146,7 +119,6 @@ class SnapshotView(View):
context = {
**snapshot_info,
**snapshot_info.get('canonical', {}),
'title': htmlencode(
snapshot.title
or (snapshot.base_url if snapshot.is_archived else TITLE_LOADING_MSG)
@@ -188,6 +160,14 @@ class SnapshotView(View):
try:
try:
snapshot = Snapshot.objects.get(Q(timestamp=slug) | Q(id__startswith=slug))
canonical_base = snapshot.url_path
if canonical_base != snapshot.legacy_archive_path:
target_path = f'/{canonical_base}/{archivefile or "index.html"}'
query = request.META.get('QUERY_STRING')
if query:
target_path = f'{target_path}?{query}'
return redirect(target_path)
if archivefile == 'index.html':
# if they requested snapshot index, serve live rendered template instead of static html
response = self.render_live_index(request, snapshot)
@@ -221,9 +201,9 @@ class SnapshotView(View):
except Snapshot.MultipleObjectsReturned:
snapshot_hrefs = mark_safe('<br/>').join(
format_html(
'{} <a href="/archive/{}/index.html"><b><code>{}</code></b></a> {} <b>{}</b>',
'{} <a href="/{}/index.html"><b><code>{}</code></b></a> {} <b>{}</b>',
snap.bookmarked_at.strftime('%Y-%m-%d %H:%M:%S'),
snap.timestamp,
snap.archive_path,
snap.timestamp,
snap.url,
snap.title_stripped[:64] or '',
@@ -259,9 +239,9 @@ class SnapshotView(View):
#'</script>'
'</head><body>'
'<center><br/><br/><br/>'
f'Snapshot <a href="/archive/{snapshot.timestamp}/index.html" target="_top"><b><code>[{snapshot.timestamp}]</code></b></a>: <a href="{snapshot.url}" target="_blank" rel="noreferrer">{snapshot.url}</a><br/>'
f'Snapshot <a href="/{snapshot.archive_path}/index.html" target="_top"><b><code>[{snapshot.timestamp}]</code></b></a>: <a href="{snapshot.url}" target="_blank" rel="noreferrer">{snapshot.url}</a><br/>'
f'was queued on {str(snapshot.bookmarked_at).split(".")[0]}, '
f'but no files have been saved yet in:<br/><b><a href="/archive/{snapshot.timestamp}/" target="_top"><code>{snapshot.timestamp}</code></a><code>/'
f'but no files have been saved yet in:<br/><b><a href="/{snapshot.archive_path}/" target="_top"><code>{snapshot.timestamp}</code></a><code>/'
'{}'
f'</code></b><br/><br/>'
'It\'s possible {} '
@@ -270,8 +250,8 @@ class SnapshotView(View):
f'<code style="user-select: all; color: #333">archivebox update -t timestamp {snapshot.timestamp}</code></pre><br/><br/>'
'<div class="text-align: left; width: 100%; max-width: 400px">'
'<i><b>Next steps:</i></b><br/>'
f'- list all the <a href="/archive/{snapshot.timestamp}/" target="_top">Snapshot files <code>.*</code></a><br/>'
f'- view the <a href="/archive/{snapshot.timestamp}/index.html" target="_top">Snapshot <code>./index.html</code></a><br/>'
f'- list all the <a href="/{snapshot.archive_path}/" target="_top">Snapshot files <code>.*</code></a><br/>'
f'- view the <a href="/{snapshot.archive_path}/index.html" target="_top">Snapshot <code>./index.html</code></a><br/>'
f'- go to the <a href="/admin/core/snapshot/{snapshot.pk}/change/" target="_top">Snapshot admin</a> to edit<br/>'
f'- go to the <a href="/admin/core/snapshot/?id__exact={snapshot.id}" target="_top">Snapshot actions</a> to re-archive<br/>'
'- or return to <a href="/" target="_top">the main index...</a></div>'
@@ -288,22 +268,9 @@ class SnapshotView(View):
# slug is a URL
try:
try:
# try exact match on full url / ID first
snapshot = Snapshot.objects.get(
Q(url='http://' + path) | Q(url='https://' + path) | Q(id__icontains=path)
)
snapshot = SnapshotView.find_snapshots_for_url(path).get()
except Snapshot.DoesNotExist:
# fall back to match on exact base_url
try:
snapshot = Snapshot.objects.get(
Q(url='http://' + base_url(path)) | Q(url='https://' + base_url(path))
)
except Snapshot.DoesNotExist:
# fall back to matching base_url as prefix
snapshot = Snapshot.objects.get(
Q(url__startswith='http://' + base_url(path)) | Q(url__startswith='https://' + base_url(path))
)
return redirect(f'/archive/{snapshot.timestamp}/index.html')
raise
except Snapshot.DoesNotExist:
return HttpResponse(
format_html(
@@ -322,20 +289,18 @@ class SnapshotView(View):
status=404,
)
except Snapshot.MultipleObjectsReturned:
snapshots = SnapshotView.find_snapshots_for_url(path)
snapshot_hrefs = mark_safe('<br/>').join(
format_html(
'{} <code style="font-size: 0.8em">{}</code> <a href="/archive/{}/index.html"><b><code>{}</code></b></a> {} <b>{}</b>',
'{} <code style="font-size: 0.8em">{}</code> <a href="/{}/index.html"><b><code>{}</code></b></a> {} <b>{}</b>',
snap.bookmarked_at.strftime('%Y-%m-%d %H:%M:%S'),
str(snap.id)[:8],
snap.timestamp,
snap.archive_path,
snap.timestamp,
snap.url,
snap.title_stripped[:64] or '',
)
for snap in Snapshot.objects.filter(
Q(url__startswith='http://' + base_url(path)) | Q(url__startswith='https://' + base_url(path))
| Q(id__icontains=path)
).only('url', 'timestamp', 'title', 'bookmarked_at').order_by('-bookmarked_at')
for snap in snapshots.only('url', 'timestamp', 'title', 'bookmarked_at').order_by('-bookmarked_at')
)
return HttpResponse(
format_html(
@@ -353,6 +318,108 @@ class SnapshotView(View):
status=404,
)
target_path = f'/{snapshot.archive_path}/index.html'
query = request.META.get('QUERY_STRING')
if query:
target_path = f'{target_path}?{query}'
return redirect(target_path)
class SnapshotPathView(View):
    """Serve snapshots by the new URL scheme: /<username>/<YYYYMMDD>/<domain>/<uuid>/..."""

    # Newest-first ordering used whenever several snapshots could match.
    _NEWEST_FIRST = ('-created_at', '-bookmarked_at', '-timestamp')

    @staticmethod
    def _with_query(request, target: str) -> str:
        """Return ``target`` with the request's raw query string (if any) appended."""
        query = request.META.get('QUERY_STRING')
        return f'{target}?{query}' if query else target

    @staticmethod
    def _find_by_id(snapshot_id: str):
        """Return the Snapshot with this exact pk, else the first one whose id starts with it, else None."""
        # Local import: this block cannot see the file's import section.
        from django.core.exceptions import ValidationError

        try:
            return Snapshot.objects.get(pk=snapshot_id)
        except (Snapshot.DoesNotExist, ValidationError, ValueError):
            # NOTE(review): the pk looks like a UUID (the route accepts 8-36
            # hex/dash chars) - confirm against the model. Django rejects a
            # partial UUID in an exact lookup with ValidationError, so that has
            # to fall through to the prefix lookup instead of becoming a 500.
            #
            # filter().first() covers all prefix outcomes at once:
            # none -> None, one -> it, several -> the first.
            return Snapshot.objects.filter(id__startswith=snapshot_id).first()

    @staticmethod
    def _filter_by_date(qs, date: str):
        """Narrow ``qs`` to the created_at year / month / day encoded in a YYYY, YYYYMM or YYYYMMDD string.

        Any other length, or non-numeric content, leaves ``qs`` unfiltered.
        """
        if len(date) not in (4, 6, 8):
            return qs
        try:
            lookups = {'created_at__year': int(date[:4])}
            if len(date) >= 6:
                lookups['created_at__month'] = int(date[4:6])
            if len(date) == 8:
                lookups['created_at__day'] = int(date[6:8])
            return qs.filter(**lookups)
        except ValueError:
            # best-effort: an unparseable date simply does not narrow the search
            return qs

    @classmethod
    def _find_latest(cls, username: str, date: str, domain: str | None, requested_url: str | None):
        """Fuzzy lookup by user + date + domain/url; returns the most recent match or None."""
        # the public "web" user in URLs is looked up as the "system" user
        username_lookup = 'system' if username == 'web' else username
        if requested_url:
            qs = SnapshotView.find_snapshots_for_url(requested_url)
        else:
            qs = Snapshot.objects
        qs = qs.filter(crawl__created_by__username=username_lookup)
        qs = cls._filter_by_date(qs, date).order_by(*cls._NEWEST_FIRST)

        if requested_url:
            return qs.first()

        # requested_url is unset here, so `domain` has no scheme of its own
        # (a scheme-prefixed domain is promoted to requested_url by the caller).
        requested_domain = Snapshot.extract_domain_from_url(f'https://{domain or ""}')

        # Prefer the newest snapshot with an exact domain match; if there is
        # none, fall back to the newest snapshot overall. A single streamed
        # pass stops at the first match.
        newest = None
        for candidate in qs.iterator():
            if newest is None:
                newest = candidate
            if Snapshot.extract_domain_from_url(candidate.url) == requested_domain:
                return candidate
        return newest

    def get(self, request, username: str, date: str, domain: str | None = None, snapshot_id: str | None = None, path: str = "", url: str | None = None):
        """Resolve the requested snapshot and serve its live index or one of its files.

        Flow:
          * anonymous users are sent to the admin login unless
            ``SERVER_CONFIG.PUBLIC_SNAPSHOTS`` is enabled
          * ``/system/...`` URLs are redirected to ``/web/...``
          * the snapshot is looked up by ``snapshot_id`` when given, otherwise
            by user + date + domain/url (most recent wins)
          * no snapshot found -> 404 HTML page
          * a requested path that differs from ``snapshot.url_path`` is
            redirected to the canonical one (query string preserved)
          * ``index.html`` (or no path) renders the live index; anything else
            is served statically from ``snapshot.output_dir``
        """
        if not request.user.is_authenticated and not SERVER_CONFIG.PUBLIC_SNAPSHOTS:
            from urllib.parse import quote

            # escape the return URL so '&', '#', '?' etc. in it cannot truncate
            # or pollute the ?next= parameter; keep the query string too
            return redirect(f'/admin/login/?next={quote(request.get_full_path())}')

        if username == 'system':
            return redirect(self._with_query(request, request.path.replace('/system/', '/web/', 1)))

        requested_url = url
        if not requested_url and domain and domain.startswith(('http://', 'https://')):
            # a full URL was passed in the <domain> slot
            requested_url = domain

        if snapshot_id:
            snapshot = self._find_by_id(snapshot_id)
        else:
            snapshot = self._find_latest(username, date, domain, requested_url)

        if not snapshot:
            return HttpResponse(
                format_html(
                    (
                        '<center><br/><br/><br/>'
                        'No Snapshots match the given id or url: <code>{}</code><br/><br/><br/>'
                        'Return to the <a href="/" target="_top">Main Index</a>'
                        '</center>'
                    ),
                    snapshot_id or requested_url or domain,
                ),
                content_type="text/html",
                status=404,
            )

        canonical_base = snapshot.url_path
        requested_base = f'{username}/{date}/{domain or url or ""}'
        if snapshot_id:
            requested_base = f'{requested_base}/{snapshot_id}'
        if canonical_base != requested_base:
            # always land on the canonical URL for this snapshot
            return redirect(self._with_query(request, f'/{canonical_base}/{path or "index.html"}'))

        archivefile = path or "index.html"
        if archivefile == "index.html":
            # serve the live-rendered template instead of a static index file
            return SnapshotView.render_live_index(request, snapshot)

        return serve_static_with_byterange_support(
            request, archivefile, document_root=snapshot.output_dir, show_indexes=True,
        )
class PublicIndexView(ListView):
template_name = 'public_index.html'
@@ -592,7 +659,7 @@ def live_progress_view(request):
'snapshot_id': str(ar.snapshot_id),
'snapshot_url': ar.snapshot.url[:60] if ar.snapshot else '',
'embed_path': embed,
'archive_path': f'/archive/{ar.snapshot.timestamp}/{embed}' if ar.snapshot else '',
'archive_path': f'/{ar.snapshot.archive_path}/{embed}' if ar.snapshot else '',
'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
})