__package__ = 'archivebox.core' import os from pathlib import Path from django.contrib import admin from django.utils.html import format_html, mark_safe from django.core.exceptions import ValidationError from django.urls import reverse, resolve from django.utils import timezone from archivebox.config import DATA_DIR from archivebox.config.common import SERVER_CONFIG from archivebox.misc.paginators import AccelleratedPaginator from archivebox.base_models.admin import BaseModelAdmin from archivebox.hooks import get_plugin_icon from archivebox.core.models import ArchiveResult, Snapshot def render_archiveresults_list(archiveresults_qs, limit=50): """Render a nice inline list view of archive results with status, plugin, output, and actions.""" results = list(archiveresults_qs.order_by('plugin').select_related('snapshot')[:limit]) if not results: return mark_safe('
No Archive Results yet...
') # Status colors status_colors = { 'succeeded': ('#166534', '#dcfce7'), # green 'failed': ('#991b1b', '#fee2e2'), # red 'queued': ('#6b7280', '#f3f4f6'), # gray 'started': ('#92400e', '#fef3c7'), # amber } rows = [] for idx, result in enumerate(results): status = result.status or 'queued' color, bg = status_colors.get(status, ('#6b7280', '#f3f4f6')) # Get plugin icon icon = get_plugin_icon(result.plugin) # Format timestamp end_time = result.end_ts.strftime('%Y-%m-%d %H:%M:%S') if result.end_ts else '-' # Truncate output for display full_output = result.output_str or '-' output_display = full_output[:60] if len(full_output) > 60: output_display += '...' # Get full command as tooltip cmd_str = ' '.join(result.cmd) if isinstance(result.cmd, list) else str(result.cmd or '-') # Build output link - use embed_path() which checks output_files first embed_path = result.embed_path() if hasattr(result, 'embed_path') else None output_link = f'/archive/{result.snapshot.timestamp}/{embed_path}' if embed_path and result.status == 'succeeded' else f'/archive/{result.snapshot.timestamp}/' # Get version - try cmd_version field version = result.cmd_version if result.cmd_version else '-' # Unique ID for this row's expandable output row_id = f'output_{idx}_{str(result.id)[:8]}' rows.append(f''' {str(result.id)[:8]} {status} {icon} {result.plugin} {output_display} {end_time} {version}
📄 ✏️
Details & Output
ID: {str(result.id)} Version: {version} PWD: {result.pwd or '-'}
Output:
{full_output}
Command:
{cmd_str}
''') total_count = archiveresults_qs.count() footer = '' if total_count > limit: footer = f''' Showing {limit} of {total_count} results   View all → ''' return mark_safe(f'''
{''.join(rows)} {footer}
ID Status Plugin Output Completed Version Actions
''') class ArchiveResultInline(admin.TabularInline): name = 'Archive Results Log' model = ArchiveResult parent_model = Snapshot # fk_name = 'snapshot' extra = 0 sort_fields = ('end_ts', 'plugin', 'output_str', 'status', 'cmd_version') readonly_fields = ('id', 'result_id', 'completed', 'command', 'version') fields = ('start_ts', 'end_ts', *readonly_fields, 'plugin', 'cmd', 'cmd_version', 'pwd', 'status', 'retry_at', 'output_str') # exclude = ('id',) ordering = ('end_ts',) show_change_link = True # # classes = ['collapse'] def get_parent_object_from_request(self, request): resolved = resolve(request.path_info) try: return self.parent_model.objects.get(pk=resolved.kwargs['object_id']) except (self.parent_model.DoesNotExist, ValidationError): return None @admin.display( description='Completed', ordering='end_ts', ) def completed(self, obj): return format_html('

{}

', obj.end_ts.strftime('%Y-%m-%d %H:%M:%S')) def result_id(self, obj): return format_html('[{}]', reverse('admin:core_archiveresult_change', args=(obj.id,)), str(obj.id)[:8]) def command(self, obj): return format_html('{}', " ".join(obj.cmd or [])) def version(self, obj): return format_html('{}', obj.cmd_version or '-') def get_formset(self, request, obj=None, **kwargs): formset = super().get_formset(request, obj, **kwargs) snapshot = self.get_parent_object_from_request(request) # import ipdb; ipdb.set_trace() # formset.form.base_fields['id'].widget = formset.form.base_fields['id'].hidden_widget() # default values for new entries formset.form.base_fields['status'].initial = 'succeeded' formset.form.base_fields['start_ts'].initial = timezone.now() formset.form.base_fields['end_ts'].initial = timezone.now() formset.form.base_fields['cmd_version'].initial = '-' formset.form.base_fields['pwd'].initial = str(snapshot.output_dir) formset.form.base_fields['cmd'].initial = '["-"]' formset.form.base_fields['output_str'].initial = 'Manually recorded cmd output...' if obj is not None: # hidden values for existing entries and new entries formset.form.base_fields['start_ts'].widget = formset.form.base_fields['start_ts'].hidden_widget() formset.form.base_fields['end_ts'].widget = formset.form.base_fields['end_ts'].hidden_widget() formset.form.base_fields['cmd'].widget = formset.form.base_fields['cmd'].hidden_widget() formset.form.base_fields['pwd'].widget = formset.form.base_fields['pwd'].hidden_widget() formset.form.base_fields['cmd_version'].widget = formset.form.base_fields['cmd_version'].hidden_widget() return formset def get_readonly_fields(self, request, obj=None): if obj is not None: return self.readonly_fields else: return [] class ArchiveResultAdmin(BaseModelAdmin): list_display = ('id', 'created_at', 'snapshot_info', 'tags_str', 'status', 'plugin_with_icon', 'cmd_str', 'output_str') sort_fields = ('id', 'created_at', 'plugin', 'status') readonly_fields = ('cmd_str', 'snapshot_info', 'tags_str', 'created_at', 'modified_at', 'output_summary', 'plugin_with_icon') search_fields = ('id', 'snapshot__url', 'plugin', 'output_str', 'cmd_version', 'cmd', 'snapshot__timestamp') autocomplete_fields = ['snapshot'] fieldsets = ( ('Snapshot', { 'fields': ('snapshot', 'snapshot_info', 'tags_str'), 'classes': ('card', 'wide'), }), ('Plugin', { 'fields': ('plugin', 'plugin_with_icon', 'status', 'retry_at'), 'classes': ('card',), }), ('Timing', { 'fields': ('start_ts', 'end_ts', 'created_at', 'modified_at'), 'classes': ('card',), }), ('Command', { 'fields': ('cmd', 'cmd_str', 'cmd_version', 'pwd'), 'classes': ('card',), }), ('Output', { 'fields': ('output_str', 'output_json', 'output_files', 'output_size', 'output_mimetypes', 'output_summary'), 'classes': ('card', 'wide'), }), ) list_filter = ('status', 'plugin', 'start_ts') ordering = ['-start_ts'] list_per_page = SERVER_CONFIG.SNAPSHOTS_PER_PAGE paginator = AccelleratedPaginator save_on_top = True actions = ['delete_selected'] class Meta: verbose_name = 'Archive Result' verbose_name_plural = 'Archive Results' def change_view(self, request, object_id, form_url="", extra_context=None): self.request = request return super().change_view(request, object_id, form_url, extra_context) @admin.display( description='Snapshot Info' ) def snapshot_info(self, result): return format_html( '[{}]   {}   {}
', result.snapshot.timestamp, str(result.snapshot.id)[:8], result.snapshot.bookmarked_at.strftime('%Y-%m-%d %H:%M'), result.snapshot.url[:128], ) @admin.display( description='Snapshot Tags' ) def tags_str(self, result): return result.snapshot.tags_str() @admin.display(description='Plugin', ordering='plugin') def plugin_with_icon(self, result): icon = get_plugin_icon(result.plugin) return format_html( '{} {}', result.plugin, icon, result.plugin, ) def cmd_str(self, result): return format_html( '
{}
', ' '.join(result.cmd) if isinstance(result.cmd, list) else str(result.cmd), ) def output_display(self, result): # Determine output link path - use embed_path() which checks output_files embed_path = result.embed_path() if hasattr(result, 'embed_path') else None output_path = embed_path if (result.status == 'succeeded' and embed_path) else 'index.html' return format_html( '↗️
{}
', result.snapshot.timestamp, output_path, result.output_str, ) def output_summary(self, result): snapshot_dir = Path(DATA_DIR) / str(result.pwd).split('data/', 1)[-1] output_html = format_html( '
{}

', result.output_str, ) output_html += format_html('See result files ...
', str(result.snapshot.timestamp))
        embed_path = result.embed_path() if hasattr(result, 'embed_path') else ''
        path_from_embed = (snapshot_dir / (embed_path or ''))
        output_html += format_html('{}/{}

', str(snapshot_dir), str(embed_path)) if os.access(path_from_embed, os.R_OK): root_dir = str(path_from_embed) else: root_dir = str(snapshot_dir) # print(root_dir, str(list(os.walk(root_dir)))) for root, dirs, files in os.walk(root_dir): depth = root.replace(root_dir, '').count(os.sep) + 1 if depth > 2: continue indent = ' ' * 4 * (depth) output_html += format_html('{}{}/
', indent, os.path.basename(root)) indentation_str = ' ' * 4 * (depth + 1) for filename in sorted(files): is_hidden = filename.startswith('.') output_html += format_html('{}{}
', int(not is_hidden), indentation_str, filename.strip()) return output_html + mark_safe('
') def register_admin(admin_site): admin_site.register(ArchiveResult, ArchiveResultAdmin)