Files
ArchiveBox/archivebox/api/v1_core.py
Claude 202e5b2e59 Add interactive tags editor widget for Django admin
Implement a sleek inline tag editor with autocomplete and AJAX support:

- Create TagEditorWidget and InlineTagEditorWidget in core/widgets.py
  - Pills display with X remove button, sorted alphabetically
  - Text input with HTML5 datalist autocomplete
  - Enter/Space/Comma to add tags; a tag is auto-created if it doesn't exist
  - Backspace removes last tag when input is empty

- Add API endpoints in api/v1_core.py
  - GET /tags/autocomplete/ - search tags by name
  - POST /tags/create/ - get_or_create tag
  - POST /tags/add-to-snapshot/ - add tag to snapshot via AJAX
  - POST /tags/remove-from-snapshot/ - remove tag from snapshot

- Update admin_snapshots.py
  - Replace FilteredSelectMultiple with TagEditorWidget in bulk actions
  - Create SnapshotAdminForm with tags_editor field
  - Update title_str() to render inline tag editor in list view
  - Remove TagInline, use widget instead

- Add CSS styles in templates/admin/base.html
  - Blue gradient pill styling matching admin theme
  - Focus ring and hover states
  - Compact inline variant for list view
2025-12-30 02:18:08 +00:00

460 lines
15 KiB
Python

__package__ = 'archivebox.api'
import math
from uuid import UUID
from typing import List, Optional, Union, Any
from datetime import datetime
from django.db.models import Q
from django.core.exceptions import ValidationError
from django.contrib.auth import get_user_model
from django.shortcuts import redirect
from ninja import Router, Schema, FilterSchema, Field, Query
from ninja.pagination import paginate, PaginationBase
from ninja.errors import HttpError
from archivebox.core.models import Snapshot, ArchiveResult, Tag
from archivebox.api.v1_crawls import CrawlSchema
# Router that every endpoint in this module registers on via @router.get / @router.post.
# 'Core Models' is the tag attached to those endpoints (presumably used to group
# them in the generated API docs -- confirm against the API root configuration).
router = Router(tags=['Core Models'])
class CustomPagination(PaginationBase):
    """Limit/offset/page pagination that reports totals alongside the items.

    Clients may pass either an explicit ``offset`` or a zero-based ``page``
    number; a non-zero ``offset`` takes precedence over ``page``.
    """

    class Input(Schema):
        # Query parameters accepted by every paginated endpoint.
        limit: int = 200
        offset: int = 0
        page: int = 0

    class Output(Schema):
        # Envelope returned around the page of results.
        total_items: int
        total_pages: int
        page: int
        limit: int
        offset: int
        num_items: int
        items: List[Any]

    def paginate_queryset(self, queryset, pagination: Input, **params):
        """Slice ``queryset`` according to ``pagination`` and build the envelope.

        Args:
            queryset: the queryset returned by the decorated view.
            pagination: the validated ``Input`` query parameters.
            **params: extra keyword arguments passed by the framework (unused).

        Returns:
            A dict with the keys declared on ``Output``.
        """
        # Clamp to 1..500: limit=0 used to raise ZeroDivisionError in the
        # total_pages computation, and a negative limit produced a negative
        # slice bound on the queryset.
        limit = max(1, min(pagination.limit, 500))
        # A negative offset/page would likewise yield a negative slice start.
        offset = max(0, pagination.offset or (pagination.page * limit))
        total = queryset.count()
        total_pages = math.ceil(total / limit)
        # Zero-based index of the page that contains ``offset``. The previous
        # ceil(offset / (limit + 1)) drifted by one once the page number
        # exceeded ``limit``.
        current_page = offset // limit
        items = queryset[offset : offset + limit]
        return {
            'total_items': total,
            'total_pages': total_pages,
            'page': current_page,
            'limit': limit,
            'offset': offset,
            'num_items': len(items),
            'items': items,
        }
### ArchiveResult #########################################################################
class MinimalArchiveResultSchema(Schema):
    # Serialized form of an ArchiveResult without any of its parent
    # snapshot's details; used when results are nested inside a snapshot.
    TYPE: str = 'core.models.ArchiveResult'

    id: UUID
    created_at: datetime | None
    modified_at: datetime | None
    created_by_id: str
    created_by_username: str

    status: str
    retry_at: datetime | None

    plugin: str
    hook_name: str
    process_id: UUID | None
    cmd_version: str | None
    cmd: list[str] | None
    pwd: str | None

    output_str: str
    output_json: dict | None
    output_files: dict | None
    output_size: int
    output_mimetypes: str

    start_ts: datetime | None
    end_ts: datetime | None

    @staticmethod
    def resolve_created_by_id(obj):
        # The schema exposes the creator's primary key as a string.
        creator = obj.created_by
        return str(creator.pk)

    @staticmethod
    def resolve_created_by_username(obj) -> str:
        creator = obj.created_by
        return creator.username
class ArchiveResultSchema(MinimalArchiveResultSchema):
    # Full ArchiveResult representation: the minimal fields plus a few
    # attributes copied from the parent snapshot.
    TYPE: str = 'core.models.ArchiveResult'

    snapshot_id: UUID
    snapshot_timestamp: str
    snapshot_url: str
    snapshot_tags: List[str]

    @staticmethod
    def resolve_snapshot_timestamp(obj):
        parent = obj.snapshot
        return parent.timestamp

    @staticmethod
    def resolve_snapshot_url(obj):
        parent = obj.snapshot
        return parent.url

    @staticmethod
    def resolve_snapshot_id(obj):
        # Read the raw foreign-key attribute directly.
        return obj.snapshot_id

    @staticmethod
    def resolve_snapshot_tags(obj):
        # Tag names of the parent snapshot, in sorted order.
        names = [tag.name for tag in obj.snapshot.tags.all()]
        names.sort()
        return names
class ArchiveResultFilterSchema(FilterSchema):
    # Query-string filters accepted by the ArchiveResult list endpoint.
    # Each field's ``q`` names the ORM lookup(s) the supplied value is applied to.
    id: Optional[str] = Field(
        None,
        q=[
            'id__startswith',
            'snapshot__id__startswith',
            'snapshot__timestamp__startswith',
        ],
    )
    search: Optional[str] = Field(
        None,
        q=[
            'snapshot__url__icontains',
            'snapshot__title__icontains',
            'snapshot__tags__name__icontains',
            'plugin',
            'output_str__icontains',
            'id__startswith',
            'snapshot__id__startswith',
            'snapshot__timestamp__startswith',
        ],
    )
    snapshot_id: Optional[str] = Field(
        None,
        q=[
            'snapshot__id__startswith',
            'snapshot__timestamp__startswith',
        ],
    )
    snapshot_url: Optional[str] = Field(None, q='snapshot__url__icontains')
    snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name__icontains')

    status: Optional[str] = Field(None, q='status')
    output_str: Optional[str] = Field(None, q='output_str__icontains')
    plugin: Optional[str] = Field(None, q='plugin__icontains')
    hook_name: Optional[str] = Field(None, q='hook_name__icontains')
    process_id: Optional[str] = Field(None, q='process__id__startswith')
    cmd: Optional[str] = Field(None, q='cmd__0__icontains')
    pwd: Optional[str] = Field(None, q='pwd__icontains')
    cmd_version: Optional[str] = Field(None, q='cmd_version')

    created_at: Optional[datetime] = Field(None, q='created_at')
    created_at__gte: Optional[datetime] = Field(None, q='created_at__gte')
    created_at__lt: Optional[datetime] = Field(None, q='created_at__lt')
# NOTE(review): this url_name is identical to the one on the single-object
# route ("/archiveresult/{archiveresult_id}") -- confirm whether
# "get_archiveresults" was intended before relying on URL reversing.
@router.get("/archiveresults", response=List[ArchiveResultSchema], url_name="get_archiveresult")
@paginate(CustomPagination)
def get_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
    """List all ArchiveResult entries matching these filters."""
    every_result = ArchiveResult.objects.all()
    matching = filters.filter(every_result)
    # distinct() is applied to the filtered queryset before it is paginated.
    return matching.distinct()
@router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema, url_name="get_archiveresult")
def get_archiveresult(request, archiveresult_id: str):
    """Get a specific ArchiveResult by id."""
    # Substring match on the id, so a partial id is accepted; .get() raises
    # when zero or several rows match.
    id_lookup = Q(id__icontains=archiveresult_id)
    return ArchiveResult.objects.get(id_lookup)
### Snapshot #########################################################################
class SnapshotSchema(Schema):
    # Serialized form of a Snapshot, optionally embedding its ArchiveResults.
    TYPE: str = 'core.models.Snapshot'

    id: UUID
    created_by_id: str
    created_by_username: str
    created_at: datetime
    modified_at: datetime

    status: str
    retry_at: datetime | None

    bookmarked_at: datetime
    downloaded_at: Optional[datetime]

    url: str
    tags: List[str]
    title: Optional[str]
    timestamp: str
    archive_path: str

    num_archiveresults: int
    archiveresults: List[MinimalArchiveResultSchema]

    @staticmethod
    def resolve_created_by_id(obj):
        creator = obj.created_by
        return str(creator.pk)

    @staticmethod
    def resolve_created_by_username(obj):
        creator = obj.created_by
        return creator.username

    @staticmethod
    def resolve_tags(obj):
        # Tag names attached to the snapshot, in sorted order.
        names = [tag.name for tag in obj.tags.all()]
        names.sort()
        return names

    @staticmethod
    def resolve_num_archiveresults(obj, context):
        results = obj.archiveresult_set.all().distinct()
        return results.count()

    @staticmethod
    def resolve_archiveresults(obj, context):
        # The view stores the with_archiveresults flag on the request object;
        # when it is falsy the nested list is left empty.
        if not context['request'].with_archiveresults:
            return ArchiveResult.objects.none()
        return obj.archiveresult_set.all().distinct()
class SnapshotFilterSchema(FilterSchema):
    # Query-string filters accepted by the Snapshot list endpoint.
    # Each field's ``q`` names the ORM lookup(s) the supplied value is applied to.
    #
    # Every filter defaults to None, so every annotation is Optional[...]:
    # several fields were previously annotated as plain ``str``/``datetime``
    # while still defaulting to None, which disagreed with their own default
    # and with ArchiveResultFilterSchema.
    id: Optional[str] = Field(None, q=['id__icontains', 'timestamp__startswith'])

    created_by_id: Optional[str] = Field(None, q='crawl__created_by_id')
    created_by_username: Optional[str] = Field(None, q='crawl__created_by__username__icontains')

    created_at__gte: Optional[datetime] = Field(None, q='created_at__gte')
    created_at__lt: Optional[datetime] = Field(None, q='created_at__lt')
    created_at: Optional[datetime] = Field(None, q='created_at')
    modified_at: Optional[datetime] = Field(None, q='modified_at')
    modified_at__gte: Optional[datetime] = Field(None, q='modified_at__gte')
    modified_at__lt: Optional[datetime] = Field(None, q='modified_at__lt')

    search: Optional[str] = Field(
        None,
        q=[
            'url__icontains',
            'title__icontains',
            'tags__name__icontains',
            'id__icontains',
            'timestamp__startswith',
        ],
    )
    url: Optional[str] = Field(None, q='url')
    tag: Optional[str] = Field(None, q='tags__name')
    title: Optional[str] = Field(None, q='title__icontains')
    timestamp: Optional[str] = Field(None, q='timestamp__startswith')

    bookmarked_at__gte: Optional[datetime] = Field(None, q='bookmarked_at__gte')
    bookmarked_at__lt: Optional[datetime] = Field(None, q='bookmarked_at__lt')
@router.get("/snapshots", response=List[SnapshotSchema], url_name="get_snapshots")
@paginate(CustomPagination)
def get_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool = False):
    """List all Snapshot entries matching these filters."""
    # SnapshotSchema.resolve_archiveresults reads this flag back off the request.
    request.with_archiveresults = with_archiveresults
    every_snapshot = Snapshot.objects.all()
    matching = filters.filter(every_snapshot)
    return matching.distinct()
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema, url_name="get_snapshot")
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool = True):
    """Get a specific Snapshot by id."""
    # SnapshotSchema.resolve_archiveresults reads this flag back off the request.
    request.with_archiveresults = with_archiveresults

    # First attempt: the value is a prefix of either the id or the timestamp.
    prefix_match = Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id)
    try:
        return Snapshot.objects.get(prefix_match)
    except Snapshot.DoesNotExist:
        # Second attempt: the value appears anywhere inside the id.
        anywhere_in_id = Q(id__icontains=snapshot_id)
        return Snapshot.objects.get(anywhere_in_id)
### Tag #########################################################################
class TagSchema(Schema):
    # Serialized form of a Tag, optionally embedding the snapshots that carry it.
    #
    # NOTE(review): the tag-editor schemas in this module type tag ids as
    # ``int`` while this schema declares ``UUID`` -- confirm which one matches
    # the Tag model's primary key.
    TYPE: str = 'core.models.Tag'

    id: UUID
    modified_at: datetime
    created_at: datetime
    created_by_id: str
    created_by_username: str

    name: str
    slug: str
    num_snapshots: int
    snapshots: List[SnapshotSchema]

    @staticmethod
    def resolve_created_by_id(obj):
        # The tag-editor endpoints in this module create tags with
        # created_by=None for unauthenticated requests; report that as an
        # empty string instead of the literal 'None'.
        if obj.created_by_id is None:
            return ''
        return str(obj.created_by_id)

    @staticmethod
    def resolve_created_by_username(obj):
        # Without this guard the user lookup below would run with id=None
        # for creator-less tags and raise, failing the whole response.
        if obj.created_by_id is None:
            return ''
        User = get_user_model()
        return User.objects.get(id=obj.created_by_id).username

    @staticmethod
    def resolve_num_snapshots(obj, context):
        return obj.snapshot_set.all().distinct().count()

    @staticmethod
    def resolve_snapshots(obj, context):
        # The view stores the with_snapshots flag on the request object;
        # when it is falsy the nested list is left empty.
        if context['request'].with_snapshots:
            return obj.snapshot_set.all().distinct()
        return Snapshot.objects.none()
@router.get("/tags", response=List[TagSchema], url_name="get_tags")
@paginate(CustomPagination)
def get_tags(request):
    # Paginated list of every tag. Both nesting flags are switched off on the
    # request so the schema resolvers return empty nested lists.
    request.with_archiveresults = False
    request.with_snapshots = False
    every_tag = Tag.objects.all()
    return every_tag.distinct()
@router.get("/tag/{tag_id}", response=TagSchema, url_name="get_tag")
def get_tag(request, tag_id: str, with_snapshots: bool = True):
    # Fetch one tag, matching the given value against the id first and the
    # slug second. Nested snapshots are controlled by with_snapshots; nested
    # archive results are always off.
    request.with_archiveresults = False
    request.with_snapshots = with_snapshots
    try:
        found = Tag.objects.get(id__icontains=tag_id)
    except (Tag.DoesNotExist, ValidationError):
        found = Tag.objects.get(slug__icontains=tag_id)
    return found
@router.get("/any/{id}", response=Union[SnapshotSchema, ArchiveResultSchema, TagSchema, CrawlSchema], url_name="get_any", summary="Get any object by its ID")
def get_any(request, id: str):
    """Get any object by its ID (e.g. snapshot, archiveresult, tag, crawl, etc.)."""
    request.with_snapshots = False
    request.with_archiveresults = False

    def _redirect_to_detail(found):
        # Build the type-specific detail URL from the model's metadata and
        # carry the caller's query string over unchanged.
        meta = found._meta
        return redirect(f"/api/v1/{meta.app_label}/{meta.model_name}/{found.id}?{request.META['QUERY_STRING']}")

    # Try each getter in turn. Any failure just means "not this type", so it
    # is swallowed and the next getter is tried.
    for lookup in (get_snapshot, get_archiveresult, get_tag):
        try:
            found = lookup(request, id)
            if found:
                return _redirect_to_detail(found)
        except Exception:
            pass

    # The crawl getter is imported at call time, inside the same best-effort guard.
    try:
        from archivebox.api.v1_crawls import get_crawl
        found = get_crawl(request, id)
        if found:
            return _redirect_to_detail(found)
    except Exception:
        pass

    raise HttpError(404, 'Object with given ID not found')
### Tag Editor API Endpoints #########################################################################
class TagAutocompleteSchema(Schema):
    # Response body of the tag autocomplete endpoint: one dict per suggested tag.
    tags: List[dict]
class TagCreateSchema(Schema):
    # Request body of the tag create endpoint.
    name: str
class TagCreateResponseSchema(Schema):
    # Response body of the tag create endpoint.
    success: bool
    tag_id: int
    tag_name: str
    # True when the tag was newly created, False when an existing one was returned.
    created: bool
class TagSnapshotRequestSchema(Schema):
    # Request body shared by the add-to-snapshot and remove-from-snapshot
    # endpoints. The tag is identified by tag_name or tag_id.
    snapshot_id: str
    tag_name: Optional[str] = None
    tag_id: Optional[int] = None
class TagSnapshotResponseSchema(Schema):
    # Response body shared by the add-to-snapshot and remove-from-snapshot endpoints.
    success: bool
    tag_id: int
    tag_name: str
@router.get("/tags/autocomplete/", response=TagAutocompleteSchema, url_name="tags_autocomplete")
def tags_autocomplete(request, q: str = ""):
    """Return tags matching the query for autocomplete."""
    if q:
        # Case-insensitive substring match, capped at 20 suggestions.
        candidates = Tag.objects.filter(name__icontains=q).order_by('name')[:20]
    else:
        # No query: the first 50 tags by name.
        candidates = Tag.objects.all().order_by('name')[:50]

    suggestions = []
    for tag in candidates:
        suggestions.append({'id': tag.pk, 'name': tag.name, 'slug': tag.slug})
    return {'tags': suggestions}
@router.post("/tags/create/", response=TagCreateResponseSchema, url_name="tags_create")
def tags_create(request, data: TagCreateSchema):
    """Create a new tag or return existing one.

    The name is stripped of surrounding whitespace and matched
    case-insensitively against existing tags.

    Raises:
        HttpError: 400 when the stripped name is empty.
    """
    name = data.name.strip()
    if not name:
        raise HttpError(400, 'Tag name is required')

    try:
        # Lookups containing "__" are used only for the search; a newly
        # created row is populated from ``defaults``.
        tag, created = Tag.objects.get_or_create(
            name__iexact=name,
            defaults={
                'name': name,
                'created_by': request.user if request.user.is_authenticated else None,
            },
        )
    except Tag.MultipleObjectsReturned:
        # Several tags differ only by case (e.g. "News" and "news"). Return
        # the first one rather than failing the request.
        tag, created = Tag.objects.filter(name__iexact=name).first(), False

    return {
        'success': True,
        'tag_id': tag.pk,
        'tag_name': tag.name,
        'created': created,
    }
@router.post("/tags/add-to-snapshot/", response=TagSnapshotResponseSchema, url_name="tags_add_to_snapshot")
def tags_add_to_snapshot(request, data: TagSnapshotRequestSchema):
    """Add a tag to a snapshot. Creates the tag if it doesn't exist.

    The snapshot is found by id prefix or timestamp prefix. The tag is taken
    from ``tag_name`` (matched case-insensitively, created when missing) or,
    failing that, from ``tag_id``.

    Raises:
        HttpError: 400 when the snapshot id or tag name is blank, or when
            neither tag_name nor tag_id is given; 404 when the snapshot or
            the tag id cannot be found.
    """
    # An empty prefix matches every snapshot, which previously made this
    # endpoint modify an arbitrary one. Reject it up front.
    snapshot_id = data.snapshot_id.strip()
    if not snapshot_id:
        raise HttpError(400, 'Snapshot id is required')

    # Get the snapshot
    snapshot_lookup = Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id)
    try:
        snapshot = Snapshot.objects.get(snapshot_lookup)
    except Snapshot.DoesNotExist:
        raise HttpError(404, 'Snapshot not found') from None
    except Snapshot.MultipleObjectsReturned:
        # Ambiguous prefix: fall back to the first match.
        snapshot = Snapshot.objects.filter(snapshot_lookup).first()

    # Get or create the tag
    if data.tag_name:
        name = data.tag_name.strip()
        if not name:
            raise HttpError(400, 'Tag name is required')
        try:
            # Lookups containing "__" are used only for the search; a newly
            # created row is populated from ``defaults``.
            tag, _ = Tag.objects.get_or_create(
                name__iexact=name,
                defaults={
                    'name': name,
                    'created_by': request.user if request.user.is_authenticated else None,
                },
            )
        except Tag.MultipleObjectsReturned:
            # Several tags differ only by case; use the first one rather
            # than failing the request.
            tag = Tag.objects.filter(name__iexact=name).first()
    elif data.tag_id:
        try:
            tag = Tag.objects.get(pk=data.tag_id)
        except Tag.DoesNotExist:
            raise HttpError(404, 'Tag not found') from None
    else:
        raise HttpError(400, 'Either tag_name or tag_id is required')

    # Add the tag to the snapshot
    snapshot.tags.add(tag)

    return {
        'success': True,
        'tag_id': tag.pk,
        'tag_name': tag.name,
    }
@router.post("/tags/remove-from-snapshot/", response=TagSnapshotResponseSchema, url_name="tags_remove_from_snapshot")
def tags_remove_from_snapshot(request, data: TagSnapshotRequestSchema):
    """Remove a tag from a snapshot.

    The snapshot is found by id prefix or timestamp prefix. The tag is taken
    from ``tag_id`` or, failing that, from ``tag_name`` (matched
    case-insensitively).

    Raises:
        HttpError: 400 when the snapshot id is blank or when neither tag_id
            nor tag_name is given; 404 when the snapshot or the tag cannot
            be found.
    """
    # An empty prefix matches every snapshot, which previously made this
    # endpoint modify an arbitrary one. Reject it up front.
    snapshot_id = data.snapshot_id.strip()
    if not snapshot_id:
        raise HttpError(400, 'Snapshot id is required')

    # Get the snapshot
    snapshot_lookup = Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id)
    try:
        snapshot = Snapshot.objects.get(snapshot_lookup)
    except Snapshot.DoesNotExist:
        raise HttpError(404, 'Snapshot not found') from None
    except Snapshot.MultipleObjectsReturned:
        # Ambiguous prefix: fall back to the first match.
        snapshot = Snapshot.objects.filter(snapshot_lookup).first()

    # Get the tag
    if data.tag_id:
        try:
            tag = Tag.objects.get(pk=data.tag_id)
        except Tag.DoesNotExist:
            raise HttpError(404, 'Tag not found') from None
    elif data.tag_name:
        name = data.tag_name.strip()
        try:
            tag = Tag.objects.get(name__iexact=name)
        except Tag.DoesNotExist:
            raise HttpError(404, 'Tag not found') from None
        except Tag.MultipleObjectsReturned:
            # Several tags differ only by case. Prefer the variant actually
            # attached to this snapshot, otherwise the first match.
            tag = (
                snapshot.tags.filter(name__iexact=name).first()
                or Tag.objects.filter(name__iexact=name).first()
            )
    else:
        raise HttpError(400, 'Either tag_name or tag_id is required')

    # Remove the tag from the snapshot
    snapshot.tags.remove(tag)

    return {
        'success': True,
        'tag_id': tag.pk,
        'tag_name': tag.name,
    }